/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.rm.scheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.persistence.rm.IRmPersistence;
import org.apache.uima.ducc.common.persistence.rm.IRmPersistence.RmNodes;
import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
class NodePool
implements SchedConstants
{
static DuccLogger logger = DuccLogger.getLogger(NodePool.class, COMPONENT_NAME);
String id;
NodePool parent = null;
int depth;
int updated = 0;
int search_order = 100;
int share_quantum = 0;
EvictionPolicy evictionPolicy = EvictionPolicy.SHRINK_BY_MACHINE;
HashMap<String, NodePool> children = new HashMap<String, NodePool>(); // key is name of resource class
Map<String, String> subpoolNames = new HashMap<String, String>(); // if a subpool, this names the membership
HashMap<Node, Machine> allMachines = new HashMap<Node, Machine>(); // all active machines in the system
HashMap<Node, Machine> unresponsiveMachines = new HashMap<Node, Machine>(); // machines with excessive missed heartbeats
HashMap<Node, Machine> offlineMachines = new HashMap<Node, Machine>();
HashMap<Integer, HashMap<Node, Machine>> machinesByOrder = new HashMap<Integer, HashMap<Node, Machine>>(); // All schedulable machines, not necessarily free
HashMap<String, Machine> machinesByName = new HashMap<String, Machine>(); // by name, for nodepool support
HashMap<String, Machine> deadByName = new HashMap<String, Machine>(); // anything we move to offline or unresponsive,
// indexed by the same name we used, because
// sometimes the domain name gets in the way
HashMap<String, Machine> machinesByIp = new HashMap<String, Machine>(); // by IP, for nodepool support
HashMap<Share, Share> allShares = new HashMap<Share, Share>();
HashMap<Node, Machine> preemptables = new HashMap<Node, Machine>(); // candidates for preemption for reservations
int total_shares = 0;
Map<ResourceClass, ResourceClass> allClasses = new HashMap<ResourceClass, ResourceClass>(); // all the classes directly serviced by me
//
// There are "theoretical" shares based on actual capacities of
// the machines. They are used for the "how much" part of the
// calculations. They aren't correlated with any actual share
// objects.
//
// nMachinesByOrder is initialized on every scheduling cycle to the number of physical
// machines of each share order in the system. During scheduling, we start giving them
// away, and must reduce the count accordingly. It is an optimization representing the number
// of completely free machines - machines that have not had any resources scheduled against them
// in this cycle.
//
// assume 2x16G, 0x32G, 1x48G, 4x64G machines ------------------------------+
// the arrays look like this for each init of the scheduler: v
int nMachinesByOrder[]; // number of full, free machines of each share order [ 0 2 0 1 4 ] - physical machines
int vMachinesByOrder[]; // number of partial machines, indexed by free space
int nSharesByOrder[]; // shares of each size for each share order [ 0 21 9 5 4 ] - collective N Shares for each order
//int nFreeSharesByOrder[]; // for each order, the theoretical number of shares to give away [ 0 1 0 3 16 ] - free Q shares per order
int nPendingByOrder[]; // number of N-shares with pending evictions
//int neededByOrder[]; // for each order, how many N-shares do I want to add?
// int shareExpansion[];
Map<Integer, Integer> onlineMachinesByOrder = new HashMap<Integer, Integer>(); // all online machines
// Indexed by available free shares, the specific machines with the indicated free space
HashMap<Integer, Map<Node, Machine>> virtualMachinesByOrder = new HashMap<Integer, Map<Node, Machine>>(); // UIMA-4142
GlobalOrder maxorder = null;
IRmPersistence persistence = null;
boolean canReserve = false; // if we contain a class with policy Reserve, then stuff in this pool is reservable
// NodePool(NodePool parent, String id, EvictionPolicy ep, int order)
// {
// this.parent = parent;
// this.id = id;
// this.evictionPolicy = ep;
// this.depth = 0;
// this.order = order;
// }
NodePool(NodePool parent, String id, Map<String, String> nodes, EvictionPolicy ep, int depth, int search_order, int share_quantum)
{
String methodName = "NodePool.<init>";
this.parent = parent;
this.id = id;
this.subpoolNames = nodes;
if ( nodes == null ) { // unlikely, but not illegal
this.subpoolNames = new HashMap<String, String>();
logger.warn(methodName, null, "Nodepool", id, ": no nodes in node list");
}
this.evictionPolicy = ep;
this.depth = depth;
this.search_order = search_order;
this.share_quantum = share_quantum; // in KB
if ( parent == null ) {
maxorder = new GlobalOrder();
} else {
maxorder = parent.getGlobalOrder();
}
persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
}
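// Illustrative construction sketch (hypothetical values, not from any real
// configuration): the top-level pool owns the GlobalOrder, and subpools built
// through createSubpool() below share it via the parent chain.
//
//   NodePool global = new NodePool(null, "--default--", nodes,
//                                  EvictionPolicy.SHRINK_BY_MACHINE,
//                                  0,                  // depth: top of the tree
//                                  100,                // search_order
//                                  16 * 1024 * 1024);  // share quantum, in KB
//   NodePool sub = global.createSubpool("power", powerNodes, 1);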
void addResourceClass(ResourceClass cl)
{ // UIMA-4065
allClasses.put(cl, cl);
if ( cl.getPolicy() == Policy.RESERVE) canReserve = true;
}
NodePool getParent()
{
return this.parent;
}
String getId()
{
return id;
}
int getShareQuantum()
{
return share_quantum;
}
int getDepth()
{
return depth;
}
int countShares()
{
return allShares.size();
}
int countOccupiedShares()
{
int count = allShares.size();
for ( NodePool np : children.values() ) {
count += np.countOccupiedShares();
}
return count;
}
//
// Note, this will only be accurate AFTER reset, but before actual allocation of
// shares begins. After allocation, and before the next reset, this will return junk.
//
// It is intended to be called from ResourceClass.canUseBonus()
// UIMA-4065
int countAssignableShares(int order)
{
String methodName = "countAssignableShares";
// first calculate my contribution
int ret = nSharesByOrder[order];
for (ResourceClass rc : allClasses.values() ) {
int[] gbo = rc.getGivenByOrder();
if ( gbo != null ) {
ret -= gbo[order];
}
}
logger.trace(methodName, null, "Shares available for", id, ":", ret);
// now accumulate the kids' contribution
for ( NodePool np : children.values() ) {
ret += np.countAssignableShares(order);
}
return ret;
}
void removeShare(Share s)
{
allShares.remove(s);
}
boolean containsPoolNode(Node n)
{
// allow the names to be machines or ip addresses
if ( subpoolNames.containsKey( n.getNodeIdentity().getIp() )) return true;
if ( subpoolNames.containsKey( n.getNodeIdentity().getName() )) return true;
return false;
}
/**
* How many do I have, including recursing down the children?
*/
int countMachines()
{
int count = allMachines.size();
for ( NodePool np : children.values() ) {
count += np.countMachines();
}
return count;
}
/**
* How many do I have, including recursing down the children?
*/
int countUnresponsiveMachines()
{
int count = unresponsiveMachines.size();
for ( NodePool np : children.values() ) {
count += np.countUnresponsiveMachines();
}
return count;
}
/**
* How many do I have, just me.
*/
int countLocalUnresponsiveMachines()
{
return unresponsiveMachines.size();
}
/**
* How many do I have, including recursing down the children?
*/
int countOfflineMachines()
{
int count = offlineMachines.size();
for ( NodePool np : children.values() ) {
count += np.countOfflineMachines();
}
return count;
}
/**
* How many do I have, just me.
*/
int countLocalOfflineMachines()
{
return offlineMachines.size();
}
/**
* Return nodes varied off for me and my kids.
* UIMA-4142, RM reconfiguration
*/
Map<Node, Machine> getOfflineMachines()
{
@SuppressWarnings("unchecked")
Map<Node, Machine> ret = (Map<Node, Machine>) offlineMachines.clone();
for (NodePool np : children.values()) {
ret.putAll(np.getOfflineMachines());
}
return ret;
}
/**
* Return unresponsive nodes for me and my kids.
* UIMA-4234, More info in query occupancy
*/
Map<Node, Machine> getUnresponsiveMachines()
{
@SuppressWarnings("unchecked")
Map<Node, Machine> ret = (Map<Node, Machine>) unresponsiveMachines.clone();
for (NodePool np : children.values()) {
ret.putAll(np.getUnresponsiveMachines()); // recurse, as getOfflineMachines() does
}
return ret;
}
/**
* Non-recursive machine count.
*/
int countLocalMachines()
{
return allMachines.size();
}
/**
* Non-recursive share count.
*/
int countLocalShares()
{
return total_shares;
}
/**
* Counts just local, for reservations.
*/
int countFreeMachines(int order)
{
int cnt = 0;
HashMap<Node, Machine> mlist = null;
mlist = machinesByOrder.get(order);
if ( mlist == null ) return 0;
for ( Machine m : mlist.values() ) {
if ( isSchedulable(m) && m.isFree() ) {
cnt++;
}
}
return cnt;
}
/**
* Counts all known machines, just me.
*/
int[] countLocalFreeMachines()
{
return nMachinesByOrder.clone();
}
int countTotalShares()
{
int answer = total_shares;
for ( NodePool np : children.values() ) {
answer += np.countTotalShares();
}
return answer;
}
/**
* Total Q shares in the nodepool that are not yet given away in the scheduling cycle.
*/
int countQShares()
{
int count = nSharesByOrder[1];
for ( NodePool np : children.values() ) {
count += np.countQShares();
}
return count;
}
/**
* Total Q shares in the nodepool still available, just me.
*/
int countLocalQShares()
{
return nSharesByOrder[1];
}
int countAllMachinesByOrder(int o)
{
int count = 0;
if ( machinesByOrder.containsKey(o) ) {
count = machinesByOrder.get(o).size();
}
for ( NodePool np : children.values() ) {
count += np.countAllMachinesByOrder(o);
}
return count;
}
int[] countAllLocalMachines()
{
int[] ret = makeArray();
for ( int o : machinesByOrder.keySet() ) {
ret[o] = machinesByOrder.get(o).size();
}
return ret;
}
/**
* Returns N-Shares, recursing down
*/
int countNSharesByOrder(int o)
{
int count = nSharesByOrder[o];
for ( NodePool np : children.values() ) {
count += np.countNSharesByOrder(o);
}
return count;
}
/**
* Returns N-Shares, local
*/
int countLocalNSharesByOrder(int o)
{
return nSharesByOrder[o];
}
/**
* Returns number of N-shares that are still busy but pending eviction.
*/
int countPendingSharesByOrder(int o)
{
int count = nPendingByOrder[o];
for ( NodePool np : children.values() ) {
count += np.countPendingSharesByOrder(o);
}
return count;
}
/**
* Helper for compatibleNodepool(), recurses down children.
* UIMA-4142
*/
private boolean isCompatibleNodepool(Policy p, ResourceClass rc)
{
if ( allClasses.containsKey(rc) ) return true;
for (NodePool np : children.values()) {
if ( np.isCompatibleNodepool(p, rc) ) return true;
}
return false;
}
/**
* Helper for compatibleNodepool(), find the top of the hierarchy.
* UIMA-4142
*/
NodePool findTopOfHeirarchy()
{
NodePool ret = this;
while (ret.getParent() != null) {
ret = ret.getParent();
}
return ret;
}
/**
* Interrogate whether work assigned to the indicated rc could end up here.
*
* If it's a fair-share allocation, we need to interrogate 'me', my children, and
* my ancestors.
*
* If it's something else, it must reside right here.
*
* This is called during recovery; a change to the class or np config can cause incompatibilities
* with previously scheduled work after a restart.
*
* UIMA-4142
*
* @param p The scheduling policy; determines whether descent into child pools is allowed.
* @param rc The rc to check
* @return true If work scheduled to the RC is compatible.
*/
boolean compatibleNodepool(Policy p, ResourceClass rc)
{
switch ( p ) {
case FAIR_SHARE:
NodePool top = findTopOfHeirarchy();
return top.isCompatibleNodepool(p, rc);
case FIXED_SHARE:
case RESERVE:
if ( allClasses.containsKey(rc) ) return true;
}
return false;
}
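// A sketch of how recovery might consult this (the 'job', 'rc' and 'rootPool'
// names are hypothetical, not part of this class):
//
//   Policy p = job.getSchedulingPolicy();
//   if ( !rootPool.compatibleNodepool(p, rc) ) {
//       // the class or nodepool configuration changed across the restart;
//       // work previously scheduled against rc can no longer land here
//   }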
int[] cloneNSharesByOrder()
{
int[] cln = nSharesByOrder.clone();
for ( NodePool np : children.values() ) {
int[] subcln = np.cloneNSharesByOrder();
for ( int i = 0; i < cln.length; i++ ) {
cln[i] += subcln[i];
}
}
return cln;
}
int[] cloneVMachinesByOrder()
{
int[] cln = nMachinesByOrder.clone();
for ( int i = 0; i < cln.length; i++ ) {
cln[i] += vMachinesByOrder[i];
}
for ( NodePool np : children.values() ) {
int[] subcln = np.cloneVMachinesByOrder();
for ( int i = 0; i < cln.length; i++ ) {
cln[i] += subcln[i];
}
}
return cln;
}
public GlobalOrder getGlobalOrder()
{
return maxorder;
}
public void updateMaxOrder(int order)
{
maxorder.update(order);
}
public int getMaxOrder()
{
return maxorder.getOrder(); // must always be the same for parent and all children
}
public int getArraySize()
{
return getMaxOrder() + 1; // a bit bigger, because we're 1-indexed for easier counting
// same for parent and children
}
public int[] makeArray() // common static code because getting this right everywhere is painful
{
return new int[getArraySize()];
}
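// Example: if the largest machine or job seen so far is order 4 (say 64GB on a
// 16GB quantum), getMaxOrder() == 4 and makeArray() returns new int[5], so the
// counting arrays can be indexed directly by share order 1..4. Slot 0 is unused
// by most arrays, though doEvictionsByMachine() uses neededByOrder[0] as a running total.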
int getSearchOrder()
{
return this.search_order;
}
public Machine getMachine(Node n)
{
Machine m = allMachines.get(n);
if ( m == null ) {
for ( NodePool np : children.values() ) {
m = np.getMachine(n);
if ( m != null ) {
break;
}
}
}
return m;
}
public Machine getMachine(NodeIdentity ni)
{
Machine m = machinesByIp.get(ni.getIp());
if ( m == null ) {
for ( NodePool np : children.values() ) {
m = np.getMachine(ni);
if ( m != null ) break;
}
}
return m;
}
boolean containsMachine(Machine m)
{
Map<Node, Machine> allm = getAllMachines();
return allm.containsKey(m.getNode());
}
@SuppressWarnings("unchecked")
HashMap<Node, Machine> getAllMachinesForPool()
{
return (HashMap<Node, Machine>) allMachines.clone();
}
HashMap<Node, Machine> getAllMachines()
{
@SuppressWarnings("unchecked")
HashMap<Node, Machine> machs = (HashMap<Node, Machine>) allMachines.clone();
for ( NodePool np : children.values() ) {
HashMap<Node, Machine> m = np.getAllMachines();
if ( m != null ) {
machs.putAll(m);
}
}
return machs;
}
HashMap<String, Machine> getMachinesByName()
{
@SuppressWarnings("unchecked")
HashMap<String, Machine> machs = (HashMap<String, Machine>) machinesByName.clone();
for ( NodePool np : children.values() ) {
HashMap<String, Machine> m = np.getMachinesByName();
if ( m != null ) {
machs.putAll(m);
}
}
return machs;
}
HashMap<String, Machine> getMachinesByIp()
{
@SuppressWarnings("unchecked")
HashMap<String, Machine> machs = (HashMap<String, Machine>) machinesByIp.clone();
for ( NodePool np : children.values() ) {
HashMap<String, Machine> m = np.getMachinesByIp();
if ( m != null ) {
machs.putAll(m);
}
}
return machs;
}
@SuppressWarnings("unchecked")
HashMap<Node, Machine> getMachinesByOrder(int order)
{
HashMap<Node, Machine> machs;
if( machinesByOrder.containsKey(order) ) {
machs = (HashMap<Node, Machine>) machinesByOrder.get(order).clone();
} else {
machs = new HashMap<Node, Machine>();
}
for ( NodePool np : children.values() ) {
HashMap<Node, Machine> m = np.getMachinesByOrder(order);
machs.putAll(m);
}
return machs;
}
@SuppressWarnings("unchecked")
Map<Node, Machine> getVirtualMachinesByOrder(int order)
{
Map<Node, Machine> machs;
if( virtualMachinesByOrder.containsKey(order) ) {
HashMap<Node, Machine> tmp = (HashMap<Node, Machine>) virtualMachinesByOrder.get(order);
machs = (HashMap<Node, Machine>) tmp.clone();
} else {
machs = new HashMap<Node, Machine>();
}
// for ( NodePool np : children.values() ) {
// HashMap<Machine, Machine> m = np.getVirtualMachinesByOrder(order);
// machs.putAll(m);
// }
return machs;
}
/**
* Work out the N shares for each share class.
*
* Note: This is a helper method, not made public, and it does not need to account for child nodepools.
* If you need the recursion use countNSharesByOrder().
*
* Internally, only call this if you mess with the counting arrays. If you call somebody else who
* messes with the counting arrays, leave it to them to call this.
*/
protected void calcNSharesByOrder()
{
int len = nMachinesByOrder.length;
// init nSharesByOrder to the sum of the 'n' and 'v' MachinesByOrder arrays
System.arraycopy(nMachinesByOrder, 0, nSharesByOrder, 0, len);
for ( int i = 0; i < getMaxOrder() + 1; i++ ) {
nSharesByOrder[i] += vMachinesByOrder[i];
}
for ( int o = 1; o < len; o++ ) { // counting by share order
//nFreeSharesByOrder[o] = nMachinesByOrder[o] * o;
for ( int p = o+1; p < len; p++ ) {
if ( nSharesByOrder[p] != 0 ) {
nSharesByOrder[o] += (p / o) * nSharesByOrder[p];
}
}
}
}
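// Worked example (using the machine mix from the counting-array comments near
// the top of the class: 2x16G, 0x32G, 1x48G, 4x64G, all fully free, so the
// 'v' counts are zero):
//
//   start: nSharesByOrder = nMachinesByOrder + vMachinesByOrder = [ 0 2 0 1 4 ]
//   o=1:   2 + (2/1)*0 + (3/1)*1 + (4/1)*4 = 21
//   o=2:   0 + (3/2)*1 + (4/2)*4           =  9
//   o=3:   1 + (4/3)*4                     =  5
//   o=4:   4 (unchanged)
//
//   result: nSharesByOrder = [ 0 21 9 5 4 ], as in the example above.
//   Note the integer division: a free order-3 machine yields only one order-2 share.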
protected int[] countMachinesByOrder()
{
int[] ans = nMachinesByOrder.clone();
for ( NodePool np : children.values() ) {
int[] tmp = np.countMachinesByOrder();
for ( int i = 0; i < getArraySize(); i++ ) {
ans[i] += tmp[i];
}
}
return ans;
}
protected int[] countVMachinesByOrder()
{
int[] ans = vMachinesByOrder.clone();
for ( NodePool np : children.values() ) {
int[] tmp = np.countVMachinesByOrder();
for ( int i = 0; i < getArraySize(); i++ ) {
ans[i] += tmp[i];
}
}
return ans;
}
protected int[] countLocalVMachinesByOrder()
{
return vMachinesByOrder.clone();
}
protected int[] countAllNSharesByOrder()
{
int[] ans = nSharesByOrder.clone();
for ( NodePool np : children.values() ) {
int[] tmp = np.countAllNSharesByOrder();
for ( int i = 0; i < getArraySize(); i++ ) {
ans[i] += tmp[i];
}
}
return ans;
}
/**
* Common code to connect a share into the system, used when assigning a new
* share (from within NodePool), or when reconnecting a share during job recovery
* (from JobManagerConverter).
*/
public synchronized void connectShare(Share s, Machine m, IRmJob j, int order)
{
String methodName = "connectShare";
logger.info(methodName, j.getId(), "share", s, "order", order, "machine", m);
j.assignShare(s);
m.assignShare(s);
rearrangeVirtual(m, order, j.getSchedulingPolicy());
allShares.put(s, s);
}
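// A minimal usage sketch (mirrors the calls made in findMachines() and
// findShares() below; 'm', 'job' and 'order' are assumed to be in hand):
//
//   Share s = new Share(m, job, order);  // an N-share of the job's order on machine m
//   s.setFixed();                        // only for non-preemptable allocations
//   connectShare(s, m, job, order);      // wires job, machine, virtual counts, allShares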
void rearrangeVirtual(Machine m, int order, Policy policy)
{
String methodName = "rearrangeVirtual";
if ( allMachines.containsKey(m.key()) ) {
int v_order = m.getVirtualShareOrder();
int r_order = m.getShareOrder();
// UIMA-4913 Avoid index-outta-bounds when a machine's size changes
// If the share appears bigger than the remaining free space, pretend the share is smaller;
// if the share is smaller but is a whole-machine reservation, pretend the
// reservation matches the machine. In both cases the resulting free space = 0.
if (order > v_order) {
logger.warn(methodName, null, m.getId(), "found a share of size", order, "on a machine with only", v_order, "free slots - set free=0");
order = v_order;
} else if (order < v_order && policy == Policy.RESERVE){
logger.warn(methodName, null, m.getId(), "found a RESERVE share of size", order, "on a machine with", v_order, "free slots - set free=0");
order = v_order;
}
logger.trace(methodName, null, m.getId(), "order", order, "v_order", v_order, "r_order", r_order);
if ( v_order == r_order ) {
nMachinesByOrder[r_order]--;
} else {
vMachinesByOrder[v_order]--;
}
Map<Node, Machine> vlist = virtualMachinesByOrder.get(v_order);
if ( vlist == null ) {
// Delivered under UIMA-4275 as that is when I decided to try to avoid NPE here.
//
// This is fatal, the internal records are all wrong. Usually this is because of some
// external snafu, such as mixing and matching ducc clusters on the same broker.
// There's really not much we can do though. There's a good chance that continuing
// will cause NPE elsewhere. Maybe we can just ignore it and let it leak?
logger.error(methodName, null, "ERROR: bad virtual machine list.", m.getId(), "order", order, "v_order", v_order, "r_order", r_order);
return;
}
vlist.remove(m.key());
v_order -= order;
m.setVirtualShareOrder(v_order);
if (v_order != 0 ) {
vlist = virtualMachinesByOrder.get(v_order);
if ( vlist == null ) {
vlist = new HashMap<Node, Machine>();
virtualMachinesByOrder.put(v_order, vlist);
}
vlist.put(m.key(), m);
vMachinesByOrder[v_order]++;
}
calcNSharesByOrder();
} else {
for ( NodePool np : children.values() ) {
np.rearrangeVirtual(m, order, policy);
}
}
}
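// Traced example (illustrative, single machine): a fully free order-4 machine
// has v_order == r_order == 4. Placing one order-1 share on it:
//
//   nMachinesByOrder[4]--                            // no longer a full, free machine
//   virtualMachinesByOrder.get(4).remove(m.key())    // leaves the order-4 bucket
//   m.setVirtualShareOrder(3)                        // 4 - 1 = 3 quantum shares still free
//   virtualMachinesByOrder.get(3).put(m.key(), m)    // joins the order-3 bucket
//   vMachinesByOrder[3]++                            // now counted as a partial machine
//   calcNSharesByOrder()                             // refresh the share counts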
void accountForShares(HashMap<Share, Share> shares)
{
if ( shares == null ) return;
for ( Share s : shares.values() ) {
int order = s.getShareOrder();
Machine m = s.getMachine();
Policy policy = s.getJob().getSchedulingPolicy();
rearrangeVirtual(m, order, policy);
}
}
/**
* Prepare the structures for scheduling. These get modified in place by the scheduler.
*
* @param order is the highest order of any job that can potentially get scheduled.
* We need this to ensure the tables have sufficient space and we don't get NPEs.
*/
void reset(int order)
{
String methodName = "reset";
//
// TODO: Not all of these are used in every reset cycle. Maybe we should break up the
// reset code so it matches the cycles better. otoh, this isn't a performance-intensive
// scheduler so do we care?
//
updateMaxOrder(order);
logger.info(methodName, null, "Nodepool:", id, "Maxorder set to", getMaxOrder());
nSharesByOrder = makeArray();
nMachinesByOrder = makeArray();
vMachinesByOrder = makeArray();
//nFreeSharesByOrder = new int[maxorder + 1];
//neededByOrder = new int[maxorder + 1];
nPendingByOrder = makeArray();
// UIMA-4142 Must set vMachinesByOrder and virtualMachinesByOrder independently of
// machinesByOrder because blacklisting can cause v_order != r_order
// during reset.
// UIMA-4910 Ignore unusable machines
virtualMachinesByOrder.clear();
for ( Machine m : allMachines.values() ) {
if ( !isSchedulable(m) ) {
continue; // Ignore unusable machines
}
m.resetVirtualShareOrder();
int v_order = m.getVirtualShareOrder();
int r_order = m.getShareOrder();
Map<Node, Machine> ml = null;
if ( v_order == r_order ) {
nMachinesByOrder[r_order]++;
} else {
vMachinesByOrder[v_order]++;
}
ml = virtualMachinesByOrder.get(v_order);
if ( ml == null ) {
ml = new HashMap<Node, Machine>();
virtualMachinesByOrder.put(v_order, ml);
}
ml.put(m.key(), m);
}
// UIMA 4142 this old calc isn't right any more because blacklisting can cause
// v_order != r_order during reset
// virtualMachinesByOrder = new HashMap<Integer, HashMap<Node, Machine>>();
// for ( Integer i : machinesByOrder.keySet() ) {
// @SuppressWarnings("unchecked")
// HashMap<Node, Machine> ml = (HashMap<Node, Machine>) machinesByOrder.get(i).clone();
// virtualMachinesByOrder.put(i, ml);
// nMachinesByOrder[i] = ml.size();
// }
calcNSharesByOrder();
for ( NodePool np : children.values() ) {
np.reset(order);
}
if ( (parent == null) && ( updated > 0 ) ) {
// top-level nodepool will recurse
logger.info(methodName, null, "Scheduling Tables:\n", toString());
updated = 0;
}
}
void resetPreemptables()
{
String methodName = "resetPreemptables";
logger.info(methodName, null, "Resetting preemptables in nodepool", id);
// UIMA-4064 Need to do this recursively
preemptables.clear();
for ( NodePool np : children.values() ) {
np.resetPreemptables();
}
}
/**
* Return the specified subpool, or me!
*/
NodePool getSubpool(String name)
{
if ( name.equals(id) ) {
return this;
}
for ( NodePool np : children.values() ) {
NodePool ret = np.getSubpool(name);
if (ret != null) return ret;
}
return null;
}
/**
* Do "I" contain the indicated nodepool? More accurately, can "I" access everything in the
* indicated nodepool?
*/
boolean containsSubpool(NodePool np)
{
if ( np == this ) {
return true;
}
for ( NodePool cnp : children.values() ) {
if (cnp.containsSubpool(np)) return true;
}
return false;
}
HashMap<String, NodePool> getChildren()
{
return children;
}
List<NodePool> getChildrenAscending()
{
ArrayList<NodePool> sorted = new ArrayList<NodePool>();
if ( children.size() > 0 ) {
sorted.addAll(children.values());
Collections.sort(sorted, new NodepoolAscendingSorter());
}
return sorted;
}
List<NodePool> getChildrenDescending()
{
ArrayList<NodePool> sorted = new ArrayList<NodePool>();
if ( children.size() > 0 ) {
sorted.addAll(children.values());
Collections.sort(sorted, new NodepoolDescendingSorter());
}
return sorted;
}
/**
* Subpools are always associated with a classname.
*
* We can assume that all node updates are refused until all subpools are created
* so we don't have to worry about updating the pools until nodeArrives(), below.
*/
NodePool createSubpool(String className, Map<String, String> names, int order)
{
NodePool np = new NodePool(this, className, names, evictionPolicy, depth + 1, order, share_quantum);
children.put(className, np);
return np;
}
// private synchronized void incrementOnlineByOrder(int order)
// {
// if ( ! onlineMachinesByOrder.containsKey(order) ) {
// onlineMachinesByOrder.put(order, 1);
// } else {
// onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) + 1);
// }
// }
// private synchronized void decrementOnlineByOrder(int order)
// {
// onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) - 1);
// }
// synchronized void getLocalOnlineByOrder(int[] ret) // for queries, just me
// {
// for ( int o: onlineMachinesByOrder.keySet() ) {
// ret[o] += onlineMachinesByOrder.get(o);
// }
// }
// synchronized void getOnlineByOrder(int[] ret) // for queries
// {
// for ( int o: onlineMachinesByOrder.keySet() ) {
// ret[o] += onlineMachinesByOrder.get(o);
// }
// for ( NodePool child : children.values() ) {
// child.getOnlineByOrder(ret);
// }
// }
void signalDb(Machine m, RmNodes key, Object value)
{
String methodName = "signalDb";
try {
persistence.setNodeProperty(m.getNode().getNodeIdentity().getName(), key, value);
} catch (Exception e) {
logger.warn(methodName, null, "Cannot update DB property", key, "for machine", m);
}
}
Map<RmNodes, Object> initDbProperties(Machine m)
{
Node n = m.getNode();
NodeIdentity nid = n.getNodeIdentity();
Map<RmNodes, Object> props = new HashMap<RmNodes, Object>();
props.put(RmNodes.Name, nid.getName());
props.put(RmNodes.Ip, nid.getIp());
props.put(RmNodes.Nodepool, id);
props.put(RmNodes.Quantum, share_quantum / ( 1024*1024));
props.put(RmNodes.Memory , m.getMemory() / (1024*1024));
props.put(RmNodes.ShareOrder , m.getShareOrder());
props.put(RmNodes.Blacklisted , m.isBlacklisted());
// init these here, but must be maintained by machine
props.put(RmNodes.Heartbeats , 0);
props.put(RmNodes.SharesLeft , m.countFreeShares()); // qshares remaining
props.put(RmNodes.Assignments , m.countProcesses()); // processes
props.put(RmNodes.NPAssignments, m.countNpShares());
props.put(RmNodes.Reservable , canReserve);
StringBuffer buf = new StringBuffer();
for ( ResourceClass cl : allClasses.keySet() ) {
buf.append(cl.getName());
buf.append(" ");
}
props.put(RmNodes.Classes, buf.toString());
return props;
}
void adjustMachinesByOrder(int neworder, Machine m)
{
int oldorder = m.getShareOrder();
if ( oldorder != neworder ) { // can change. e.g. if it was taken offline for
HashMap<Node, Machine> mlist = machinesByOrder.get(oldorder);
mlist.remove(m.key());
m.setShareOrder(neworder); // hardware changes.
signalDb(m, RmNodes.ShareOrder, neworder); // Jira 4913 Update DB so ducc-mon can show the current size
mlist = machinesByOrder.get(neworder);
if ( mlist == null ) {
mlist = new HashMap<Node, Machine>();
machinesByOrder.put(neworder, mlist);
}
mlist.put(m.key(), m);
}
}
/**
* Handle a new node update.
*/
Machine nodeArrives(Node node, int order)
{
String methodName = "nodeArrives";
// Note: the caller of this method MUST (aka IS REQUIRED) ensure that this is the
// right nodepool as we do not recurse.
updateMaxOrder(order);
String n = node.getNodeIdentity().getName();
// if it's offline it can't be restored like this.
if ( offlineMachines.containsKey(node) ) {
Machine m = offlineMachines.get(node);
logger.trace(methodName, null, "Node ", m.getId(), " is offline, not activating.");
return m;
}
// logger.info(methodName, null, "NODEARRIVES", n, "pass offline Machines");
// if it was dead, then it isn't any more, AND it's mine, so I need to restart it
if ( unresponsiveMachines.containsKey(node) ) { // reactivate the node
logger.info(methodName, null, "RECOVER NODE", n);
Machine m = unresponsiveMachines.remove(node); // not unresponsive any more
// Deal with memory on the machine changing
adjustMachinesByOrder(order, m);
// Note: The machine must be on all the other lists by definition since it wasn't taken off when it went offline
signalDb(m, RmNodes.Responsive, true);
logger.info(methodName, null, "Nodepool:", id, "Host reactivated ", m.getId(), "shares", order, m.toString());
return m;
}
// logger.info(methodName, null, "NODEARRIVES", n, "pass unresponsive Machines");
// ok, is it my problem? If so, then it isn't offline or dead, so it's ok, and we're done
if ( allMachines.containsKey(node) ) { // already known, do nothing
Machine m = allMachines.get(node);
// Deal with memory on the machine changing
adjustMachinesByOrder(order, m);
logger.trace(methodName, null, "Node ", m.getId(), " is already known, not adding.");
return m;
}
// logger.info(methodName, null, "NODEARRIVES", n, "pass allMachines");
// If we fall through it's a new one.
Machine machine = new Machine(node); // brand new machine, make it active
Node key = machine.key();
machine.setShareOrder(order);
allMachines.put(key, machine); // global list
machinesByName.put(machine.getId(), machine);
machinesByIp.put(machine.getIp(), machine);
//incrementOnlineByOrder(order);
machine.setNodepool(this);
total_shares += order;
// index it by its share order to make it easier to find
HashMap<Node, Machine> mlist = machinesByOrder.get(order);
if ( mlist == null ) {
mlist = new HashMap<Node, Machine>();
machinesByOrder.put(order, mlist);
}
mlist.put(key, machine);
logger.info(methodName, null, "Nodepool:", id, "Host added:", id, ": ", machine.getId(), "Nodefile:", subpoolNames.get(machine.getId()), // UIMA-4142, add file nodefile
String.format("shares %2d total %4d:", order, total_shares), machine.toString());
updated++;
Map<RmNodes, Object> props = initDbProperties(allMachines.get(key));
props.put(RmNodes.Responsive, true);
props.put(RmNodes.Online, true);
try {
persistence.createMachine(machine.getId(), props);
} catch (Exception e) {
logger.warn(methodName, null, "Cannot write machine to DB:", machine.getId(), e);
}
return machine;
}
/**
* Purge all or some of the work on a machine that has died, or been taken offline
*
* @param m node being removed
* @param removeAll true if all work is to be purged, otherwise just the preemptable work
*
* Ignore unmanaged reservations as they have no ducc-managed work
* Purge just fair-share/preemptable work if being varied off, or all work if the node has died (UIMA-4752)
*/
void disable(Machine m, boolean removeAll)
{
String methodName = "disable";
logger.info(methodName, null, "Nodepool:", id, "Host disabled:", m.getId(), "Looking for shares to clear");
String eventType = removeAll ? "Host dead:" : "Host offline:";
HashMap<Share, Share> shares = m.getActiveShares();
for (Share s : shares.values()) {
IRmJob j = s.getJob();
if ( j.getDuccType() == DuccType.Reservation ) {
logger.info(methodName, null, "Nodepool:", id, eventType, m.getId(), "Not purging", j.getDuccType());
continue;
}
if ( removeAll || j.getSchedulingPolicy() == Policy.FAIR_SHARE ) {
// NOTE: Currently will never get a Pop ... just a Service of type Other !!
logger.info(methodName, j.getId(), "Nodepool:", id, eventType, j.getDuccType(), "purge:", m.getId());
if (j.getDuccType() == DuccType.Service || j.getDuccType() == DuccType.Pop) {
j.markComplete(); // UIMA-4327 Must avoid reallocation, these guys are toast if they get purged.
logger.info(methodName, j.getId(), "Nodepool:", id, eventType, m.getId(), "Mark service/pop completed.");
}
j.shrinkByOne(s); // De-allocate this share
s.purge(); // This bit tells OR not to wait for confirmation from the agent
int order = s.getShareOrder();
nPendingByOrder[order]++;
} else {
logger.info(methodName, j.getId(), "Nodepool:", id, eventType, m.getId(), "Not purging NP work - ", j.getDuccType());
}
}
}
void nodeLeaves(Machine m)
{
// note, simpler than varyoff because we really don't care about unusual
// conditions since there's nobody to tell
if ( allMachines.containsKey(m.key()) ) {
disable(m, true); // Remove all work
unresponsiveMachines.put(m.key(), m);
signalDb(m, RmNodes.Responsive, false);
} else {
for ( NodePool np : children.values() ) {
np.nodeLeaves(m);
}
}
}
// UIMA-4142
// helper for CLI things that refer to things by name only. do we know about anything by this
// name? see resolve() in Scheduler.java.
boolean hasNode(String n)
{
return machinesByName.containsKey(n);
}
NodePool findNodepoolByNodename(String n)
{
if ( hasNode(n) ) {
return this;
} else {
for ( NodePool np : children.values() ) {
NodePool ret = np.findNodepoolByNodename(n);
if ( ret != null ) {
return ret;
}
}
}
return null;
}
private String doVaryOff(String node)
{
// caller must ensure node is known to "me"
Machine m = machinesByName.get(node);
if (offlineMachines.containsKey(m.key()) ) {
return "VaryOff: Nodepool " + id + " - Already offline: " + node;
}
if ( unresponsiveMachines.containsKey(m.key()) ) {
// let's be friendly and tell the caller it's also unresponsive
offlineMachines.put(m.key(), m);
signalDb(m, RmNodes.Online, false);
return "VaryOff: Nodepool " + id + " - Unresponsive machine, marked offline: " + node;
}
offlineMachines.put(m.key(), m);
disable(m, false); // Remove just pre-emptable work
signalDb(m, RmNodes.Online, false);
return "VaryOff: " + node + " - OK.";
}
String varyoff(String node)
{
// note, vaguely trickier than 'nodeLeaves' because we need to catch the
// potential user confusions and reflect them back.
NodePool np = findNodepoolByNodename(node);
if ( np == null ) {
return "VaryOff: Nodepool " + id + " - Cannot find machine: " + node;
}
// note we only call this if we know for sure the node can be found and associated with a NP
return np.doVaryOff(node); // must direct to the correct context
}
private String doVaryOn(String node)
{
// caller must ensure node is known to "me"
Machine m = machinesByName.get(node);
Node key = m.key();
if ( ! offlineMachines.containsKey(key) ) {
return "VaryOn: Nodepool " + id + " - Already online: " + m.getId();
}
offlineMachines.remove(key);
signalDb(m, RmNodes.Online, true);
return "VaryOn: Nodepool " + id + " - Machine marked online: " + node;
}
/**
* We're going to just take it off the offline list and if it happens to come back, fine, it will get picked up
* in nodeArrives as a new machine.
*/
String varyon(String node)
{
NodePool np = findNodepoolByNodename(node);
if ( np == null ) {
return "VaryOff: Nodepool " + id + " - Cannot find machine: " + node;
}
return np.doVaryOn(node); // must pass to the right nodepool, can't do it "here"
}
boolean isSchedulable(Machine m)
{
if ( m.isBlacklisted() ) return false;
if ( unresponsiveMachines.containsKey(m.key()) ) return false;
if ( offlineMachines.containsKey(m.key()) ) return false;
return true;
}
/**
* ------------------------------------------------------------------------------------------
* Routines used during the counting phase
* ------------------------------------------------------------------------------------------
*/
/**
* A quick check to see if there are any machines of the right size. We make a more
* comprehensive check to see if they're usable in countFreeableMachines later. We do this
* so we can try to return an accurate reason for deferral.
*/
int countReservables(IRmJob j)
{
int order = j.getShareOrder();
if ( ! machinesByOrder.containsKey(order) ) return 0;
return machinesByOrder.get(order).size();
}
/**
* Count total physical machines that could accommodate a 'fixed' request that the job
* will fit in.
*/
int countFixable(IRmJob j)
{
int order = j.getShareOrder();
int ret = 0;
for ( int i = order; i <= getMaxOrder(); i++ ) { // every order the job fits in
if ( machinesByOrder.containsKey(i) ) {
ret += machinesByOrder.get(i).size();
}
}
return ret;
}
/**
* Adjust counts for something that takes full machines, like a reservation.
* If "enforce" is set the machine order must match, otherwise we just do best effort to match.
*
* This is intended for use by reservations only; as such it does NOT recurse into child nodepools.
*
* We save some trouble for later by remembering which machines we counted - we wouldn't be
* counting them if we didn't know FOR SURE at this point that we need them.
* Sort on least eviction cost to get the cheapest set of preemptables.
*
* @return number of machines given; also updates the table of preemptables
*/
int countFreeableMachines(IRmJob j, int needed)
{
String methodName = "countFreeableMachines";
logger.info(methodName, j.getId(), "Enter nodepool", id, "preemptables.size() =", preemptables.size());
int order = j.getShareOrder();
ArrayList<Machine> machs = new ArrayList<Machine>();
if ( machinesByOrder.containsKey(order) ) {
machs.addAll(machinesByOrder.get(order).values()); // candidates
} else {
return 0; // no candidates
}
StringBuffer sb = new StringBuffer("Machines to search:");
for ( Machine m : machs ) {
sb.append(" ");
sb.append(m.getId());
}
logger.info(methodName, j.getId(), sb.toString());
Collections.sort(machs, new MachineByAscendingEvictionCostSorter());
int given = 0; // total to give, free or freeable
Iterator<Machine> iter = machs.iterator();
ArrayList<Machine> pables = new ArrayList<Machine>();
while ( iter.hasNext() && (given < needed) ) {
Machine m = iter.next();
logger.info(methodName, j.getId(), "Examining", m.getId());
if ( !isSchedulable(m) ) {
logger.info(methodName, j.getId(), "Bypass because machine", m.getId(), "is offline or unresponsive or blacklisted");
continue;
}
if ( preemptables.containsKey(m.key()) ) { // already counted, don't count twice
logger.info(methodName, j.getId(), "Bypass because machine", m.getId(), "already counted.");
continue;
}
if ( m.isFree() ) {
logger.info(methodName, j.getId(), "Giving", m.getId(), "because it is free");
given++;
continue;
}
if ( m.isFreeable() ) {
logger.info(methodName, j.getId(), "Giving", m.getId(), "because it is freeable");
given++;
pables.add(m);
} else {
logger.info(methodName, j.getId(), "Bypass because machine", m.getId(), "is not freeable");
}
}
// Remember how many full machines we need to free up when we get to preemption stage.
for ( Machine m : pables ) {
logger.info(methodName, j.getId(), "Setting up", m.getId(), "for eviction");
preemptables.put(m.key(), m);
nMachinesByOrder[m.getShareOrder()]--;
}
calcNSharesByOrder();
return given;
}
/**
* @param nrequested is number of N shares to remove
* @param order is the order that is affected
*/
int countOutNSharesByOrder(int order, int nrequested)
{
int given = 0; // track count given, for recursion
int rem = 0;
int low = order;
while ( (given < nrequested ) && ( low <= getMaxOrder() ) ) {
int avail = vMachinesByOrder[low] + nMachinesByOrder[low];
if ( avail > 0 ) {
if (vMachinesByOrder[low] > 0 ) {
vMachinesByOrder[low]--;
} else {
nMachinesByOrder[low]--;
}
given++;
rem = low - order;
if ( rem > 0 ) {
vMachinesByOrder[rem]++;
low = Math.max(rem, order);
}
} else {
low++;
}
}
// oops, I can't do this myself, make a child do it.
int k = nrequested - given; // the number of shares we need
if ( k > 0 ) {
Iterator<NodePool> iter = children.values().iterator();
while ( iter.hasNext() && ( k > 0 ) ) {
NodePool np = iter.next();
given += np.countOutNSharesByOrder(order, k);
k = nrequested - given;
}
}
calcNSharesByOrder();
return given;
}
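// Traced example (using the machine mix from the counting-array comments:
// nMachinesByOrder = [ 0 2 0 1 4 ], no partial machines), with order=2 and
// nrequested=3:
//
//   low=2: nothing free; low=3: take the 48G machine, given=1,
//          remainder 3-2=1 -> vMachinesByOrder[1]++, low=max(1,2)=2
//   low=2,3: nothing free; low=4: take a 64G machine, given=2,
//          remainder 4-2=2 -> vMachinesByOrder[2]++, low=2
//   low=2: take the order-2 remainder just created, given=3 - done.
//
// Net effect: one 48G and one 64G machine are consumed and an unused order-1
// fragment is left on the books.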
/********************************************************************************************
*
* Routines used in the 'what-of' phase.
*
* All the counting is done - we have to reset all the counts before starting to call any of these.
*
*******************************************************************************************/
/**
* We need to make enough space for 'cnt' full machines.
*
* Returns number of machines that are freeable, up to 'needed', or 0, if we can't get enough.
* If we return 0, we must defer the reservation.
*/
protected int setupPreemptions(int needed, int order)
{
String methodName = "setupPreemptions";
int given = 0;
Iterator<Machine> iter = preemptables.values().iterator();
while ( iter.hasNext() && (given < needed) ) {
Machine m = iter.next();
int o = m.getShareOrder();
if ( order != o ) {
continue;
}
logger.info(methodName, null, "Clearing", m.getId(), "from preemptable list for reservations.");
HashMap<Share, Share> shares = m.getActiveShares();
for ( Share s : shares.values() ) {
if ( s.isPreemptable() ) {
IRmJob j = s.getJob();
j.shrinkByOne(s);
nPendingByOrder[order]++;
} else {
// if the share was evicted or purged we don't care. otherwise, it SHOULD be evictable so we
// log its state to try to figure out why it didn't evict
if ( ! (s.isEvicted() || s.isPurged() ) ) {
IRmJob j = s.getJob();
logger.warn(methodName, j.getId(), "Found non-preemptable share", s.getId(), "fixed:", s.isFixed(),
"j.NShares", j.countNShares(), "j.NSharesGiven", j.countNSharesGiven());
}
}
}
given++;
iter.remove();
}
return given;
}
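// A rough sketch of the reservation path these preemptions serve (method
// ordering only; the actual calls are spread across scheduling cycles):
//
//   countFreeableMachines(j, n);   // counting phase: pick the cheapest machines,
//                                  // remember them in 'preemptables'
//   findMachines(j, rc);           // "what-of" phase: calls setupPreemptions()
//                                  // to evict preemptable shares on those machines
//   connectShare(s, m, j, order);  // once a machine drains, a later cycle
//                                  // assigns it to the reservation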
/**
* Here we have to dig around and find either fully free machines, or machines that we
* can preempt to fully free it.
*/
void findMachines(IRmJob job, ResourceClass rc)
{
String methodName = "findMachines";
int order = job.getShareOrder();
int counted = job.countNSharesGiven(); // allotment from the counter
int current = job.countNShares(); // currently allocated, plus pending, less those removed by earlier preemption
int needed = (counted - current);
logger.info(methodName, job.getId(), "counted", counted, "current", current, "needed", needed, "order", order);
if ( needed <= 0 ) return;
int cnt = countFreeMachines(order);
if ( cnt < needed ) {
// Get the preemptions started
logger.info(methodName, job.getId(), "Setup preemptions. Have", cnt, "free machines, needed", needed);
setupPreemptions(needed-cnt, order);
}
// something awful happened if we throw here.
if ( ! machinesByOrder.containsKey(order) ) { // hosed if this happens
throw new SchedInternalError(job.getId(), "Scheduling counts are wrong - machinesByOrder does not match nMachinesByOrder");
}
// Since all are the same size and only empty ones are considered, no need to sort
//machs = sortedForReservation(machinesByOrder.get(order));
for ( Machine mm : machinesByOrder.get(order).values() ) {
if ( isSchedulable(mm) && mm.isFree() ) {
Share s = new Share(mm, job, mm.getShareOrder());
s.setFixed();
connectShare(s, mm, job, mm.getShareOrder());
if ( --needed == 0 ) break;
}
}
}
/**
* All the jobs passed in here are assigned to this nodepool. In the case of the global nodepool
* they're assigned implicitly; all others are assigned explicitly.
*
* The only tricky bit here is that there may be reservations waiting for us to clear out a full
* machine, in which case we *might* have to over-preempt. The doAdditions code is expected to
* notice this and compensate by adding new allocations on other nodes. Counting is expected
* to guarantee that other nodes will exist, but it may take a preemption cycle or two to clear them.
*
* For the most part this should be pretty stable though.
*/
/**
* Shares come in sorted by largest first. We iterate looking for a combination of shares that
* leaves space of size 'order' free on the machine, using the fewest number of shares evacuated.
*/
ArrayList<Share> evacuateLargest(int order, ArrayList<Share> shares)
{
int found_order = 0;
// special case to avoid running off the end of the list
if ( shares.size() == 1 ) { // terminate recursion at last share
// Need to recheck if it's preemptable - if the job has had other preemptions then
// we need to avoid over-preemption.
Share s = shares.get(0);
if ( s.isPreemptable() && (s.getShareOrder() == order) ) {
return shares; // with success
}
return null; // or with failure
}
ArrayList<Share> slist = new ArrayList<Share>();
for ( Share s : shares ) {
found_order = s.getShareOrder();
if ( s.isPreemptable() && (found_order == order) ) { // exact match, end recursion
slist.add(s);
return slist;
}
int new_order = order - found_order; // now looking for the next order, after removing the size of what we just found
@SuppressWarnings("unchecked")
ArrayList<Share> new_shares = (ArrayList<Share>) shares.clone(); // ... and after removing the share we just found without destroying
new_shares.remove(0); // the incoming list
ArrayList<Share> found_shares = evacuateLargest(new_order, new_shares);
if ( s.isPreemptable() && (found_shares != null) ) { // ... else we just advance to the next and try the search again
slist.add(s); // making progress, end recursion
slist.addAll(found_shares);
return slist;
}
}
return null; // found nothing, heck
}
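// Traced example (illustrative): shares arrive sorted largest-first, say orders
// [ 2, 2, 1 ] with order=3 wanted. The first order-2 share is not an exact match,
// so we recurse looking for order 3-2=1 in [ 2, 1 ]; that finds the order-1 share
// exactly. Result: the first order-2 share plus the order-1 share - two evictions
// freeing exactly three quantum shares.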
private void doEvictions(int[] neededByOrder, HashMap<Integer, HashMap<IRmJob, IRmJob>> candidates, boolean force)
{
for ( int nbo = getMaxOrder(); nbo > 0; nbo-- ) {
if ( neededByOrder[nbo] == 0 ) { // these are N-shares
continue;
}
for ( int oo = getMaxOrder(); oo > 0; oo-- ) {
HashMap<IRmJob, IRmJob> jobs = candidates.get(oo);
if ( jobs == null ) {
continue;
}
Iterator<IRmJob> iter = jobs.values().iterator(); // he has something to give. is it enough?
while ( iter.hasNext() && (neededByOrder[nbo] > 0) ) {
IRmJob j = iter.next();
int loss = 0;
switch ( evictionPolicy ) {
case SHRINK_BY_MACHINE:
// minimize fragmentation
loss = j.shrinkByOrderByMachine(neededByOrder[nbo], nbo, force, this); // pass in number of N-shares of given order that we want
// returns number of quantum shares it had to relinquish
break;
case SHRINK_BY_INVESTMENT:
// minimize lost work
loss = j.shrinkByInvestment(neededByOrder[nbo], nbo, force, this); // pass in number of N-shares of given order that we want
// returns number of quantum shares it had to relinquish
break;
}
neededByOrder[nbo] -= loss;
neededByOrder[0] -= loss;
nPendingByOrder[oo] += loss;
if ( j.countNShares() == 0 ) { // nothing left? don't look here any more
iter.remove();
}
}
}
}
}
/**
* Here we tell the NP how much we need cleared up. It will look around and try to do that.
* @deprecated No longer used, the doEvictions code in NodepoolScheduler handles evictions by itself.
* Keeping this for a while for reference. UIMA-4275
*/
void doEvictionsByMachine(int [] neededByOrder, boolean force)
{
String methodName = "doEvictions";
//
// Collect losers that are also squatters, by order, and try them first
//
String type;
type = force ? "forced" : "natural";
logger.debug(methodName, null, getId(), "NeededByOrder", type, "on entrance eviction", Arrays.toString(neededByOrder));
for ( NodePool np : getChildrenDescending() ) {
logger.info(methodName, null, "Recurse to", np.getId(), "from", getId(), "force:", force);
np.doEvictionsByMachine(neededByOrder, force);
logger.info(methodName, null, "Recurse from", np.getId(), "proceed with logic for", getId(), "force", force);
}
//
// Adjust neededByOrder to reflect the number of shares that need to be preempted by subtracting the
// number of shares that already are free
//
for ( int nbo = getMaxOrder(); nbo > 0; nbo-- ) {
// UIMA-4065 - I think that subtracting countPendingSharesByOrder() amounts to double counting because it
// will reflect any evictions from the depth-first recursion. Instead, we would subtract only
// our own shares.
//
// int needed = Math.max(0, neededByOrder[nbo] - countNSharesByOrder(nbo) - countPendingSharesByOrder(nbo));
int needed = Math.max(0, neededByOrder[nbo] - countNSharesByOrder(nbo) - nPendingByOrder[nbo]);
neededByOrder[nbo] = needed;
neededByOrder[0] += needed;
}
logger.debug(methodName, null, getId(), "NeededByOrder", type, "after adjustments for pending eviction:", Arrays.toString(neededByOrder));
HashMap<Integer, HashMap<IRmJob, IRmJob>> squatters = new HashMap<Integer, HashMap<IRmJob, IRmJob>>();
HashMap<Integer, HashMap<IRmJob, IRmJob>> residents = new HashMap<Integer, HashMap<IRmJob, IRmJob>>();
for ( Share s : allShares.values() ) {
HashMap<Integer, HashMap<IRmJob, IRmJob>> map = null;
boolean is_candidate = force ? s.isForceable() : s.isPreemptable();
if ( is_candidate ) {
IRmJob j = s.getJob();
ResourceClass rc = j.getResourceClass();
if ( rc.getNodepoolName().equals(id) ) {
map = residents;
} else {
map = squatters;
}
int order = j.getShareOrder();
HashMap<IRmJob, IRmJob> jmap = null;
if ( map.containsKey(order) ) {
jmap = map.get(order);
} else {
jmap = new HashMap<IRmJob, IRmJob>();
map.put(order, jmap);
}
jmap.put(j, j);
}
}
doEvictions(neededByOrder, squatters, force);
logger.debug(methodName, null, getId(), "NeededByOrder", type, "after eviction of squatters:", Arrays.toString(neededByOrder));
if ( neededByOrder[0] <= 0 ) {
return;
}
doEvictions(neededByOrder, residents, force);
logger.debug(methodName, null, getId(), "NeededByOrder", type, "after eviction of residents:", Arrays.toString(neededByOrder));
}
// For FIXED: find shares with caps, disallow vertical stacking. UIMA-4712
int findSharesHorizontal( IRmJob j )
{
return findShares(j, true, false);
}
// For FIXED: find shares with caps, do allow vertical stacking. UIMA-4712
int findSharesVertical( IRmJob j )
{
return findShares(j, true, true);
}
// For FAIR_SHARE: find shares, caller controls caps, allow vertical stacking. UIMA-4712
int findShares( IRmJob j, boolean honorCaps )
{
return findShares(j, honorCaps, true);
}
int findShares( IRmJob j, boolean honorCaps, boolean allowVertical ) // UIMA-4712, allowVertical
{
String methodName = "findShares";
int counted = j.countNSharesGiven(); // allotment from the counter
int current = j.countNShares(); // currently allocated, plus pending, less those removed by earlier preemption
int needed = (counted - current);
int order = j.getShareOrder();
int given = 0;
boolean expansionStopped = false; // UIMA-4275
logger.debug(methodName, j.getId(), "counted", counted, "current", current, "needed", needed, "order", order, "given", given);
if ( needed > 0 ) {
whatof: {
for ( int i = order; i < getArraySize(); i++ ) {
if ( nSharesByOrder[i] == 0 ) {
continue; // nothing here to give
}
Map<Node, Machine> machs = getVirtualMachinesByOrder(i);
ArrayList<Machine> ml = new ArrayList<Machine>();
ml.addAll(machs.values());
for ( Machine m : ml ) { // look for space
if ( !isSchedulable(m) ) continue; // nope
if ( (!allowVertical) && (m.hasVerticalConflict(j)) ) continue; // UIMA-4712
int g = Math.min(needed, m.countFreeShares(order)); // adjust by the order supported on the machine
for ( int ndx= 0; ndx < g; ndx++ ) {
if ( honorCaps && j.exceedsFairShareCap() ) { // UIMA-4275
// can't take any more shares, probably because of caps
expansionStopped = true;
break whatof;
} else {
Share s = new Share(m, j, order);
connectShare(s, m, j, order);
logger.info(methodName, j.getId(), "Connecting new share", s.toString());
//j.assignShare(s);
//m.assignShare(s);
//rearrangeVirtual(m, order);
//allShares.put(s, s);
}
}
given += g;
needed -= g;
if ( needed == 0 ) {
break whatof;
}
}
}
}
//calcNSharesByOrder();
}
if ( (needed > 0) && ( !expansionStopped ) && ( j.getSchedulingPolicy() == Policy.FAIR_SHARE) ) { // UIMA-4275
for ( NodePool np : getChildrenAscending() ) {
StringBuffer sb = new StringBuffer();
for ( NodePool sp : getChildrenAscending() ) {
sb.append(sp.getId());
sb.append(" ");
}
logger.info(methodName, null, np.getId(), "Doing expansions in this order:", sb.toString());
int g = np.findShares(j, honorCaps, allowVertical);
given += g;
needed -= g;
if ( needed == 0 ) {
break;
}
}
}
return given;
}
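// Sketch of the search (illustrative): for an order-2 job needing three more
// shares, the loop scans free orders 2, 3, 4, ... and on each schedulable
// machine of that order gives up to m.countFreeShares(2) shares, stopping
// early if the job hits its fair-share cap (honorCaps) or has a vertical
// stacking conflict (allowVertical == false). Only FAIR_SHARE jobs are
// allowed to spill into child nodepools at the bottom of the method.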
/**
* Bop through the jobs, and if their current counts exceed their current assignment, find
* something to give them.
*
* It's possible that a job had evictions as a result of clearing space for a reservation -
* so we need to check the count of allocated shares, the count of pending removals, and the
* current share count assignment.
*/
HashMap<IRmJob, IRmJob> doExpansion(List<IRmJob> jobs)
{
String methodName = "doExpansion";
HashMap<IRmJob, IRmJob> expansions = new HashMap<IRmJob, IRmJob>();
StringBuffer sb = new StringBuffer();
sb.append("NP: ");
sb.append(getId());
sb.append(" Expansions in this order: ");
for ( IRmJob j : jobs ) {
if ( j.isCompleted() ) continue; // deal with races while job is completing
j.undefer();
sb.append(j.getId());
sb.append(":");
if ( findShares(j, false) > 0 ) { // always fair-share, so don't do caps yet. UIMA-4712
sb.append("found ");
expansions.put(j, j);
} else {
sb.append("notfound ");
}
if ( j.countNShares() == 0 ) j.setReason("Waiting for preemptions.");
}
logger.info(methodName, null, sb.toString());
return expansions;
}
/**
* This prints garbage unless you call reset() first.
*/
public String toString()
{
StringBuffer sb = new StringBuffer();
sb.append("--------------------------------------------------------------------------------\n");
sb.append("Nodepool ");
sb.append(id);
sb.append(" depth ");
sb.append(depth);
sb.append(": ");
sb.append("\n");
//
// Print the key tables. First the header ...
//
int len = nMachinesByOrder.length;
StringBuffer sbsb = new StringBuffer("%18s ");
for ( int i = 0; i < len; i++ ) {
sbsb.append("%4s ");
}
sbsb.append("\n");
String fmt = sbsb.toString();
Object[] vals = new Object[len + 2];
vals[0] = "Order";
for ( int i = 0; i < len; i++ ) {
vals[i+1] = Integer.toString(i);
}
sb.append(String.format(fmt, vals));
// Now nMachinesByOrder
sbsb = new StringBuffer("%18s ");
for ( int i = 0; i < len; i++ ) {
sbsb.append("%4d ");
}
sbsb.append("\n");
fmt = sbsb.toString();
vals[0] = "nMachinesByOrder";
int[] counts = countMachinesByOrder();
for ( int i = 0; i < len; i++ ) {
vals[i+1] = counts[i];
}
sb.append(String.format(fmt, vals));
vals[0] = "vMachinesByOrder";
counts = countVMachinesByOrder();
for ( int i = 0; i < len; i++ ) {
vals[i+1] = counts[i];
}
sb.append(String.format(fmt, vals));
// Now nSharesByOrder
vals[0] = "nSharesByOrder";
counts = countAllNSharesByOrder();
for ( int i = 0; i < len; i++ ) {
vals[i+1] = counts[i];
}
sb.append(String.format(fmt, vals));
// Now nFreeSharesByOrder
// vals[0] = "nFreeSharesByOrder";
// counts = countFreeSharesByOrder();
// for ( int i = 0; i < len; i++ ) {
// vals[i+1] = counts[i];
// }
// sb.append(String.format(fmt, vals));
sb.append("--------------------------------------------------------------------------------\n");
for ( NodePool np: children.values () ) {
sb.append(np.toString());
}
return sb.toString();
}
public void queryMachines()
{
String methodName = "queryMachines";
ArrayList<Machine> machines = new ArrayList<Machine>();
machines.addAll(getAllMachines().values());
logger.info(methodName, null, "================================== Query Machines Nodepool:", id, "=========================");
StringBuffer buf = new StringBuffer();
buf.append(Machine.getHeader());
buf.append("\n");
buf.append(Machine.getDashes());
buf.append("\n");
Collections.sort(machines, new MachineByOrderSorter());
for ( Machine m : machines) {
buf.append(m.toString());
int remaining = m.countFreeShares();
if ( remaining > 0 ) {
buf.append("[" + m.countFreeShares() + "]");
}
buf.append("\n");
}
logger.info(methodName, null, "\n", buf.toString());
logger.info(methodName, null, "================================== End Query Machines Nodepool:", id, "======================");
}
//
// Order machines by INCREASING preemption cost (all free machines first, then those with the least eviction cost)
// Don't need to check for unschedulable or un-freeable machines as they will be ignored later.
//
class MachineByAscendingEvictionCostSorter implements Comparator<Machine> {
public int compare(Machine m1, Machine m2) {
if (m1.equals(m2))
return 0;
if (m1.isFree()) {
if (m2.isFree())
return 0;
else
return -1; // m2 not free, m1 to the front of the list
} else if (m2.isFree())
return 1; // m1 not free, m2 to the front of the list
// Sort the lowest eviction cost first
// Since totals are the same, most free shares ==> smallest eviction cost
switch (evictionPolicy) {
case SHRINK_BY_MACHINE :
return m2.countFreeShares() - m1.countFreeShares();
case SHRINK_BY_INVESTMENT :
return m1.getInvestment() - m2.getInvestment();
default:
return 0;
}
}
}
static private class NodepoolAscendingSorter
implements Comparator<NodePool>
{
public int compare(NodePool n1, NodePool n2)
{
return (n1.getSearchOrder() - n2.getSearchOrder());
}
}
static private class NodepoolDescendingSorter
implements Comparator<NodePool>
{
public int compare(NodePool n1, NodePool n2)
{
return (n2.getSearchOrder() - n1.getSearchOrder());
}
}
class InvestmentSorter
implements Comparator<Share>
{
public int compare(Share s1, Share s2)
{
return (int) (s1.getInvestment() - s2.getInvestment()); // lowest investment
// if we're not tracking investment we
// don't care anyway so this works fine
}
}
class DescendingShareOrderSorter
implements Comparator<Share>
{
public int compare(Share s1, Share s2)
{
return (int) (s2.getShareOrder() - s1.getShareOrder());
}
}
class MachineByOrderSorter
implements Comparator<Machine>
{
public int compare(Machine m1, Machine m2)
{
return m2.getShareOrder() - m1.getShareOrder();
}
}
class MachineByAscendingOrderSorter
implements Comparator<Machine>
{
public int compare(Machine m1, Machine m2)
{
return m1.getShareOrder() - m2.getShareOrder();
}
}
class GlobalOrder
{
int maxorder = 0;
GlobalOrder()
{
this.maxorder = 0;
}
synchronized void reset()
{
this.maxorder = 0;
}
synchronized void update(int order)
{
this.maxorder = Math.max(maxorder, order);
}
synchronized int getOrder()
{
return maxorder;
}
}
}