blob: 3a50a3f74af0d8329192f9e73b5d18514b22a245 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.scheduler.multitenant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
import backtype.storm.scheduler.SchedulerAssignment;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;
/**
 * A pool of machines that can be used to run isolated topologies.
 *
 * <p>Topologies whose config sets {@link Config#TOPOLOGY_ISOLATED_MACHINES} are recorded as
 * isolated: their nodes are never handed out by {@link #takeNodes(int)} or
 * {@link #takeNodesBySlots(int)}, and are not counted by {@link #nodesAvailable()} or
 * {@link #slotsAvailable()}. Nodes held by the pool's other (non-isolated) topologies can be
 * reclaimed by those methods. Scheduling that would push the pool past its node budget
 * ({@code maxNodes}) is refused, and the reason is reported through the cluster status.
 */
public class IsolatedPool extends NodePool {
  private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class);
  /** Nodes currently held for each topology in this pool, keyed by topology id. */
  private final Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>();
  /** Details of every topology added to this pool, keyed by topology id. */
  private final Map<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
  /** Ids of topologies that requested isolated machines; their nodes are never given away. */
  private final Set<String> _isolated = new HashSet<String>();
  /** Maximum number of nodes this pool is allowed to hold. */
  private final int _maxNodes;
  /** Number of nodes currently held by this pool across all of its topologies. */
  private int _usedNodes;

  /**
   * Create an empty pool.
   *
   * @param maxNodes the maximum number of nodes this pool is allowed to hold
   */
  public IsolatedPool(int maxNodes) {
    _maxNodes = maxNodes;
    _usedNodes = 0;
  }

  /**
   * Add a topology to this pool, claiming every node its current assignment (if any) occupies.
   * The topology is recorded as isolated when its config sets
   * {@link Config#TOPOLOGY_ISOLATED_MACHINES}.
   *
   * @param td the topology to add
   */
  @Override
  public void addTopology(TopologyDetails td) {
    String topId = td.getId();
    LOG.debug("Adding in Topology {}", topId);
    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
    Set<Node> assignedNodes = new HashSet<Node>();
    if (assignment != null) {
      for (WorkerSlot ws : assignment.getSlots()) {
        Node n = _nodeIdToNode.get(ws.getNodeId());
        assignedNodes.add(n);
      }
    }
    _usedNodes += assignedNodes.size();
    _topologyIdToNodes.put(topId, assignedNodes);
    _tds.put(topId, td);
    if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
      _isolated.add(topId);
    }
  }

  /**
   * Decide whether a topology may join this pool.
   *
   * @param td the candidate topology
   * @return {@code false} if any node in the topology's current assignment is also running
   *     another topology, otherwise {@code true}
   */
  @Override
  public boolean canAdd(TopologyDetails td) {
    // Only add topologies that are not sharing nodes with other topologies
    String topId = td.getId();
    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
    if (assignment != null) {
      for (WorkerSlot ws : assignment.getSlots()) {
        Node n = _nodeIdToNode.get(ws.getNodeId());
        if (n.getRunningTopologies().size() > 1) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Schedule every topology in this pool that the cluster reports as needing scheduling,
   * acquiring extra nodes from {@code lesserPools} (or from this pool's non-isolated
   * topologies) as needed. Afterwards, each topology that was not skipped gets a cluster status
   * naming how many nodes it holds.
   *
   * @param lesserPools node pools we can steal nodes from
   */
  @Override
  public void scheduleAsNeeded(NodePool... lesserPools) {
    for (String topId : _topologyIdToNodes.keySet()) {
      TopologyDetails td = _tds.get(topId);
      if (_cluster.needsScheduling(td)) {
        LOG.debug("Scheduling topology {}", topId);
        Set<Node> allNodes = _topologyIdToNodes.get(topId);
        Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
        int slotsToUse = 0;
        if (nodesRequested == null) {
          slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
        } else {
          slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools, nodesRequested.intValue());
        }
        // No slots to schedule for some reason, so skip it. Every path in the helpers that
        // yields <= 0 has already set a cluster status explaining why; skipping also keeps the
        // status below from overwriting it.
        if (slotsToUse <= 0) {
          continue;
        }
        RoundRobinSlotScheduler slotSched = new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
        LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes);
        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
        LOG.debug("Nodes sorted by free space {}", sortedNodes);
        // Offer the node with the most free slots to the slot scheduler, then re-insert it so
        // the list stays ordered by free slots (descending). A successful assignment always
        // re-inserts the node, so the list can only be empty if the topology holds no nodes at
        // all; guarding on that avoids a NoSuchElementException from remove().
        while (!sortedNodes.isEmpty()) {
          Node n = sortedNodes.remove();
          if (!slotSched.assignSlotTo(n)) {
            break;
          }
          int freeSlots = n.totalSlotsFree();
          for (int i = 0; i < sortedNodes.size(); i++) {
            if (freeSlots >= sortedNodes.get(i).totalSlotsFree()) {
              sortedNodes.add(i, n);
              n = null;
              break;
            }
          }
          if (n != null) {
            // Fewer free slots than every remaining node: goes to the end.
            sortedNodes.add(n);
          }
        }
      }
      Set<Node> found = _topologyIdToNodes.get(topId);
      int nc = found == null ? 0 : found.size();
      _cluster.setStatus(topId, "Scheduled Isolated on " + nc + " Nodes");
    }
  }

  /**
   * Get the nodes needed to schedule an isolated topology.
   *
   * @param td the topology to be scheduled
   * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed.
   * @param lesserPools node pools we can steal nodes from
   * @param nodesRequested the number of machines the topology asked for via
   *     {@link Config#TOPOLOGY_ISOLATED_MACHINES}
   * @return the number of additional slots that should be used for scheduling; 0 or less when
   *     scheduling is not possible, in which case the cluster status has been set to say why.
   */
  private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools, int nodesRequested) {
    String topId = td.getId();
    LOG.debug("Topology {} is isolated", topId);
    int nodesFromUsAvailable = nodesAvailable();
    int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools);
    int nodesUsed = _topologyIdToNodes.get(topId).size();
    int nodesNeeded = nodesRequested - nodesUsed;
    LOG.debug("Nodes... requested {} used {} available from us {} " + "avail from other {} needed {}", new Object[] { nodesRequested, nodesUsed,
        nodesFromUsAvailable, nodesFromOthersAvailable, nodesNeeded });
    // Whatever cannot be reclaimed from inside this pool must be new nodes, which count
    // against the remaining node budget.
    if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
      _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. "
          + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
      return 0;
    }
    // In order to avoid going over _maxNodes I may need to steal from
    // myself even though other pools have free nodes. so figure out how
    // much each group should provide
    int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, nodesFromOthersAvailable), nodesNeeded);
    int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers;
    LOG.debug("Nodes... needed from us {} needed from others {}", nodesNeededFromUs, nodesNeededFromOthers);
    if (nodesNeededFromUs > nodesFromUsAvailable) {
      _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology");
      return 0;
    }
    // Get the nodes
    Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools);
    _usedNodes += found.size();
    allNodes.addAll(found);
    // takeNodes on this pool already decrements _usedNodes for each node it reclaims, so
    // adding the count back keeps the total balanced.
    Collection<Node> foundMore = takeNodes(nodesNeededFromUs);
    _usedNodes += foundMore.size();
    allNodes.addAll(foundMore);
    int totalTasks = td.getExecutors().size();
    int origRequest = td.getNumWorkers();
    int slotsRequested = Math.min(totalTasks, origRequest);
    int slotsUsed = Node.countSlotsUsed(allNodes);
    int slotsFree = Node.countFreeSlotsAlive(allNodes);
    int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree);
    if (slotsToUse <= 0) {
      _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology.");
    }
    return slotsToUse;
  }

  /**
   * Get the nodes needed to schedule a non-isolated topology.
   *
   * @param td the topology to be scheduled
   * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed.
   * @param lesserPools node pools we can steal nodes from
   * @return the number of additional slots that should be used for scheduling; 0 when
   *     scheduling is not possible, in which case the cluster status has been set to say why.
   */
  private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools) {
    String topId = td.getId();
    LOG.debug("Topology {} is not isolated", topId);
    int totalTasks = td.getExecutors().size();
    int origRequest = td.getNumWorkers();
    int slotsRequested = Math.min(totalTasks, origRequest);
    int slotsUsed = Node.countSlotsUsed(topId, allNodes);
    int slotsFree = Node.countFreeSlotsAlive(allNodes);
    // Check to see if we have enough slots before trying to get them; only ask the lesser
    // pools what they could supply when our own free slots fall short of the request.
    int slotsAvailable = 0;
    if (slotsRequested > slotsFree) {
      slotsAvailable = NodePool.slotsAvailable(lesserPools);
    }
    int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
    LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", new Object[] { slotsRequested, slotsUsed, slotsFree, slotsAvailable,
        slotsToUse });
    if (slotsToUse <= 0) {
      _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
      return 0;
    }
    int slotsNeeded = slotsToUse - slotsFree;
    // Work out how many whole nodes the lesser pools would hand over before actually
    // taking them, so the node budget can be enforced first.
    int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
    LOG.debug("Nodes... new {} used {} max {}", new Object[] { numNewNodes, _usedNodes, _maxNodes });
    if ((numNewNodes + _usedNodes) > _maxNodes) {
      _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " + (numNewNodes - (_maxNodes - _usedNodes))
          + " more nodes needed to run topology.");
      return 0;
    }
    Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
    _usedNodes += found.size();
    allNodes.addAll(found);
    return slotsToUse;
  }

  /**
   * Reclaim up to {@code nodesNeeded} nodes from this pool's non-isolated topologies. Every
   * slot on each reclaimed node is freed, and the node is removed from its topology and from
   * this pool's used-node count. Nodes of isolated topologies are never taken.
   *
   * @param nodesNeeded the maximum number of nodes to take; 0 or less takes nothing
   * @return the reclaimed nodes (possibly fewer than requested)
   */
  @Override
  public Collection<Node> takeNodes(int nodesNeeded) {
    LOG.debug("Taking {} from {}", nodesNeeded, this);
    HashSet<Node> ret = new HashSet<Node>();
    for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
      if (!_isolated.contains(entry.getKey())) {
        Iterator<Node> it = entry.getValue().iterator();
        while (it.hasNext()) {
          if (nodesNeeded <= 0) {
            return ret;
          }
          Node n = it.next();
          it.remove();
          n.freeAllSlots(_cluster);
          ret.add(n);
          nodesNeeded--;
          _usedNodes--;
        }
      }
    }
    return ret;
  }

  /**
   * @return the number of nodes held by this pool's non-isolated topologies, i.e. the nodes
   *     {@link #takeNodes(int)} could reclaim
   */
  @Override
  public int nodesAvailable() {
    int total = 0;
    for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
      if (!_isolated.contains(entry.getKey())) {
        total += entry.getValue().size();
      }
    }
    return total;
  }

  /**
   * @return the slot count, as reported by {@code Node.countTotalSlotsAlive}, across the nodes
   *     held by this pool's non-isolated topologies
   */
  @Override
  public int slotsAvailable() {
    int total = 0;
    for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
      if (!_isolated.contains(entry.getKey())) {
        total += Node.countTotalSlotsAlive(entry.getValue());
      }
    }
    return total;
  }

  /**
   * Reclaim alive nodes from this pool's non-isolated topologies until their total slot count
   * covers {@code slotsNeeded}. Every slot on each reclaimed node is freed, and the node is
   * removed from its topology and from this pool's used-node count.
   *
   * @param slotsNeeded the number of slots wanted; 0 or less takes nothing
   * @return the reclaimed nodes (possibly covering fewer slots than requested)
   */
  @Override
  public Collection<Node> takeNodesBySlots(int slotsNeeded) {
    HashSet<Node> ret = new HashSet<Node>();
    for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
      if (!_isolated.contains(entry.getKey())) {
        Iterator<Node> it = entry.getValue().iterator();
        while (it.hasNext()) {
          // Check the budget before taking a node (as takeNodes does); checking only
          // afterwards would take a node even when no slots were requested.
          if (slotsNeeded <= 0) {
            return ret;
          }
          Node n = it.next();
          if (n.isAlive()) {
            it.remove();
            _usedNodes--;
            n.freeAllSlots(_cluster);
            ret.add(n);
            slotsNeeded -= n.totalSlots();
          }
        }
      }
    }
    return ret;
  }

  /**
   * Report what {@link #takeNodesBySlots(int)} would take for the same request, without taking
   * anything.
   *
   * @param slotsNeeded the number of slots wanted; 0 or less counts nothing
   * @return the number of nodes and total slots that would be taken
   */
  @Override
  public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
    int nodesFound = 0;
    int slotsFound = 0;
    for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
      if (!_isolated.contains(entry.getKey())) {
        Iterator<Node> it = entry.getValue().iterator();
        while (it.hasNext()) {
          // Same budget-first ordering as takeNodesBySlots, so the prediction matches it.
          if (slotsNeeded <= 0) {
            return new NodeAndSlotCounts(nodesFound, slotsFound);
          }
          Node n = it.next();
          if (n.isAlive()) {
            nodesFound++;
            // All slots count: a taken node has every slot freed.
            int nodeSlots = n.totalSlots();
            slotsFound += nodeSlots;
            slotsNeeded -= nodeSlots;
          }
        }
      }
    }
    return new NodeAndSlotCounts(nodesFound, slotsFound);
  }

  @Override
  public String toString() {
    return "IsolatedPool... ";
  }
}