/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.scheduler;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.Utils;
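
/**
 * Default {@link IScheduler} implementation. For each topology that still needs scheduling it
 * frees slots whose executor distribution no longer matches an even split of the topology's
 * executors (see {@link #badSlots}) and then delegates the remaining assignment work to
 * {@link EvenScheduler#scheduleTopologiesEvenly}.
 */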
public class DefaultScheduler implements IScheduler {
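
    /**
     * Determine which of the currently used slots should be freed so that the topology's
     * executors can be spread evenly over {@code numWorkers} slots. A slot whose executor
     * count matches a still-needed bucket of the ideal distribution (as computed by
     * {@link Utils#integerDivided}) is kept; every other currently assigned slot is returned
     * as "bad". The kept slots are removed from {@code existingSlots} as a side effect.
     * Returns {@code null} when {@code numWorkers} is 0.
     */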
    private static Set<WorkerSlot> badSlots(Map<WorkerSlot, List<ExecutorDetails>> existingSlots, int numExecutors, int numWorkers) {
        if (numWorkers != 0) {
            Map<Integer, Integer> distribution = Utils.integerDivided(numExecutors, numWorkers);
            Set<WorkerSlot> slots = new HashSet<WorkerSlot>();

            for (Entry<WorkerSlot, List<ExecutorDetails>> entry : existingSlots.entrySet()) {
                Integer executorCount = entry.getValue().size();
                Integer workerCount = distribution.get(executorCount);
                if (workerCount != null && workerCount > 0) {
                    slots.add(entry.getKey());
                    workerCount--;
                    distribution.put(executorCount, workerCount);
                }
            }

            for (WorkerSlot slot : slots) {
                existingSlots.remove(slot);
            }

            return existingSlots.keySet();
        }
        return null;
    }
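
    /**
     * Filter the given slots down to those that can still accept assignments: the slot's node
     * must not be blacklisted, its supervisor must still be known to the cluster, and the
     * slot's port must be one of the ports reported by that supervisor.
     */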
    public static Set<WorkerSlot> slotsCanReassign(Cluster cluster, Set<WorkerSlot> slots) {
        Set<WorkerSlot> result = new HashSet<WorkerSlot>();
        for (WorkerSlot slot : slots) {
            if (!cluster.isBlackListed(slot.getNodeId())) {
                SupervisorDetails supervisor = cluster.getSupervisorById(slot.getNodeId());
                if (supervisor != null) {
                    Set<Integer> ports = supervisor.getAllPorts();
                    if (ports != null && ports.contains(slot.getPort())) {
                        result.add(slot);
                    }
                }
            }
        }
        return result;
    }
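
    /**
     * Schedule every topology that currently needs scheduling. For each one, work out how many
     * slots it can actually use, free any slots that are unevenly loaded or hold stale
     * assignments via {@link #badSlots}, and finally hand the topology to
     * {@link EvenScheduler#scheduleTopologiesEvenly} to place its executors.
     */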
    public static void defaultSchedule(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
            List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
            Set<ExecutorDetails> allExecutors = topology.getExecutors();

            // Collect the executors that are still alive in the topology's current assignment.
            Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
                EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
            Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
            for (List<ExecutorDetails> list : aliveAssigned.values()) {
                aliveExecutors.addAll(list);
            }

            // The topology can use at most its requested worker count, bounded by the free slots
            // plus the slots it already holds and may keep using.
            Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());
            int totalSlotsToUse = Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size());

            // Free badly distributed slots if more slots can be used or the executor set has changed.
            Set<WorkerSlot> badSlots = null;
            if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
                badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
            }
            if (badSlots != null) {
                cluster.freeSlots(badSlots);
            }

            EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
        }
    }

    @Override
    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
        //noop
    }

    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        defaultSchedule(topologies, cluster);
    }

    @Override
    public Map<String, Map<String, Double>> config() {
        return Collections.emptyMap();
    }
}