blob: 08c47307675167f57762edbd88b6d8615add8e72 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.schedule.default_assign;
import java.util.*;
import java.util.Map.Entry;
import org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.client.WorkerAssignment;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.TopologyAssignContext;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import com.alibaba.jstorm.utils.NetWorkUtils;
/**
 * Chooses the worker slots (supervisor node id, hostname and port) a topology will run on.
 *
 * <p>Slots are taken first from the user-defined assignment, then from the old assignment
 * (when it is kept or when rebalancing), and the rest are spread across supervisors that
 * still have free ports. When {@link Config#ISOLATION_SCHEDULER_MACHINES} matches any
 * supervisors, only those supervisors are considered.
 */
public class WorkerScheduler {

    public static Logger LOG = LoggerFactory.getLogger(WorkerScheduler.class);

    /** Orders supervisors by number of free worker ports, most free ports first. */
    private static final Comparator<SupervisorInfo> MOST_FREE_PORTS_FIRST =
            new Comparator<SupervisorInfo>() {
                @Override
                public int compare(SupervisorInfo o1, SupervisorInfo o2) {
                    return NumberUtils.compare(o2.getAvailableWorkerPorts().size(),
                            o1.getAvailableWorkerPorts().size());
                }
            };

    private static WorkerScheduler instance;

    private WorkerScheduler() {
    }

    /**
     * Returns the shared scheduler instance, creating it on first use.
     *
     * <p>NOTE(review): the lazy initialization is unsynchronized, so concurrent first calls may
     * create more than one instance. That is harmless here because the class has no instance
     * state.
     */
    public static WorkerScheduler getInstance() {
        if (instance == null) {
            instance = new WorkerScheduler();
        }
        return instance;
    }

    /**
     * Selects {@code allocWorkerNum} worker slots for the topology described by {@code context}.
     *
     * <p>Slots are collected in this order:
     * <ol>
     * <li>user-defined worker assignments whose tasks are all still in {@code needAssign};</li>
     * <li>old workers: the old assignment itself when "use old assignment" is enabled, or, for a
     * rebalance that is not a reassign, the host/port/node of up to {@code allocWorkerNum}
     * old workers (without their tasks);</li>
     * <li>empty slots for whatever remains.</li>
     * </ol>
     * Every slot is then bound to a supervisor port (isolation supervisors if any match,
     * otherwise the whole cluster). Slots with no memory/CPU set get the per-worker defaults
     * from the storm config.
     *
     * @param context        assignment context (cluster, storm config, old workers)
     * @param needAssign     task ids still to be placed; tasks claimed by user-defined or kept
     *                       old workers are removed from this set
     * @param allocWorkerNum number of workers to allocate
     * @return exactly {@code allocWorkerNum} worker slots
     * @throws FailedAssignTopologyException if fewer free slots exist than requested, or if
     *                                       the user-defined/old workers exceed the request
     */
    public List<ResourceWorkerSlot> getAvailableWorkers(
            DefaultTopologyAssignContext context, Set<Integer> needAssign,
            int allocWorkerNum) {
        int workersNum = getAvailableWorkersNum(context);
        if (workersNum < allocWorkerNum) {
            throw new FailedAssignTopologyException(
                    "there's no enough worker. allocWorkerNum="
                            + allocWorkerNum + ", availableWorkerNum="
                            + workersNum);
        }
        workersNum = allocWorkerNum;
        List<ResourceWorkerSlot> assignedWorkers = new ArrayList<ResourceWorkerSlot>();

        // User-defined assignments first; custom scheduling is not applied to TM bolts yet.
        getRightWorkers(context, needAssign, assignedWorkers, workersNum,
                getUserDefineWorkers(context,
                        ConfigExtension.getUserDefineAssignment(context.getStormConf())));

        // Then old assignments.
        if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
            getRightWorkers(context, needAssign, assignedWorkers, workersNum,
                    context.getOldWorkers());
        } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
                && !context.isReassign()) {
            // Reuse only the locations of old workers, not their tasks.
            Collection<ResourceWorkerSlot> oldWorkers = context.getOldWorkers();
            if (oldWorkers != null) {
                int cnt = 0;
                for (ResourceWorkerSlot worker : oldWorkers) {
                    if (cnt >= workersNum) {
                        break;
                    }
                    ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
                    resFreeWorker.setPort(worker.getPort());
                    resFreeWorker.setHostname(worker.getHostname());
                    resFreeWorker.setNodeId(worker.getNodeId());
                    assignedWorkers.add(resFreeWorker);
                    cnt++;
                }
            }
        }

        LOG.info("Get workers from user define and old assignments: {}", assignedWorkers);
        int restWorkerNum = workersNum - assignedWorkers.size();
        if (restWorkerNum < 0) {
            throw new FailedAssignTopologyException(
                    "Too much workers are needed for user define or old assignments. workersNum="
                            + workersNum + ", assignedWokersNum="
                            + assignedWorkers.size());
        }
        for (int i = 0; i < restWorkerNum; i++) {
            assignedWorkers.add(new ResourceWorkerSlot());
        }

        List<SupervisorInfo> isolationSupervisors = getIsolationSupervisors(context);
        if (!isolationSupervisors.isEmpty()) {
            putAllWorkerToSupervisor(assignedWorkers,
                    getResAvailSupervisors(isolationSupervisors));
        } else {
            putAllWorkerToSupervisor(assignedWorkers,
                    getResAvailSupervisors(context.getCluster()));
        }
        setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
        LOG.info("Assigned workers={}", assignedWorkers);
        return assignedWorkers;
    }

    /**
     * Fills in default memory size and CPU slots for workers that have a non-positive value.
     *
     * @param conf            storm config holding the per-worker defaults
     * @param assignedWorkers workers to update in place
     */
    private void setAllWorkerMemAndCpu(Map conf,
            List<ResourceWorkerSlot> assignedWorkers) {
        long defaultSize = ConfigExtension.getMemSizePerWorker(conf);
        int defaultCpu = ConfigExtension.getCpuSlotPerWorker(conf);
        for (ResourceWorkerSlot worker : assignedWorkers) {
            if (worker.getMemSize() <= 0) {
                worker.setMemSize(defaultSize);
            }
            if (worker.getCpu() <= 0) {
                worker.setCpu(defaultCpu);
            }
        }
    }

    /**
     * Binds every slot in {@code assignedWorkers} to a supervisor port.
     *
     * <p>First, slots that carry a hostname go onto the first supervisor with that hostname
     * that still has a free port. Then slots without a node id are spread over the supervisors
     * that still have free ports, starting with those that have the most free ports. Supervisor
     * free-port sets are modified in place.
     */
    private void putAllWorkerToSupervisor(
            List<ResourceWorkerSlot> assignedWorkers,
            List<SupervisorInfo> supervisors) {
        for (ResourceWorkerSlot worker : assignedWorkers) {
            if (worker.getHostname() == null) {
                continue;
            }
            for (SupervisorInfo supervisor : supervisors) {
                if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname())
                        && !supervisor.getAvailableWorkerPorts().isEmpty()) {
                    putWorkerToSupervisor(supervisor, worker);
                    break;
                }
            }
        }
        // Drop supervisors whose ports were all taken by the pinned workers above.
        List<SupervisorInfo> stillFree = getResAvailSupervisors(supervisors);
        Collections.sort(stillFree, MOST_FREE_PORTS_FIRST);
        putWorkerToSupervisor(assignedWorkers, stillFree);
    }

    /**
     * Places {@code worker} on {@code supervisor}. The worker keeps its current port if the
     * supervisor still has it free. Otherwise it takes the first port from the supervisor's
     * free-port iterator. The chosen port is removed from the supervisor's free ports.
     *
     * <p>The worker's hostname is left unchanged; the only caller uses this when hostnames
     * already match and the supervisor has at least one free port.
     */
    private void putWorkerToSupervisor(SupervisorInfo supervisor,
            ResourceWorkerSlot worker) {
        int port = worker.getPort();
        if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
            port = supervisor.getAvailableWorkerPorts().iterator().next();
        }
        worker.setPort(port);
        supervisor.getAvailableWorkerPorts().remove(port);
        worker.setNodeId(supervisor.getSupervisorId());
    }

    /**
     * Assigns every worker without a node id to a supervisor port, round-robin.
     *
     * <p>Pass 1 cycles through {@code supervisors} and fills each one only while its used-port
     * count stays below an even-spread target:
     * {@code (alreadyUsedPorts + assignedWorkers.size()) / supervisors.size() + 1}.
     * Supervisors that reach the target are set aside. Pass 2 places any remaining workers
     * round-robin on the set-aside supervisors, most free ports first.
     *
     * @param assignedWorkers workers to place; those with a node id are skipped
     * @param supervisors     supervisors that each have at least one free port; this list is
     *                        consumed (entries are removed) during placement
     */
    private void putWorkerToSupervisor(
            List<ResourceWorkerSlot> assignedWorkers,
            List<SupervisorInfo> supervisors) {
        if (supervisors.isEmpty()) {
            // No free ports remain (e.g. all taken by hostname-pinned workers).
            // Without this guard the average below would divide by zero.
            return;
        }
        int allUsedPorts = 0;
        for (SupervisorInfo supervisor : supervisors) {
            allUsedPorts += usedPorts(supervisor);
        }
        // Ports each supervisor should hold, in theory, after an even spread.
        int theoryAveragePorts = (allUsedPorts + assignedWorkers.size())
                / supervisors.size() + 1;
        // Supervisors that already use at least theoryAveragePorts ports.
        List<SupervisorInfo> overLoadSupervisors = new ArrayList<SupervisorInfo>();

        int key = 0;
        Iterator<ResourceWorkerSlot> iterator = assignedWorkers.iterator();
        while (iterator.hasNext() && !supervisors.isEmpty()) {
            if (key >= supervisors.size()) {
                key = 0;
            }
            SupervisorInfo supervisor = supervisors.get(key);
            if (usedPorts(supervisor) < theoryAveragePorts) {
                ResourceWorkerSlot worker = iterator.next();
                if (worker.getNodeId() != null) {
                    continue;
                }
                bindToFreePort(worker, supervisor);
                if (supervisor.getAvailableWorkerPorts().isEmpty()) {
                    // Removal shifts the next supervisor into this index, so don't advance.
                    supervisors.remove(key);
                } else {
                    key++;
                }
            } else {
                overLoadSupervisors.add(supervisor);
                supervisors.remove(key);
            }
        }

        // Any workers still left go to the overloaded supervisors, most free ports first.
        Collections.sort(overLoadSupervisors, MOST_FREE_PORTS_FIRST);
        key = 0;
        while (iterator.hasNext() && !overLoadSupervisors.isEmpty()) {
            if (key >= overLoadSupervisors.size()) {
                key = 0;
            }
            ResourceWorkerSlot worker = iterator.next();
            if (worker.getNodeId() != null) {
                continue;
            }
            SupervisorInfo supervisor = overLoadSupervisors.get(key);
            bindToFreePort(worker, supervisor);
            if (supervisor.getAvailableWorkerPorts().isEmpty()) {
                overLoadSupervisors.remove(key);
            } else {
                key++;
            }
        }
    }

    /** Returns how many of {@code supervisor}'s worker ports are already in use. */
    private static int usedPorts(SupervisorInfo supervisor) {
        return supervisor.getWorkerPorts().size()
                - supervisor.getAvailableWorkerPorts().size();
    }

    /**
     * Binds {@code worker} to {@code supervisor}. Sets its hostname and node id, takes the
     * first port from the supervisor's free-port iterator, and removes that port from the
     * free set.
     */
    private static void bindToFreePort(ResourceWorkerSlot worker, SupervisorInfo supervisor) {
        worker.setHostname(supervisor.getHostName());
        worker.setNodeId(supervisor.getSupervisorId());
        worker.setPort(supervisor.getAvailableWorkerPorts().iterator().next());
        supervisor.getAvailableWorkerPorts().remove(worker.getPort());
    }

    /**
     * Selects the workers from {@code workers} whose tasks are all in {@code needAssign} and do
     * not overlap a previously selected worker.
     *
     * <p>If the selection fits within {@code workersNum} together with
     * {@code assignedWorkers}, the workers are appended to {@code assignedWorkers} and their
     * tasks are removed from {@code needAssign}. Otherwise a warning is logged and nothing
     * changes.
     *
     * @param workers candidate workers; {@code null} is treated as empty
     */
    private void getRightWorkers(DefaultTopologyAssignContext context,
            Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers,
            int workersNum, Collection<ResourceWorkerSlot> workers) {
        if (workers == null) {
            return;
        }
        Set<Integer> assigned = new HashSet<Integer>();
        List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
        for (ResourceWorkerSlot worker : workers) {
            Set<Integer> tasks = worker.getTasks();
            if (tasks == null) {
                continue;
            }
            boolean right = true;
            for (Integer task : tasks) {
                if (!needAssign.contains(task) || assigned.contains(task)) {
                    right = false;
                    break;
                }
            }
            if (right) {
                assigned.addAll(tasks);
                users.add(worker);
            }
        }
        if (users.size() + assignedWorkers.size() > workersNum) {
            LOG.warn(
                    "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}",
                    users, assignedWorkers, workersNum);
            return;
        }
        assignedWorkers.addAll(users);
        needAssign.removeAll(assigned);
    }

    /**
     * Counts free worker ports on the isolation supervisors if any match, otherwise on every
     * supervisor in the cluster.
     */
    private int getAvailableWorkersNum(DefaultTopologyAssignContext context) {
        List<SupervisorInfo> isolationSupervisors = getIsolationSupervisors(context);
        Collection<SupervisorInfo> candidates = isolationSupervisors.isEmpty()
                ? context.getCluster().values()
                : isolationSupervisors;
        int slotNum = 0;
        for (SupervisorInfo supervisor : candidates) {
            slotNum += supervisor.getAvailableWorkerPorts().size();
        }
        return slotNum;
    }

    /**
     * Converts the user-defined worker assignments into worker slots.
     *
     * <p>For anything other than a new assignment, user-defined workers that match a worker
     * still running are first dropped from {@code workers}. Slots that end up with no tasks
     * are discarded.
     *
     * @param context assignment context supplying component-to-task mapping
     * @param workers user-defined assignments; may be {@code null}; may be modified
     * @return the non-empty user-defined worker slots (never {@code null})
     */
    private List<ResourceWorkerSlot> getUserDefineWorkers(
            DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
        List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
        if (workers == null) {
            return ret;
        }
        // Shallow copy, same as the previous HashMap.clone(), but works for any Map
        // implementation. NOTE(review): the task lists themselves are shared with the
        // context; confirm ResourceWorkerSlot does not need them deep-copied.
        Map<String, List<Integer>> componentToTask =
                new HashMap<String, List<Integer>>(context.getComponentTasks());
        if (context.getAssignType() != DefaultTopologyAssignContext.ASSIGN_TYPE_NEW) {
            checkUserDefineWorkers(context, workers, context.getTaskToComponent());
        }
        for (WorkerAssignment worker : workers) {
            ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker, componentToTask);
            if (workerSlot.getTasks().size() != 0) {
                ret.add(workerSlot);
            }
        }
        return ret;
    }

    /**
     * Removes from {@code workers} every user-defined assignment that matches an unstopped
     * worker, where a match is decided by
     * {@code ResourceWorkerSlot#compareToUserDefineWorker}.
     */
    private void checkUserDefineWorkers(DefaultTopologyAssignContext context,
            List<WorkerAssignment> workers, Map<Integer, String> taskToComponent) {
        Set<ResourceWorkerSlot> unstoppedWorkers = context.getUnstoppedWorkers();
        if (unstoppedWorkers == null) {
            return;
        }
        List<WorkerAssignment> re = new ArrayList<WorkerAssignment>();
        for (WorkerAssignment worker : workers) {
            for (ResourceWorkerSlot unstopped : unstoppedWorkers) {
                if (unstopped.compareToUserDefineWorker(worker, taskToComponent)) {
                    re.add(worker);
                    break;
                }
            }
        }
        workers.removeAll(re);
    }

    /** Returns the supervisors in {@code supervisors} (map values) with at least one free port. */
    private List<SupervisorInfo> getResAvailSupervisors(
            Map<String, SupervisorInfo> supervisors) {
        return getResAvailSupervisors(new ArrayList<SupervisorInfo>(supervisors.values()));
    }

    /** Returns a new list of the supervisors that have at least one free port, order kept. */
    private List<SupervisorInfo> getResAvailSupervisors(
            List<SupervisorInfo> supervisors) {
        List<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>();
        for (SupervisorInfo supervisor : supervisors) {
            if (!supervisor.getAvailableWorkerPorts().isEmpty()) {
                availableSupervisors.add(supervisor);
            }
        }
        return availableSupervisors;
    }

    /**
     * Returns the cluster supervisors whose hostname matches one of the hosts in
     * {@link Config#ISOLATION_SCHEDULER_MACHINES}. Returns an empty list if that key is unset.
     */
    @SuppressWarnings("unchecked")
    private List<SupervisorInfo> getIsolationSupervisors(
            DefaultTopologyAssignContext context) {
        // The config value is expected to be a list of host names.
        List<String> isolationHosts = (List<String>) context.getStormConf()
                .get(Config.ISOLATION_SCHEDULER_MACHINES);
        LOG.info("Isolation machines: {}", isolationHosts);
        List<SupervisorInfo> isolationSupervisors = new ArrayList<SupervisorInfo>();
        if (isolationHosts == null) {
            return isolationSupervisors;
        }
        for (SupervisorInfo supervisor : context.getCluster().values()) {
            if (containTargetHost(isolationHosts, supervisor.getHostName())) {
                isolationSupervisors.add(supervisor);
            }
        }
        return isolationSupervisors;
    }

    /** Returns whether any host in {@code hosts} equals {@code target} per NetWorkUtils. */
    private boolean containTargetHost(Collection<String> hosts, String target) {
        for (String host : hosts) {
            if (NetWorkUtils.equals(host, target)) {
                return true;
            }
        }
        return false;
    }
}