/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.rm.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
/**
* This is the initial implementation of IScheduler, a scheduler based on resource classes.
*/
public class NodepoolScheduler
implements IScheduler,
SchedConstants
{
DuccLogger logger = DuccLogger.getLogger(NodepoolScheduler.class, COMPONENT_NAME);
Map<ResourceClass, ResourceClass> resourceClasses;
Map<IRmJob, IRmJob> needyJobs = new TreeMap<IRmJob, IRmJob>(new JobByTimeSorter());
NodePool globalNodepool;
Object[] classes;
SchedulingUpdate schedulingUpdate;
EvictionPolicy evictionPolicy = EvictionPolicy.SHRINK_BY_MACHINE;
int fragmentationThreshold = 2;
boolean do_defragmentation = true;
boolean use_global_allotment = true;
int global_allotment = Integer.MAX_VALUE;
int scheduling_quantum;
NodepoolScheduler()
{
}
public void setClasses(Map<ResourceClass, ResourceClass> prclasses)
{
this.resourceClasses = prclasses;
// organize all the priority classes into lists by priority because we process all classes of the
// same priority in a group
//
// This version of the scheduler requires that classes at the same priority MUST be of the same type.
// i.e. we don't support mixed-share at this point.
HashMap<Integer, ArrayList<ResourceClass>> sorter = new HashMap<Integer, ArrayList<ResourceClass>>();
for ( ResourceClass pcl : prclasses.values() ) {
ArrayList<ResourceClass> cl = sorter.get(pcl.getPriority());
if ( cl == null ) {
cl = new ArrayList<ResourceClass>();
sorter.put(pcl.getPriority(), cl);
}
// make sure policies match
if (cl.size() > 0 ) {
ResourceClass prev = cl.get(0);
if ( prev.getPolicy() != pcl.getPolicy() ) {
throw new SchedulingException(null, "Scheduling policy must match for same-priority classes: class " +
prev.getName() + " : " + pcl.getName());
}
}
cl.add(pcl);
}
// now sort the keys
ArrayList<Integer> keys = new ArrayList<Integer>();
keys.addAll(sorter.keySet());
Collections.sort(keys); // "best" (lowest) priority first
// and finally create the "classes" array, ordered with the "best" (lowest-numbered) priority first;
// each element is the collection of classes at the same priority.
classes = new Object[sorter.size()];
int ndx = 0;
for ( Integer k : keys ) {
classes[ndx++] = sorter.get(k);
}
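// Illustrative example (hypothetical class names): with a class "urgent" at priority 2 and
// classes "normal" and "background" at priority 5, we end up with classes[0] = [urgent] and
// classes[1] = [normal, background]; the priority-2 group is processed first.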
fragmentationThreshold = SystemPropertyResolver.getIntProperty("ducc.rm.fragmentation.threshold", fragmentationThreshold);
do_defragmentation = SystemPropertyResolver.getBooleanProperty("ducc.rm.defragmentation", do_defragmentation);
use_global_allotment = SystemPropertyResolver.getBooleanProperty("ducc.rm.use_global_allotment", use_global_allotment);
global_allotment = SystemPropertyResolver.getIntProperty("ducc.rm.global_allotment", global_allotment);
}
/**
* Sets the top-level nodepool for this scheduler.
*/
public void setNodePool(NodePool np)
{
this.globalNodepool = np;
this.scheduling_quantum = np.getShareQuantum() >> 20; // to GB from KB
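// Illustrative: a share quantum of 15728640 KB (15 GB) yields scheduling_quantum = 15.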
}
public void setEvictionPolicy(EvictionPolicy ep)
{
this.evictionPolicy = ep;
}
/**
* Check the (global or class-based) allotment for the user of job j, assuming one
* additional process of j's share order would be allocated.
*/
boolean validSingleAllotment(IRmJob j)
{
String methodName = "validAllotment";
//
// Original design and implementation for UIMA-4275: class based. Subsequent discussion resulted in
// changing to a single global allotment. I'll leave the class-based allotment in for now because
// I suspect it will re-raise its ugly head. (jrc)
//
if ( use_global_allotment ) {
User u = j.getUser();
int lim = u.getOverrideLimit();
if ( lim < 0 ) {
lim = global_allotment;
}
int shares = u.countNPShares();
long sharesInGB = ((shares + j.getShareOrder()) * scheduling_quantum);
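// Illustrative (hypothetical numbers): with a 15 GB quantum, a user already holding 4
// non-preemptable qshares (60 GB) who requests one more process of order 2 (30 GB) gives
// sharesInGB = (4 + 2) * 15 = 90; if the limit is 80 GB the request is deferred.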
if ( sharesInGB > lim ) {
schedulingUpdate.defer(j, "Deferred because allotment of " + lim + "GB is exceeded by user " + j.getUserName());
logger.info(methodName, j.getId(), "Deferred because allotment of " + lim + "GB is exceeded by user " + j.getUserName());
return false;
}
} else {
ResourceClass rc = j.getResourceClass();
if ( rc.allotmentExceeded(j) ) {
schedulingUpdate.defer(j, "Deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
logger.info(methodName, j.getId(), "Deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
return false;
}
}
return true;
}
/**
* This returns the total number of PROCESSES we get to allocate for this job, including those
* already given.
*/
int getAllotmentForJob(IRmJob j)
{
String methodName = "getAllotmentForJob";
User u = j.getUser();
// Let the job ask for the world. This accounts for init cap, prediction, number usable, etc
int order = j.getShareOrder();
int wanted = j.getJobCap(); // in nshares
logger.info(methodName, j.getId(), "Job cap", nSharesToString(wanted, order));
// Find out how many qshares we're allowed
int allotment_in_gb = u.getOverrideLimit();
if ( allotment_in_gb < 0 ) {
allotment_in_gb = global_allotment;
}
int user_allotment = allotment_in_gb / scheduling_quantum; // qshares
// How many qshares this user already holds
int allocated = u.countNPShares();
logger.info(methodName, j.getId(), "Current NP allocation for user", allocated, "qshares", (allocated * scheduling_quantum), "GB",
"user_allotment", user_allotment, "user_allotment in GB", allotment_in_gb );
// This is how many QShares we get to allocate for the job
int additional = Math.max(0, user_allotment - allocated);
int additional_processes = additional / order;
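// Illustrative (hypothetical numbers): an allotment of 360 GB with a 15 GB quantum gives
// user_allotment = 24 qshares; if 10 qshares are already allocated and the job's share
// order is 2, then additional = 14 qshares and additional_processes = 7.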
logger.info(methodName, j.getId(), "Additional Nshares allowed for request:", nSharesToString(additional_processes, order));
// No shares, so we show deferred
if ( (additional_processes == 0) ) {
if (j.countNShares() == 0) {
// over allotment, never had anything, can't get anything, so it is deferred
schedulingUpdate.defer(j, "Deferred because allotment of " + allotment_in_gb + "GB is exceeded.");
logger.info(methodName, j.getId(), "Deferred because allotment of", allotment_in_gb, "GB is exceeded.");
} else {
logger.info(methodName, j.getId(), "Allotment of", allotment_in_gb, "GB caps request. Return with", allocated, "qshares allocated.");
}
return j.countNShares();
}
int allowed = j.countNShares() + additional_processes;
if ( allowed < wanted ) {
logger.info(methodName, j.getId(), "Capping job on allotment: ", allotment_in_gb + " GB. Remaining allowed nshares [",
allowed, "] wanted [", wanted, "]");
}
logger.info(methodName, j.getId(), "Allowed", nSharesToString(allowed, order), "wanted", nSharesToString(wanted, order));
return Math.min(wanted, allowed);
}
private void reworknShares(int[] vshares, int[] nshares)
{
// now redo nshares
int len = vshares.length;
System.arraycopy(vshares, 0, nshares, 0, len);
for ( int o = 1; o < len; o++ ) { // counting by share order
for ( int p = o+1; p < len; p++ ) {
if ( nshares[p] != 0 ) {
nshares[o] += (p / o) * nshares[p];
}
}
}
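// Illustrative (hypothetical numbers): with vshares = [0, 2, 1, 0, 1] (two free order-1
// slots, one order-2 slot, one order-4 slot) the result is nshares = [0, 8, 3, 1, 1],
// e.g. nshares[1] = 2 + 2*1 + 4*1 = 8 and nshares[2] = 1 + (4/2)*1 = 3.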
//System.out.println("vshares : " + fmtArray(vmachines));
//System.out.println("nshares : " + fmtArray(nshares));
}
/**
* @param vmach is a table showing the number of free virtual machines for each order.
* @param count is number of N shares to remove
* @param order is the order that is affected
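*
* Illustrative example (hypothetical numbers): removing count=3 N-shares of order=2 when
* vmach = [0,0,0,0,0,2] (two free order-5 machines) splits both machines and leaves
* vmach = [0,1,0,1,0,0], i.e. one order-1 and one order-3 fragment.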
*/
void doShareSplits(int[] vmach, long count, int order)
{
rsbo : {
for ( int o = order; o < vmach.length; o++ ) {
while ( vmach[o] > 0 ) {
int given = o / order;
int residual = o % order;
if ( count >= given ) { // we give it all away
count -= given;
if ( residual > 0 ) {
vmach[residual]++; // and maybe a leftover
}
} else { // can't give it all away
int leftover = o - ((int)count * order);
vmach[leftover]++;
count = 0;
}
vmach[o] --;
if ( count == 0 ) {
break rsbo;
}
}
}
}
}
/**
* @param vmach is a table showing the number of free virtual machines for each order.
* @param nshares is the table of N-shares by order; it is recomputed from vmach before returning.
* @param count is number of N shares to remove
* @param order is the order that is affected
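*
* Illustrative example (hypothetical numbers): removing count=1 N-share of order=3 when
* vmach = [0,0,0,0,0,0,1] (one free order-6 machine) leaves vmach = [0,0,0,1,0,0,0],
* i.e. an order-3 fragment, and nshares is recomputed from the updated vmach.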
*/
void removeSharesByOrder(int[] vmach, int[] nshares, long count, int order)
{
if ( count == 0 ) return; // shortcut so we don't have to keep checking in caller
//
// First handle machines whose order is an exact multiple of 'order'; these can be filled with at most one split.
//
for ( int f = 1; (f * order) < vmach.length; f++ ) {
int fo = f * order;
if ( vmach[fo] > 0 ) {
long available = vmach[fo] * f;
long given = Math.min(count, available);
int remaining = (int) (available - given);
vmach[fo] = (remaining * order) / fo;
int residual = (remaining * order) % fo;
if ( residual > 0 ) {
vmach[residual] ++;
}
count -= given;
}
if ( count == 0 ) {
break;
}
}
//
// Now we must do splits if we still need some.
//
if ( count > 0 ) {
doShareSplits(vmach, count, order);
}
reworknShares(vmach, nshares);
}
/**
* Create a string showing virtual and quantum shares, given virtual shares and the order.
* For use in debugging messages.
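* Example (illustrative): nSharesToString(3, 2) returns "3Q6", i.e. 3 N-shares of order 2
* occupying 6 quantum shares.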
*/
String nSharesToString(int shares, int order)
{
return String.format("%dQ%d", shares, (shares * order));
}
/**
* Create a string showing virtual and quantum shares, given quantum shares and the order.
* For use in debugging messages.
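* Example (illustrative): qSharesToString(6, 2) returns "3Q6"; with an order of 0 the
* N-share count is unknown and "-Q6" is returned.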
*/
String qSharesToString(int shares, int order)
{
String o = null;
if ( order == 0 ) {
o = "-";
} else {
o = (Integer.toString(shares/order));
}
return String.format("%sQ%d", o, shares);
}
// /**
// * Return the nodepool for a class, or the global nodepool if none is explicitly associated with the class.
// * @deprecated Remove as soon as it is verified correct.
// */
// NodePool getNodepool(ResourceClass rc)
// {
// String id = rc.getNodepoolName();
// if ( id == null ) {
// return globalNodepool;
// }
// return globalNodepool.getSubpool(id);
// }
// ==========================================================================================
// ==========================================================================================
// =========================== REWORKED CODE FOR FAIR SHARE ==================================
// ==========================================================================================
// ==========================================================================================
// /**
// * @deprecated - see rc.getMaxOrder();
// */
// int calculateMaxJobOrder(ArrayList<ResourceClass> rcs)
// {
// int max = 0;
// for ( ResourceClass rc: rcs ) {
// HashMap<Integer, HashMap<IRmJob, IRmJob>> jobs = rc.getAllJobsByOrder();
// for ( int i : jobs.keySet() ) {
// max= Math.max(max, i);
// }
// }
// return max;
// }
private String fmtArray(int[] array)
{
Object[] vals = new Object[array.length];
StringBuffer sb = new StringBuffer();
for ( int i = 0; i < array.length; i++ ) {
sb.append("%3s ");
vals[i] = Integer.toString(array[i]);
}
return String.format(sb.toString(), vals);
}
// UIMA-4275 Don't pass in total shares any more; it was used only for class caps, which work differently now.
protected void apportion_qshares(List<IEntity> entities, int[] vshares, String descr)
{
String methodName = "apportion_qshares";
boolean shares_given = false;
int maxorder = globalNodepool.getMaxOrder();
int[] nshares = globalNodepool.makeArray(); // nshares
if ( entities.size() == 0 ) return;
Collections.sort(entities, entities.get(0).getApportionmentSorter());
reworknShares(vshares, nshares);
ArrayList<IEntity> working = new ArrayList<IEntity>();
working.addAll(entities);
HashMap<IEntity, int[]> given_by_order = new HashMap<IEntity, int[]>();
HashMap<IEntity, Integer> deserved = new HashMap<IEntity, Integer>(); // qshares
// This section dealing with RmCounter writes the counting parameters to the log in a form
// that can be cut/pasted into a java properties file. This file can then be used in the
// RmCounter test/development application. It also turns out to be very useful in the log
// in general so it is promoted to 'info' level.
StringBuffer enames = null;
StringBuffer eweights = null;
logger.info(methodName, null, descr, "RmCounter Start");
logger.info(methodName, null, descr, "maxorder = ", globalNodepool.getMaxOrder());
enames = new StringBuffer();
eweights = new StringBuffer();
for ( IEntity e : working ) {
int[] gbo = globalNodepool.makeArray();
e.setGivenByOrder(gbo);
given_by_order.put(e, gbo);
deserved.put(e, 0);
// jrc e.initWantedByOrder();
enames.append(e.getName());
enames.append(" ");
eweights.append(Integer.toString(e.getShareWeight()));
eweights.append(" ");
}
logger.info(methodName, null, descr, "entity_names = ", enames.toString());
logger.info(methodName, null, descr, "weights = ", eweights.toString());
for ( IEntity e : working ) {
logger.info(methodName, null, descr, "wantedby." + e.getName() + " = ", fmtArray(e.getWantedByOrder()));
}
logger.info(methodName, null, descr, "vmachines =", fmtArray(vshares));
logger.info(methodName, null, descr, "RmCounter End");
int pass = 0;
do {
// Starting at highest order, give full fair share to any entity that wants it, minus the
// shares already given. Remove the newly given shares and trickle down the fragments.
logger.trace(methodName, null, descr, "----------------------- Pass", (pass++), "------------------------------");
logger.trace(methodName, null, descr, "vshares", fmtArray(vshares));
logger.trace(methodName, null, descr, "nshares", fmtArray(nshares));
shares_given = false;
HashMap<IEntity, Integer> given_per_round = new HashMap<IEntity, Integer>(); // qshares
int allweights = 0;
for ( IEntity e : working ) {
allweights += e.getShareWeight();
given_per_round.put(e, 0);
}
//
// work out deserved for everybody based on what's still around.
//
int all_qshares = nshares[1];
for ( IEntity e : working ) {
int base_fs = (int) Math.floor(nshares[1] * ( (double) e.getShareWeight() / allweights ));
double d_base_fs = nshares[1] * ( (double) e.getShareWeight() / allweights );
deserved.put(e, base_fs);
all_qshares -= base_fs;
logger.trace(methodName, null, descr, e.getName(), "Wanted :", fmtArray(e.getWantedByOrder()));
logger.trace(methodName, null, descr, e.getName(), "deserved:", base_fs, d_base_fs);
}
logger.trace(methodName, null, descr, "Leftover after giving deserved:" + all_qshares);
if ( all_qshares > 0 ) {
for ( IEntity e: working ) {
deserved.put(e, deserved.get(e) + 1);
all_qshares--;
if ( all_qshares == 0 ) break;
}
}
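// Illustrative (hypothetical numbers): two entities of equal weight with nshares[1] = 9
// each deserve floor(4.5) = 4 qshares; the one leftover qshare goes to the first entity
// in sort order, so deserved becomes 5 and 4.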
for ( IEntity e : working ) {
logger.trace(methodName, null, descr, String.format("Final deserved by %15s: int[%3d] (after bonus)", e.getName(), deserved.get(e)));
}
for ( int o = maxorder; o > 0; o--) {
int total_taken = 0; // nshares
if ( nshares[o] == 0 ) {
logger.trace(methodName, null, descr, "O " + o + " no shares to give, moving on.");
continue;
}
for ( IEntity e : working ) {
int[] wbo = e.getWantedByOrder(); // processes - NShares
int[] gbo = given_by_order.get(e); // NShares
if ( wbo[o] == 0 ) {
logger.trace(methodName, null, descr, "O", o, "Entity", e.getName(), "nothing wanted at this order, moving on.");
continue;
}
double dgiven = nshares[o] * ((double) e.getShareWeight() / allweights) * o; // QShares for base calcs
int des = deserved.get(e); // total deserved this round QShares
int gpr = given_per_round.get(e); // total given this round
int mpr = Math.max(0, des-gpr); // max this round, deserved less what I already was given
//int tgiven = Math.min(mpr, (int) Math.floor(dgiven)); // what is calculated, capped by what I already have
// UIMA-4275, floor to ceil. Below, with tgiven and rgiven, we also deal with the remainder. The floor plus the residual below
// seemed really aggressive and some small allocations were working out to 0 when they shouldn't.
int tgiven = Math.min(mpr, (int) Math.ceil(dgiven)); // what is calculated, capped by what I already have
int cap = e.calculateCap(); // get caps, if any, in qshares (simplified in UIMA-4275)
logger.trace(methodName, null, descr, "O", o, ":", e.getName(), "Before caps, given", tgiven, "cap", cap);
if ( gbo[0] >= cap ) { // UIMA-4275
logger.trace(methodName, null, descr, "O", o, "Entity", e.getName(), "cap prevents further allocation.");
continue;
}
int given = tgiven / o; // tentatively given, back to NShares
int rgiven = tgiven % o; // residual - remainder
//int twanted = wbo[0] + gbo[0]; // actual wanted: still wanted plus already given
// if ( twanted <= fragmentationThreshold ) { // if under the defrag limit, round up
if ( (rgiven > 0) && ( given == 0) ) {
given = Math.min( ++given, nshares[o] ); // UIMA-3664
}
// } // if not under the defrag limit, round down
if ( given + gbo[0] > cap ) { // adjust for caps
given = Math.max(0, cap - gbo[0]);
}
int taken = Math.min(given, wbo[o]); // NShares
taken = Math.min(taken, nshares[o] - total_taken); // capped on physical (in case rounding overcommitted)
logger.trace(methodName, null, descr,
"O", o, ":", e.getName(), "After caps,",
" dgiven Q[", dgiven,
"] given N[", given ,
"] taken N[", taken ,
"]");
gbo[o] += taken;
gbo[0] += taken;
wbo[o] -= taken;
wbo[0] -= taken;
total_taken += taken;
given_per_round.put(e, given_per_round.get(e) + (taken*o));
}
if ( total_taken > 0 ) shares_given = true;
removeSharesByOrder(vshares, nshares, total_taken, o);
// If an entity was given all it deserves this round, pull its weight from the pool so it
// doesn't dilute the giving for shares it isn't owed.
Iterator<IEntity> iter = working.iterator();
while ( iter.hasNext() ) {
IEntity e = iter.next();
int des = deserved.get(e);
int gpr = given_per_round.get(e);
int got_all = Math.max(0, des - gpr);
if ( got_all == 0 ) {
allweights -= e.getShareWeight();
}
}
if ( allweights <=0 ) break; // JRC JRC
}
// Remove entities that have everything they want or could otherwise get
Iterator<IEntity> iter = working.iterator();
while ( iter.hasNext() ) {
IEntity e = iter.next();
if ( (e.getWantedByOrder()[0] == 0) || (e.getGivenByOrder()[0] >= e.calculateCap()) ) { // UIMA-4275, checking fair-share cap
// logger.info(methodName, null, descr, e.getName(), "reaped, nothing more wanted:", fmtArray(e.getWantedByOrder()));
iter.remove();
}
}
// Remove entities that can't get anything more. This is important to get better convergence - otherwise
// the "spectator" jobs will pollute the fair-share and convergence will be much harder to achieve.
iter = working.iterator();
while ( iter.hasNext() ) {
IEntity e = iter.next();
int[] wbo = e.getWantedByOrder();
boolean purge = true;
for ( int o = maxorder; o > 0; o-- ) {
if ( (wbo[o] > 0) && (nshares[o] > 0) ) { // if wants something, and resources still exist for it ...
purge = false; // then no purge
break;
}
}
if ( purge ) {
//logger.info(methodName, null, descr, e.getName(), "reaped, nothing more usable:", fmtArray(e.getWantedByOrder()), "usable:",
// fmtArray(nshares));
iter.remove();
}
}
if ( logger.isTrace() ) {
logger.trace(methodName, null, descr, "Survivors at end of pass:");
for ( IEntity e : working ) {
logger.trace(methodName, null, descr, e.toString());
}
}
} while ( shares_given );
if ( logger.isTrace() ) {
logger.info(methodName, null, descr, "Final before bonus:");
for ( IEntity e : entities ) {
int[] gbo = e.getGivenByOrder();
logger.info(methodName, null, descr, String.format("%12s %s", e.getName(), fmtArray(gbo)));
}
}
//
// A final pass, in case something was left behind due to rounding.
// These are all bonus shares. We'll give preference to the "oldest"
// entities. But only one extra per pass, in order to evenly distribute.
//
// Technically, we might want to distribute these according to the
// entity weights, but we're not doing that (assuming all weights are 1).
//
boolean given = true;
//int bonus = 0;
while ( (nshares[1] > 0) && (given)) {
given = false;
for ( IEntity e : entities ) {
//int[] wbo = e.getWantedByOrder(); // nshares
int[] gbo = e.getGivenByOrder(); // nshares
for ( int o = maxorder; o > 0; o-- ) {
// the entity accesses its wbo, gbo, and entity-specific knowledge to decide whether
// the bonus is usable. If so, we give out exactly one in an attempt to spread the wealth.
//
// An example of where the bonus can't be used is a class over a nodepool whose resources
// are exhausted, in which case we loop and see if anybody else can use it.
// UIMA-4065
while ( (e.canUseBonus(o) ) && (vshares[o] > 0) ) {
gbo[o]++;
gbo[0]++;
removeSharesByOrder(vshares, nshares, 1, o);
given = true;
break;
}
}
// UIMA-4605 - old 'bonus' code -- keep for a while as quick reference
// for ( int o = maxorder; o > 0; o-- ) {
// int canuse = wbo[o] - gbo[o];
// while ( (canuse > 0 ) && (vshares[o] > 0) ) {
// gbo[o]++;
// //bonus++;
// canuse = wbo[o] - gbo[o];
// removeSharesByOrder(vshares, nshares, 1, o);
// given = true;
// break;
// }
// }
}
}
logger.debug(methodName, null, descr, "Final apportionment:");
for ( IEntity e : entities ) {
int[] gbo = e.getGivenByOrder(); // nshares
logger.debug(methodName, null, descr, String.format("%12s gbo %s", e.getName(), fmtArray(gbo)));
}
logger.debug(methodName, null, descr, "vshares", fmtArray(vshares));
logger.debug(methodName, null, descr, "nshares", fmtArray(nshares));
// if ( bonus > 0 ) {
// logger.debug(methodName, null, descr, "Final after bonus:");
// for ( IEntity e : entities ) {
// int[] gbo = e.getGivenByOrder(); // nshares
// logger.debug(methodName, null, descr, String.format("%12s %s", e.getName(), fmtArray(gbo)));
// }
// logger.debug(methodName, null, descr, "vshares", fmtArray(vshares));
// logger.debug(methodName, null, descr, "nshares", fmtArray(nshares));
// } else {
// logger.debug(methodName, null, descr, "No bonus to give.");
// }
}
/**
* Count out shares for only the jobs in the ResourceClasses here, and only from the given
* nodepool.
*/
protected void countClassShares(NodePool np, List<ResourceClass> rcs)
{
String methodName = "countClassShares";
if ( logger.isDebug() ) {
StringBuffer sb = new StringBuffer("Counting for nodepool ");
sb.append(np.getId());
sb.append(" - classes -");
for ( ResourceClass rc : rcs ) {
sb.append(" ");
sb.append(rc.getName());
}
logger.debug(methodName, null, sb.toString());
}
// if ( true ) return;
// pull the counts. These don't get updated by the counting routines per se. After counting, the NPs are
// expected to do the 'what-of' calculations that do the actual allocation and which update the counts.
int[] vshares = np.cloneVMachinesByOrder();
ArrayList<IEntity> l = new ArrayList<IEntity>();
l.addAll(rcs);
for ( IEntity e : l ) {
e.initWantedByOrder((ResourceClass) e);
}
apportion_qshares((List<IEntity>) l, vshares, methodName); // UIMA-4275, remove 'total'
int sum_of_weights = 0;
for ( ResourceClass rc : rcs ) {
sum_of_weights += rc.getShareWeight(); // see next loop
}
//
// If we have to do a "final eviction" because of fragmentation (evict stuff even though
// it's below its calculated counts), we want to know the pure, unmodified fair share
// for each rc. Here is a great place to do that calculation.
//
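// Illustrative (hypothetical numbers): a nodepool with 40 total qshares and two classes
// with share weights 1 and 3 gives pure fair shares of 10 and 30 qshares respectively.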
for ( ResourceClass rc : rcs ) {
int fair_share = (int) Math.floor(np.countTotalShares() * ( (double) rc.getShareWeight() / sum_of_weights ));
rc.setPureFairShare(fair_share);
}
}
/**
* Count out shares for the users with jobs in this resource class, apportioning the
* shares already counted out to the class by countClassShares.
*/
protected boolean countUserShares(ResourceClass rc)
{
String methodName = "countUserShares";
HashMap<IRmJob, IRmJob> allJobs = rc.getAllJobs();
if ( allJobs.size() == 0 ) {
return false; // nothing to do ...
}
// if ( rc.getName().equals("urgent") ) {
// int stop_here = 1;
// stop_here++;
// }
int[] vshares = rc.getGivenByOrder(); // assigned in countClassShares
HashMap<User, User> users = new HashMap<User, User>(); // get a map of all users in this class by examining the current jobs
for ( IRmJob j : allJobs.values() ) {
User u = j.getUser();
if ( users.containsKey(u) ) continue;
u.initWantedByOrder(rc);
users.put(u, u);
}
ArrayList<IEntity> l = new ArrayList<IEntity>();
l.addAll(users.values());
apportion_qshares((List<IEntity>) l, vshares, methodName); // UIMA-4275 remove 'total'
//
// For final eviction if needed, calculate the pure uncapped un-bonused count for
// each user.
//
int pure_share = rc.getPureFairShare();
int fs = (int) Math.floor((double) pure_share / users.size());
for ( User u : users.values() ) {
u.setPureFairShare(fs);
}
return true;
}
private void countJobShares(ResourceClass rc)
{
String methodName = "countJobShares";
HashMap<User, HashMap<IRmJob,IRmJob>> userJobs = rc.getAllJobsByUser();
for ( User u : userJobs.keySet() ) {
HashMap<IRmJob, IRmJob> jobs = userJobs.get(u);
if ( jobs.size() == 0 ) {
continue;
}
int[] vshares = u.getGivenByOrder();
ArrayList<IEntity> l = new ArrayList<IEntity>();
l.addAll(jobs.values());
for ( IEntity e : l ) {
e.initWantedByOrder(rc);
}
apportion_qshares((List<IEntity>) l, vshares, methodName); // UIMA-4275, remove "total"
//
// For final eviction if needed, calculate the pure uncapped un-bonused count for
// each user.
//
int pure_share = u.getPureFairShare();
int fs = (int) Math.floor((double) pure_share / jobs.size());
for ( IRmJob j : jobs.values() ) {
j.setPureFairShare(fs);
}
}
}
/**
* Find the set of classes from the presented set of eligible classes that have jobs in
* the given nodepool. UIMA-4065
*
* @param np Relevant nodepool
* @param eligible (Possibly restricted) set of classes that **might** have jobs in the nodepool
* @return List of classes with jobs in the nodepool
*/
private List<ResourceClass> gatherRcs(NodePool np, List<ResourceClass> eligible)
{
ArrayList<ResourceClass> ret = new ArrayList<ResourceClass>();
String npn = np.getId();
for ( ResourceClass rc : eligible ) {
String rcnpn = rc.getNodepoolName();
if ( rcnpn == null ) continue;
if ( rc.countJobs() == 0 ) continue;
if ( rcnpn.equals(npn) ) ret.add(rc);
}
return ret;
}
/**
* Do a depth-first traversal of the nodepool calculating counts for all the jobs in the nodepool and its children.
*
* Starting at each leaf NP, gather all the classes that have jobs in the NP, and if there are any, get class counts
* for the classes. Pass the classes up to the caller who can do a more global recount if necessary. If there are
* no jobs over the nodepool then bypass the count for it, of course. This is a rewrite of the original which did
* not properly handle the recursion past 1 level (root + 1). It uses gatherRcs() as a helper to find relevant classes.
* jrc 2014-11-05. UIMA-4605
*
* Note that this is tricky - please make sure you understand all the logic in countClassShares before changing
* anything.
*
* @param np
* @param eligible
* @return List of classes with potential counts.
*/
protected List<ResourceClass> traverseNodepoolsForCounts(NodePool np, List<ResourceClass> eligible)
{
//String methodName = "traverseNodepoolsForCounts";
List<ResourceClass> myRcs = gatherRcs(np, eligible); // the resource classes for NodePool np
boolean hasJobs = (myRcs.size() > 0); // do I have jobs for this np?
List<NodePool> subpools = np.getChildrenAscending(); // now recurse down to leaves from here
for ( NodePool subpool : subpools ) {
List<ResourceClass> subrc = traverseNodepoolsForCounts(subpool, eligible);
myRcs.addAll(subrc);
}
// now do our fs, if there are jobs resident in this np
if ( hasJobs ) countClassShares(np, myRcs);
return myRcs; // return aggregated classes to caller
}
// /**
// * Depth-first traversal of the nodepool. Once you get to a leaf, count the shares. This sets an
// * upper-bound on the number of shares a class can have. As you wind back up the tree the counts may
// * be reduced because of competition from jobs in the parent node. By the time we're done we should have
// * accounted for all jobs and all usable resources.
// *
// * Note how this works:
// * Consider a configuration with two nodepools plus global, A, B, and G. Suppose nodepools A and B have
// * 30 shares each and G only has 10 shares. G can apportion over it's 10, plus the 60 from A and B. So
// * after apporioning over A and B we need to do fair-share over G+A+B to insure that jobs submitted
// * to G are not "cheated" - recall that jobs in this set of classes have the same weight and priority,
// * and thus the same "right" to all the shares. However, allocating a job from class A over the
// * full set of 10+30 shares could over-allocate it. So the cap calculations must be sure never to
// * increase the already-given shares for subpools.
// *
// * Therefore we traverse the FULL SET of classes on every recursion. When calculating caps from
// * apportion_shares the resource classes will have to account for multiple traversals and not over-allocate
// * if a class has already been apportioned from a subpool.
// *
// * Keep for a while for reference. It is wrong but if there are still bugs in the rewrite we
// * want easy reference to the original. jrc 2014-11-05 UIMA-4065
// */
// protected void traverseNodepoolsForCounts(NodePool np, List<ResourceClass> rcs)
// {
// //HashMap<String, NodePool> subpools = np.getChildren();
// List<NodePool> subpools = np.getChildrenAscending();
// for ( NodePool subpool : subpools ) {
// ArrayList<ResourceClass> cls = new ArrayList<ResourceClass>();
// String npn = subpool.getId();
// int njobs = 0;
// for ( ResourceClass rc : rcs ) {
// String rcnpn = rc.getNodepoolName();
// if ( rcnpn == null ) continue;
// if ( rc.getNodepoolName().equals(npn) ) {
// cls.add(rc);
// njobs += rc.countJobs();
// }
// }
// if ( njobs > 0 ) {
// traverseNodepoolsForCounts(subpool, cls);
// }
// }
// countClassShares(np, rcs);
// }
protected void updateNodepools(NodePool np, ArrayList<ResourceClass> rcs)
{
//HashMap<String, NodePool> subpools = np.getChildren();
List<NodePool> subpools = np.getChildrenAscending();
for ( NodePool subpool : subpools ) {
ArrayList<ResourceClass> cls = new ArrayList<ResourceClass>();
String npn = subpool.getId();
int njobs = 0;
for ( ResourceClass rc : rcs ) {
String rcnpn = rc.getNodepoolName();
if ( rcnpn == null ) continue;
if ( rc.getNodepoolName().equals(npn) ) {
cls.add(rc);
njobs += rc.countJobs();
}
}
if ( njobs > 0 ) {
updateNodepools(subpool, cls);
}
}
// All calls have rcs EXACTLY the same as classes assigned to the np except
// for the last one, to global. At the end of the recursion everything except
// those have been processed. We grab just the ones in --global-- so we don't
// end up counting them more than once.
if ( np == globalNodepool ) {
ArrayList<ResourceClass> tmp = new ArrayList<ResourceClass>();
for ( ResourceClass rc : rcs ) {
if ( rc.getNodepoolName().equals(globalNodepool.getId()) ) {
tmp.add(rc);
}
}
rcs = tmp;
}
for (ResourceClass rc : rcs ) {
if ( rc.countJobs() == 0 ) continue;
rc.updateNodepool(np);
}
}
private void howMuchFairShare(ArrayList<ResourceClass> rcs)
{
String methodName = "howMuchFairShare";
if ( logger.isTrace() ) {
logger.info(methodName, null, "Scheduling FAIR SHARE for these classes:");
logger.info(methodName, null, " ", ResourceClass.getHeader());
logger.info(methodName, null, " ", ResourceClass.getDashes());
for ( ResourceClass pc : rcs ) {
logger.info(methodName, null, " ", pc.toString());
}
}
ArrayList<ResourceClass> eligible = new ArrayList<ResourceClass>();
Collections.sort(rcs, new ClassByWeightSorter());
for ( ResourceClass rc : rcs ) {
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
logger.debug(methodName, null, "Schedule class", rc.getName());
rc.clearShares();
if ( jobs.size() == 0 ) {
logger.debug(methodName, null, "No jobs to schedule in class ", rc.getName());
} else {
eligible.add(rc);
for ( IRmJob j : jobs.values() ) {
logger.info(methodName, j.getId(), "Scheduling job in class ", rc.getName(), ":", j.toString());
}
}
}
if ( eligible.size() == 0 ) {
return;
}
//
// First step, figure out how many shares per class.
//
traverseNodepoolsForCounts(globalNodepool, eligible); // (only these classes, not the global set)
//
// Everything should be stable - now reduce the counts in the nodepools
//
/**
logger.info(methodName, null, "Fair-share class counts:");
for ( ResourceClass rc : rcs ) {
logger.info(methodName, null, rc.tablesToString());
}
*/
//
// Update the class-level counts
//
updateNodepools(globalNodepool, eligible);
//
// Now the easy part, user and job allocations
//
//
// We now know how many shares per class. Let's do something similar for all the users in each class.
// This will leave us with the number of shares per user for each resource class.
//
// The fair-share is easier to calculate because all priorities are the same within a class - we
// dealt with class weights in the loop above. But we always have to deal with jobs that don't
// use their full allocation.
//
for ( ResourceClass rc : rcs ) {
if ( !rc.hasSharesGiven() ) {
for (IRmJob j : rc.getAllJobs().values()) {
j.clearShares();
j.undefer(); // can get message set during defrag
}
continue;
}
//
// Common rc-based share assignment for counting both user and job shares
//
/**
int[] tmpSharesByOrder = rc.getSharesByOrder(globalNodepool.getArraySize()); // get the shares given by countClassShares
int len = tmpSharesByOrder.length;
for ( int i = 1; i < len; i++ ) { // now get by-share totals, same as nSharesByOrder in the node pool,
// only for the class, not globally
for ( int j = i+1; j < len; j++ ) {
if ( tmpSharesByOrder[j] != 0 ) {
tmpSharesByOrder[i] += (j / i) * tmpSharesByOrder[j];
}
}
}
*/
if ( ! countUserShares(rc) ) {
continue; // nothing to do, on to the next class
}
countJobShares(rc);
}
}
/**
* Is the job submitted to one of these classes?
*/
protected boolean jobInClass(ArrayList<ResourceClass> rcs, IRmJob j)
{
for ( ResourceClass rc : rcs ) {
if ( j.getResourceClass() == rc ) return true;
}
return false;
}
/**
* Expand jobs from the needy list ahead of other jobs, because we likely stole the
* shares from older jobs which would otherwise get priority.
*/
protected void expandNeedyJobs(NodePool np, ArrayList<ResourceClass> rcs)
{
String methodName = "expandNeedyJobs";
if ( needyJobs.size() == 0 ) return;
logger.trace(methodName, null, "Enter: needyJobs.size =", needyJobs.size());
List<NodePool> subpools = np.getChildrenAscending();
for ( NodePool subpool : subpools ) {
expandNeedyJobs(subpool, rcs);
}
List<IRmJob> fair_share_jobs = new ArrayList<IRmJob>();
List<IRmJob> fixed_share_jobs = new ArrayList<IRmJob>();
List<IRmJob> reservations = new ArrayList<IRmJob>();
List<IRmJob> removeList = new ArrayList<IRmJob>();
for ( IRmJob j : needyJobs.values() ) {
if ( j.isCompleted() ) {
removeList.add(j);
continue;
}
ResourceClass rc = j.getResourceClass();
if ( rc == null ) {
removeList.add(j);
continue; // job completed or was canceled or some other weirdness
}
if ( !jobInClass(rcs, j) ) continue;
if ( rc.getNodepool() == np ) {
switch ( rc.getPolicy()) {
case FAIR_SHARE:
fair_share_jobs.add(j);
break;
case FIXED_SHARE:
fixed_share_jobs.add(j);
break;
case RESERVE:
reservations.add(j);
break;
}
removeList.add(j);
}
}
for ( IRmJob j : removeList ) {
needyJobs.remove(j);
}
// try to connect shares now
Collections.sort(reservations, new JobByTimeSorter());
logger.debug(methodName, null, "NP[", np.getId(), "Expand needy reservations.", listJobSet(reservations));
for ( IRmJob j : reservations ) {
ResourceClass rc = j.getResourceClass();
np.findMachines(j, rc);
}
Collections.sort(fixed_share_jobs, new JobByTimeSorter());
logger.debug(methodName, null, "NP[", np.getId(), "Expand needy fixed.", listJobSet(fixed_share_jobs));
for ( IRmJob j : fixed_share_jobs ) {
if ( np.findShares(j, false) > 0 ) {
//
// Need to fix the shares here, if any, because the findShares() code is the same for fixed and fair share so it
// won't have done that yet.
//
for ( Share s : j.getPendingShares().values() ) {
s.setFixed();
logger.info(methodName, j.getId(), "Assign:", s);
}
}
}
Collections.sort(fair_share_jobs, new JobByTimeSorter());
logger.debug(methodName, null, "NP[", np.getId(), "Expand needy jobs.", listJobSet(fair_share_jobs));
np.doExpansion(fair_share_jobs);
logger.trace(methodName, null, "Exit : needyJobs.size =", needyJobs.size());
}
// private static int stop_here_dx = 0;
protected void traverseNodepoolsForExpansion(NodePool np, ArrayList<ResourceClass> rcs)
{
String methodName = "traverseNodepoolsForExpansion";
// HashMap<String, NodePool> subpools = np.getChildren();
List<NodePool> subpools = np.getChildrenAscending();
StringBuffer sb = new StringBuffer();
for ( NodePool sp : subpools ) {
sb.append(sp.getId());
sb.append(" ");
}
logger.info(methodName, null, np.getId(), "Doing expansions in this order:", sb.toString());
for ( NodePool subpool : subpools ) {
traverseNodepoolsForExpansion(subpool, rcs);
}
// logger.info(methodName, null, "--- stop_here_dx", stop_here_dx);
// if ( stop_here_dx == 13 ) {
// @SuppressWarnings("unused")
// int stophere;
// stophere=1;
// }
// stop_here_dx++;
// fall through at leaves
// gather all the classes that are assigned to this pool
ArrayList<ResourceClass> cls = new ArrayList<ResourceClass>();
String npn = np.getId();
int njobs = 0;
for ( ResourceClass rc : rcs ) {
String rcnpn = rc.getNodepoolName();
if ( rcnpn == null ) continue;
if ( rc.getNodepoolName().equals(npn) ) {
cls.add(rc);
njobs += rc.countJobs();
}
}
if ( njobs == 0 ) return;
ArrayList<IRmJob> jobs = new ArrayList<IRmJob>(); // collect all the jobs for the current np
for ( ResourceClass rc : cls ) {
jobs.addAll(rc.getAllJobs().values());
}
Collections.sort(jobs, new JobByTimeSorter());
np.doExpansion(jobs);
}
/**
* Counts are established. We strictly honor them inside each job and evict where the count is
* less than the currently allocated shares, and add where it is greater.
*/
//static int stophere = 0;
protected void whatOfFairShare(ArrayList<ResourceClass> rcs)
{
String methodName = "whatOfFairShare";
ArrayList<ResourceClass> eligible = new ArrayList<ResourceClass>();
Collections.sort(rcs, new ClassByWeightSorter());
for ( ResourceClass rc : rcs ) {
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
logger.debug(methodName, null, "Schedule class", rc.getName());
if ( jobs.size() == 0 ) {
logger.debug(methodName, null, "No jobs to schedule in class ", rc.getName());
} else {
eligible.add(rc);
for ( IRmJob j : jobs.values() ) { // just logging for debug for now
logger.info(methodName, j.getId(), "Scheduling job in class ", rc.getName(), ":", j.countNSharesGiven(), "shares given, order", j.getShareOrder());
}
}
}
if ( eligible.size() == 0 ) {
return;
}
//traverseNodepoolsForEviction(globalNodepool, eligible);
// logger.trace(methodName, null, "Machine occupancy before expansion", stophere++);
// if ( stophere == 7 ) {
// @SuppressWarnings("unused")
// int stophere;
// stophere = 1 ;
// }
traverseNodepoolsForExpansion(globalNodepool, eligible);
}
// ==========================================================================================
// ==========================================================================================
// =========================== REWORKED CODE FOR FIXED_SHARE =================================
// ==========================================================================================
// ==========================================================================================
/**
* Make sure there are enough shares to allocate either directly, or through preemption,
* and count them out.
*/
void howMuchFixed(ArrayList<ResourceClass> rcs)
{
String methodName = "howMuchFixed";
if ( logger.isTrace() ) {
logger.info(methodName, null, "Scheduling FIXED SHARE for these classes:");
logger.info(methodName, null, " ", ResourceClass.getHeader());
logger.info(methodName, null, " ", ResourceClass.getDashes());
for ( ResourceClass pc : rcs ) {
logger.info(methodName, null, " ", pc.toString());
}
}
int total_jobs = 0;
for ( ResourceClass rc : rcs ) {
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
total_jobs += jobs.size();
}
if ( total_jobs == 0 ) {
return;
}
for ( ResourceClass rc : rcs ) {
ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
for ( IRmJob j : jobs ) {
logger.info(methodName, j.getId(), "Scheduling job to class:", rc.getName());
j.clearShares(); // reset virtual shares at start of each scheduling cycle
j.undefer(); // in case it works this time!
switch ( j.getDuccType() ) {
case Job:
countFixedForJob(j, rc);
break;
case Service:
case Pop:
case Reservation:
default:
countSingleFixedProcess(j, rc);
break;
}
}
}
}
void countFixedForJob(IRmJob j, ResourceClass rc)
{
String methodName = "countFixedForJob";
logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId());
// Allowed something if we get here. Must see if we have something to give.
NodePool np = rc.getNodepool();
int order = j.getShareOrder();
int available = np.countLocalNSharesByOrder(order);
logger.info(methodName, j.getId(), "available shares of order", order, "in np:", available);
if ( available == 0 ) {
if (j.countNShares() == 0) {
if ( np.countFixable(j) > 0 ) {
schedulingUpdate.defer(j, "Deferred because insufficient resources are availble.");
logger.info(methodName, j.getId(), "Deferring, insufficient shares available. NP", np.getId(),
"available[", np.countNSharesByOrder(order), "]");
} else {
schedulingUpdate.defer(j, "Deferred because no hosts in class " + rc.getName() + " have sufficient memory to accomodate the request.");
logger.info(methodName, j.getId(), "Deferring, no machines big enough for the request. NP", np.getId(),
"available[", np.countNSharesByOrder(order), "]");
}
return;
} else {
logger.info(methodName, j.getId(), "Nodepool is out of shares: NP", np.getId(),
"available[", np.countNSharesByOrder(order), "]");
}
}
int granted = getAllotmentForJob(j); // in nshares, processes
//
// The job passes; give the job a count
//
logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(granted, order));
int[] gbo = globalNodepool.makeArray();
gbo[order] = granted; // what we get
j.setGivenByOrder(gbo);
// The difference between what we pass to 'what of', and what we already have. The shares we already have are accounted
// for in a special step at the start of the scheduling round.
np.countOutNSharesByOrder(order, granted - j.countNShares());
}
void countSingleFixedProcess(IRmJob j, ResourceClass rc)
{
String methodName = "countSingleFixedProcess";
logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId(), "in class", rc.getName());
NodePool np = rc.getNodepool();
if ( j.isCompleted() ) {
return;
}
if ( j.countNShares() > 0 ) { // only 1 allowed, UIMA-4275
// already accounted for as well, since it is a non-preemptable share
logger.info(methodName, j.getId(), "[stable]", "assigned", j.countNShares(), "processes, ",
(j.countNShares() * j.getShareOrder()), "QS");
int[] gbo = globalNodepool.makeArray();
gbo[j.getShareOrder()] = 1; // must set the allocation so eviction works right
j.setGivenByOrder(gbo);
return;
}
int order = j.getShareOrder();
//
// Now see if we have sufficient shares in the nodepool for this allocation.
//
if ( np.countLocalNSharesByOrder(order) == 0 ) {
if (np.countFixable(j) > 0 ) {
schedulingUpdate.defer(j, "Deferred because insufficient resources are availble.");
logger.info(methodName, j.getId(), "Deferring, insufficient shares available. NP", np.getId(), "available[", np.countNSharesByOrder(order), "]");
} else {
schedulingUpdate.defer(j, "Deferred because no hosts in class " + rc.getName() + " have sufficient memory to accomodate the request.");
logger.info(methodName, j.getId(), "Deferring, no machines big enough for the request. NP", np.getId(),
"available[", np.countNSharesByOrder(order), "]");
}
return;
}
//
// Make sure this allocation does not blow the allotment cap.
//
if ( ! validSingleAllotment(j) ) return; // this method will defer the job and log it
//
// The job passes. Assign it a count and get on with life ...
//
logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(1, order));
int[] gbo = globalNodepool.makeArray();
gbo[order] = 1;
j.setGivenByOrder(gbo);
np.countOutNSharesByOrder(order, 1);
}
/**
* If there are free shares of the right order just assign them. Otherwise
* the counts will cause evictions in lower-priority code so we just wait.
*/
protected void whatOfFixedShare(ArrayList<ResourceClass> rcs)
{
String methodName = "whatOfFixedShare";
for ( ResourceClass rc : rcs ) {
ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
NodePool np = rc.getNodepool();
for ( IRmJob j : jobs ) {
if ( j.countNShares() == j.countNSharesGiven() ) { // got what we need, we're done
continue;
}
if ( j.isRefused() ) { // bypass jobs that we know can't be allocated. unlikely after UIMA-4275.
continue;
}
if ( j.isDeferred() ) { // UIMA-4275 - still waiting for an allocation
continue;
}
if ( j.isCompleted() ) { // UIMA-4327 - reinstated, if this gets set we aren't allowed to expand any more
continue;
}
int order = j.getShareOrder();
int count = j.countNSharesGiven();
if ( np.findSharesHorizontal(j) > 0 ) { // UIMA-4275, no longer require full allocation, we'll take what we can
// UIMA-4712 first try horizontal stacking
//
// Need to fix the shares here, if any, because the findShares() code is the same for fixed and fair share so it
// won't have done that yet.
//
for ( Share s : j.getPendingShares().values() ) {
s.setFixed();
}
logger.info(methodName, j.getId(), "Assign(H):", nSharesToString(count, order));
}
if ( j.countNShares() == 0 ) { // UIMA-4712 now try vertical stacking
if ( np.findSharesVertical(j) > 0 ) {
//
// Need to fix the shares here, if any, because the findShares() code is the same for fixed and fair share so it
// won't have done that yet.
//
for ( Share s : j.getPendingShares().values() ) {
s.setFixed();
}
logger.info(methodName, j.getId(), "Assign(V):", nSharesToString(count, order));
}
}
//
// If nothing assigned we're waiting on preemptions which will occur naturally, or by forcible eviction of squatters,
// or defrag.
//
if ( j.countNShares() == 0 ) {
j.setReason("Waiting for preemptions.");
if ( (j.getEligibleMachinesCount() == 0) && (j.getMachineList().size() > 0) ) {
j.setReason("No machines match user specified list.");
}
}
}
}
}
// ==========================================================================================
// ==========================================================================================
// =========================== REWORKED CODE FOR RESERVATIONS ================================
// ==========================================================================================
// ==========================================================================================
private void howMuchReserve(ArrayList<ResourceClass> rcs)
{
String methodName = "howMuchreserve";
if ( logger.isTrace() ) {
logger.info(methodName, null, "Calculating counts for RESERVATION for these classes:");
logger.info(methodName, null, " ", ResourceClass.getHeader());
logger.info(methodName, null, " ", ResourceClass.getDashes());
for ( ResourceClass pc : rcs ) {
logger.info(methodName, null, " ", pc.toString());
}
}
int total_jobs = 0;
for ( ResourceClass rc : rcs ) {
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
total_jobs += jobs.size();
}
if ( total_jobs == 0 ) {
return;
}
for ( ResourceClass rc : rcs ) {
ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
// Now pick up the work that can be scheduled, if any
for ( IRmJob j : jobs) {
j.clearShares(); // reset shares assigned at start of each scheduling cycle
j.undefer(); // in case it works this time!
switch ( j.getDuccType() ) {
case Job:
countReservationForJob(j, rc);
break;
case Service:
case Pop:
case Reservation:
default:
countSingleReservation(j, rc);
break;
}
}
}
}
void countReservationForJob(IRmJob j, ResourceClass rc)
{
String methodName = "countReservationForJob";
logger.info(methodName, j.getId(), "Counting full machines for", j.getShortType() + "." + j.getId());
// Allowed something if we get here. Must see if we have something to give.
NodePool np = rc.getNodepool();
// Are there any machines of the right size in the NP that can be reserved for this job?
int available = np.countReservables(j);
if ( available == 0 ) {
if (j.countNShares() == 0) {
schedulingUpdate.defer(j, "Deferred because there are no hosts of the correct size in class " + rc.getName());
logger.info(methodName, j.getId(), "Deferred because no hosts of correct size. NP", np.getId());
} else {
logger.info(methodName, j.getId(), "Nodepool is out of shares: NP", np.getId());
}
return;
}
int granted = getAllotmentForJob(j); // total allowed, including those already scheduled
int needed = granted - j.countNShares(); // additional shares
int freeable = 0;
if ( needed > 0 ) {
freeable = np.countFreeableMachines(j, needed); // might schedule evictions
if ( (freeable + j.countNShares()) == 0 ) {
schedulingUpdate.defer(j, "Deferred because resources are exhausted.");
logger.warn(methodName, j.getId(), "Deferred because resources are exhausted in nodepool " + np.getId());
return;
}
}
//
// The job passes; give the job a count
//
logger.info(methodName, j.getId(), "Request is granted a machine for reservation.");
int[] gbo = globalNodepool.makeArray();
int order = j.getShareOrder(); // memory, converted to order, so we can find stuff
gbo[order] = freeable + j.countNShares(); // account for new stuff plus what it already has
j.setGivenByOrder(gbo);
}
void countSingleReservation(IRmJob j, ResourceClass rc)
{
String methodName = "countSingleReservation";
logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId(), "in class", rc.getName());
NodePool np = rc.getNodepool();
if ( j.countNShares() > 0 ) {
logger.info(methodName, j.getId(), "[stable]", "assigned", j.countNShares(), "processes, ",
(j.countNShares() * j.getShareOrder()), "QS");
int[] gbo = globalNodepool.makeArray();
gbo[j.getShareOrder()] = 1; // UIMA4275 - only one
j.setGivenByOrder(gbo);
return;
}
//
// Make sure this allocation does not blow the allotment cap.
//
if ( ! validSingleAllotment(j) ) return; // defers and logs
if ( np.countReservables(j) == 0 ) {
schedulingUpdate.defer(j, "Deferred because there are no hosts of the correct size in class " + rc.getName());
logger.warn(methodName, j.getId(), "Deferred because requested memory " + j.getMemory() + " does not match any machine.");
return;
}
if ( np.countFreeableMachines(j, 1) == 0 ) { // might also schedule preemptions
schedulingUpdate.defer(j, "Deferred because resources are exhausted.");
logger.warn(methodName, j.getId(), "Deferred because resources are exhausted in nodepool " + np.getId());
return;
}
logger.info(methodName, j.getId(), "Request is granted a machine for reservation.");
int[] gbo = globalNodepool.makeArray();
int order = j.getShareOrder(); // memory, converted to order, so we can find stuff
gbo[order] = 1;
j.setGivenByOrder(gbo);
}
/**
*/
private void whatOfReserve(ArrayList<ResourceClass> rcs)
{
String methodName = "whatOfToReserve";
for ( ResourceClass rc : rcs ) {
NodePool np = rc.getNodepool();
ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
for ( IRmJob j: jobs ) {
if ( j.isRefused() ) { // bypass jobs that we know can't be allocated
continue;
}
if ( j.isDeferred() ) { // counts don't work, we can't do this yet
continue;
}
if ( j.isCompleted() ) { // UIMA-4327 - reinstated, if this gets set we aren't allowed to expand any more
continue;
}
try {
np.findMachines(j, rc);
} catch (Exception e) {
logger.error(methodName, j.getId(), "Reservation issues:", e);
continue;
}
//
// Either shares were assigned or not. If not we wait for evictions, otherwise it is
// fully allocated. Nothing more to do here.
//
if ( j.countNShares() == 0 ) {
j.setReason("Waiting for preemptions.");
if ( (j.getEligibleMachinesCount() == 0) && (j.getMachineList().size() > 0) ) {
j.setReason("No machines match user specified list.");
}
}
}
}
}
/**
* Remove non-preemptable resources from the counts.
*/
protected void accountForNonPreemptable()
{
for (ResourceClass rc : resourceClasses.values() ) {
switch ( rc.getPolicy() ) {
case FAIR_SHARE:
break;
case FIXED_SHARE:
case RESERVE:
{
NodePool np = rc.getNodepool();
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
for ( IRmJob j : jobs.values() ) {
HashMap<Share, Share> shares = j.getAssignedShares();
np.accountForShares(shares);
}
}
break;
}
}
}
/**
* Remove active shares from the nodepool counts, leaving us with just the free shares in the tables.
*/
protected void accountForFairShare()
{
for (ResourceClass rc : resourceClasses.values() ) {
if ( rc.getPolicy() == Policy.FAIR_SHARE ) {
NodePool np = rc.getNodepool();
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
for ( IRmJob j : jobs.values() ) {
HashMap<Share, Share> shares = j.getAssignedShares();
np.accountForShares(shares);
}
}
}
}
/**
* The second stage - work up all counts globally
*/
protected void findHowMuch(ArrayList<ResourceClass> rcs)
{
switch ( rcs.get(0).getPolicy() ) {
case FAIR_SHARE:
howMuchFairShare(rcs);
break;
case FIXED_SHARE:
howMuchFixed(rcs);
break;
case RESERVE:
howMuchReserve(rcs);
break;
}
}
/**
* Collect all the fair-share classes and pass them to the nodepools for eviction
* This interacts recursively with NodePool.doEvictions*.
*
* In NodepoolScheduler we recurse doing depth-first traversal to have every nodepool do evictions for jobs
* that are submitted to that pool. The NodePool then recurses again in depth-first mode to evict shares that
* have spilled into lower-level pools. We have a double-recursion:
* 1. Evict shares for jobs that originate in each node pool
* 2. Evict shares that have spilled into lower-level pools.
*/
// private static int stop_here_de = 0;
protected void doEvictions(NodePool nodepool)
{
String methodName = "doEvictions";
// logger.info(methodName, null, "--- stop_here_de", stop_here_de);
// if ( stop_here_de == 7 ) {
// @SuppressWarnings("unused")
// int stophere;
// stophere=1;
// }
// stop_here_de++;
for ( NodePool np : nodepool.getChildrenDescending() ) { // recurse down the tree
logger.debug(methodName, null, "Recurse to", np.getId(), "from", nodepool.getId());
doEvictions(np); // depth-first traversal
logger.debug(methodName, null, "Return from", np.getId(), "proceeding with logic for", nodepool.getId());
}
int neededByOrder[] = globalNodepool.makeArray(); // for each order, how many N-shares do I want to add?
// int total_needed = 0;
Map<IRmJob, Integer> overages = new HashMap<IRmJob, Integer>(); // UIMA-4275
for ( ResourceClass cl : resourceClasses.values() ) {
if ( cl.getNodepoolName().equals(nodepool.getId()) && (cl.getAllJobs().size() > 0) ) {
HashMap<IRmJob, IRmJob> jobs = cl.getAllJobs();
String npn = cl.getNodepoolName();
logger.info(methodName, null, String.format("%7s %7s %6s %5s (%s)", "Counted", "Current", "Needed", "Order", npn));
for ( IRmJob j : jobs.values() ) {
int counted = j.countNSharesGiven(); // allotment from the counter
int current = j.countNShares(); // currently allocated, plus pending, less those removed by earlier preemption
int needed = (counted - current);
int order = j.getShareOrder();
// If needed > 0, those are shares we need to make space for.
// If needed < 0, those are shares the job must dump because the
// counts say so.
// if needed == 0 then clearly nothing
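// Illustrative (hypothetical numbers): counted=5, current=8 puts the job in 'overages'
// with an overage of 3 (it must shrink by 3); counted=5, current=2 adds 3 to
// neededByOrder[order].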
if ( needed < 0 ) {
// UIMA-4275 these guys must be forced to shrink
overages.put(j, -needed);
} else {
// needed = Math.abs(needed);
// needed = Math.max(0, needed);
logger.info(methodName, j.getId(), String.format("%7d %7d %6d %5d", counted, current, needed, order));
neededByOrder[order] += needed;
// total_needed += needed;
}
}
}
}
// Every job in overages is required to lose the indicated number of shares. If this is done optimally it
// will leave sufficient space for the counted shares of all the expansions. Therein lies the rub.
//
// The older code below does its best to make space for the 'needed' array but it fails to fully evict
// an over-deployed job in a number of situations. The loop here relies on defragmentation, which
// we did not have originally, to do the final cleanup. The job will be asked to dump its extra processes according
// to the mechanism in its shrinkBy() method. See that method for details.
// UIMA-4275
for (IRmJob j : overages.keySet()) {
j.shrinkBy(overages.get(j));
}
// First we try to make enough space in the right places for under-allocation jobs
// logger.debug(methodName, null, nodepool.getId(), "NeededByOrder before any eviction:", Arrays.toString(neededByOrder));
// if ( (nodepool.countOccupiedShares() > 0) && (total_needed > 0) ) {
// nodepool.doEvictionsByMachine(neededByOrder, false);
}
/**
* Determine if a candidate share can or cannot be transferred (eventually) to a needy job based on nodepool constraints.
*/
boolean compatibleNodepools(Share candidate, IRmJob needy)
{
Machine m = candidate.getMachine();
ResourceClass nrc = needy.getResourceClass();
NodePool np = nrc.getNodepool();
return np.containsMachine(m); // can we get to the candidate share from 'needy's np?
}
// /**
// * Discover whether the potential job is able or unable to supply shares to a needy job because of nodepool restrictions.
// */
// boolean compatibleNodepools(IRmJob potential, IRmJob needy)
// {
// ResourceClass prc = potential.getResourceClass();
// ResourceClass nrc = needy.getResourceClass();
// NodePool pp = prc.getNodepool();
// NodePool np = nrc.getNodepool();
// return np.containsSubpool(pp) || pp.containsSubpool(np);
// }
/**
* Discover whether the potential resource class is able or unable to supply shares to jobs in a needy class because of nodepool restrictions.
*/
boolean compatibleNodepools(ResourceClass potential, IRmJob needy)
{
ResourceClass nrc = needy.getResourceClass();
NodePool pp = potential.getNodepool();
NodePool np = nrc.getNodepool();
return np.containsSubpool(pp) || pp.containsSubpool(np);
}
/**
* For debugging, get job ids onto a single line.
*/
String listJobSet(Map<IRmJob, IRmJob> jobs)
{
if ( jobs.size() == 0 ) return "NONE";
StringBuffer sb = new StringBuffer("[");
for ( IRmJob j : jobs.keySet() ) {
sb.append(j.getId());
sb.append(" ");
}
sb.append("]");
return sb.toString();
}
String listJobSet(List<IRmJob> jobs)
{
if ( jobs.size() == 0 ) return "NONE";
StringBuffer sb = new StringBuffer("[");
for ( IRmJob j : jobs ) {
sb.append(j.getId());
sb.append(" ");
}
sb.append("]");
return sb.toString();
}
/**
* Make a share available to job 'nj'. If possible, just reassign it; if not, cancel its expansion or evict it, as appropriate.
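*
* @return true if the pending share was simply discarded (freeing space on its machine); false if it
*         was reassigned directly to 'nj' or an eviction was initiated.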
*/
boolean clearShare(Share s, IRmJob nj)
{
String methodName = "clearShare";
IRmJob rich_j = s.getJob();
if ( s.isPending() ) {
if (s.getShareOrder() == nj.getShareOrder() ) { // same size, reassign it directly
logger.debug(methodName, nj.getId(), "Reassign expanded share", s.toString(), "from", rich_j.getId());
Machine m = s.getMachine();
m.reassignShare(s, nj);
rich_j.cancelPending(s);
nj.assignShare(s);
return false;
} else { // different size, just discard it
logger.debug(methodName, nj.getId(), "Canceling expansion share", s.toString(), "from", rich_j.getId());
rich_j.cancelPending(s);
Machine m = s.getMachine();
m.removeShare(s);
return true;
}
} else { // not pending, must evict it
logger.debug(methodName, nj.getId(), "Donate", s.toString(), "from", rich_j.getId());
rich_j.shrinkByOne(s);
return false;
}
}
/**
* This routine tries to find enough processes that can be co-opted from "rich" users for jobs that deserved
* shares but couldn't get them because of fragmentation.
*
* @param nj This is the job we are trying to find shares for.
* @param needed This is the number of processes we need for job nj.
* @param users_by_wealth This is all the users who can donate, ordered by wealth.
* @param jobs_by_user This is all the candidate jobs owned by each user. Note that this is not necessarily
* ALL the user's jobs; the caller will already have culled everything that makes no sense to
* take from.
*
* @return The number of processes we found space for. Note this could be different from the number
* of processes evicted, if it took more than one eviction to make space. Also, we may have
* evicted a process smaller than is needed, because there was already some free space on
* the machine.
*/
int takeFromTheRich(IRmJob nj,
int needed,
TreeMap<User, User> users_by_wealth,
HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user)
{
String methodName = "takeFromTheRich";
// 1. Collect all machines that have shares, which if evicted, would make enough space
// - in compatible NP
// - g + sum(shares belonging to rich users on the machine);
// 2. Order the machines by
// a) richest user
// b) largest machine
// 3. Pick next machine,
// - clear enough shares
// - remove machine from list
// - update wealth
// 4. Repeat at 2 until
// a) have given what is needed
// b) nothing left to give
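// Illustrative example (hypothetical numbers): if the needy job wants order-4 shares and a machine has
// 2 free quantum shares plus an evictable candidate share of order 2, then g = 2 + 2 = 4 >= 4, so the
// machine is eligible even though no single free hole is big enough on its own.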
Map<IRmJob, IRmJob> candidateJobs = new HashMap<IRmJob, IRmJob>();
Map<Machine, Machine> eligibleMachines = new TreeMap<Machine, Machine>(new EligibleMachineSorter());
for ( TreeMap<IRmJob, IRmJob> jobs : jobs_by_user.values() ) {
candidateJobs.putAll(jobs);
}
int given = 0;
int orderNeeded = nj.getShareOrder();
ResourceClass cl = nj.getResourceClass(); // needy job's resource class
String npname = cl.getNodepoolName(); // name of the class
NodePool np = globalNodepool.getSubpool(npname); // job's nodepool
Map<Node, Machine> machines = np.getAllMachines(); // everything here is a candidate, nothing else is
// this is the machines in the pool, and all the
// subpools
// Here we filter all the machines looking for machines that *might* be able to satisfy the defrag. At the
// end this set of machines is eligibleMachines.
machine_loop :
for ( Machine m : machines.values() ) {
if ( m.getShareOrder() < orderNeeded ) { // nope, too small
logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded);
continue;
}
// if the job is a reservation, the machine size has to match and the machine must be clearable
if ( nj.getSchedulingPolicy() == Policy.RESERVE ) {
if ( m.getShareOrder() != orderNeeded ) {
logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": RESERVE policy requires exact match for order", orderNeeded);
continue;
}
// machine must be clearable as well
Collection<Share> shares = m.getActiveShares().values();
for ( Share s : shares ) {
if ( ! candidateJobs.containsKey(s.getJob()) ) {
logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": for reservation, machine contains non-candidate job", s.getJob().getId());
continue machine_loop;
}
}
}
Map<Share, Share> as = m.getActiveShares(); // everything allocated here
int g = m.getVirtualShareOrder(); // g is space that we might be able to make after defrag:
// free space + freeable-from-candidates
for ( Share s : as.values() ) {
IRmJob j = s.getJob();
if ( s.isForceable() && candidateJobs.containsKey(j) ) { // evictable, and a candidate for reclamation by defrag
g += j.getShareOrder();
}
}
if ( g >= orderNeeded ) { // if it's usable by the job, it's a candidate
logger.info(methodName, nj.getId(), "Candidate machine:", m.getId());
eligibleMachines.put(m, m);
} else {
logger.info(methodName, nj.getId(), "Not a candidate, insufficient free space + candidate shares:", m.getId());
}
}
// Now eligibleMachines is the set of candidate machines for defrag
logger.info(methodName, nj.getId(), "Found", eligibleMachines.size(), "machines to be searched in this order:");
StringBuffer buf = new StringBuffer();
for ( Machine m : eligibleMachines.keySet() ) {
buf.append(m.getId());
buf.append(" ");
}
logger.info(methodName, nj.getId(), "Eligible machines:", buf.toString());
// first part done, we know where to look.
// Now just bop through the machines to see if we can get anything for this specific job (nj)
int given_per_round = 0;
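// Each pass walks the eligible machines and clears candidate shares on the first machine that can
// yield at least one process for 'nj'; passes repeat until nothing more can be cleared or the need is met.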
do {
int g = 0;
given_per_round = 0;
for ( Machine m : eligibleMachines.keySet() ) {
//
// How best to order candidate shares? You can choose the "wealthiest" first, but if it's not a good
// match by size, we may end up evicting too many shares, which could include a not-so-wealthy share,
// or increase fragmentation by breaking it up and leaving a useless bit.
//
// So we're going to try ordering shares by "wealthiest", but then if we find an exact match by size,
// order that to the front of the candidates. We may not end up evicting the "wealthiest", but we
// should end up evicting the least disruptive share.
//
List<Share> sh = new ArrayList<Share>();
sh.addAll(m.getActiveShares().values());
Collections.sort(sh, new ShareByWealthSorter());
g = m.getVirtualShareOrder(); // ( free space at this point )
List<Share> potentialShares = new ArrayList<Share>();
for ( Share s : sh ) {
IRmJob j = s.getJob();
// User u = j.getUser();
if ( s.isForceable() ) {
if ( candidateJobs.containsKey(j) ) {
g += s.getShareOrder();
if ( s.getShareOrder() == orderNeeded ) {
potentialShares.add(0, s); // exact matches first
} else {
potentialShares.add(s);
}
}
}
if ( g >= orderNeeded ) break;
}
// potentialShares should be properly ordered as discussed above at this point
if ( g >= orderNeeded ) {
// found enough on this machine for 1 share!
logger.debug(methodName, nj.getId(), "Clearing shares: g[", g, "], orderNeeded[", orderNeeded, "]");
g = m.getVirtualShareOrder(); // reset
for ( Share s : potentialShares ) {
IRmJob j = s.getJob();
User u = j.getUser();
g += s.getShareOrder();
given_per_round++;
clearShare(s, nj);
u.subtractWealth(s.getShareOrder());
logger.debug(methodName, nj.getId(), "Clearing share", s, "order[", s.getShareOrder(),
"]: g[", g, "], orderNeeded[", orderNeeded, "]");
if ( g >= orderNeeded) break; // inner loop, could break on exact match without giving everything away
}
break; // outer loop, if anything was found
}
}
if ( given_per_round > 0 ) {
// Must reorder the eligible list to get the "next" best candidate. We could try to remove
// machines that were exhausted above ...
Map<Machine, Machine> tmp = new HashMap<Machine, Machine>();
tmp.putAll(eligibleMachines);
eligibleMachines.clear();
for ( Machine m : tmp.keySet() ) {
eligibleMachines.put(m, m);
}
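// Re-inserting the machines re-sorts them under EligibleMachineSorter with the wealth values we just
// reduced, so the next round targets the currently-richest donors.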
// and also must track how many processes we made space for
given = given + (g / orderNeeded); // at least one, or else we have a bug
logger.debug(methodName, nj.getId(), "LOOPEND: given[", given, "] g[", g, "] orderNeeded[", orderNeeded, "]");
}
logger.debug(methodName, nj.getId(), "Given_per_round", given_per_round, "given", given, "needed", needed);
} while ( (given_per_round > 0) && ( given < needed ));
// Sometimes we can directly reassign a share, in which case the job isn't waiting any more.
// We only care about setting a message if the poor thing is still totally starved of resources.
if ( nj.countNShares() == 0 ) {
nj.setReason("Waiting for defragmentation.");
}
return given;
}
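/**
* For each needy job, build a list of donor candidates (compatible nodepools, preemptable policies,
* acceptable priority), then call takeFromTheRich to clear space. If the needy job has better priority
* and the rich candidates can't supply enough, retry including the backup ("poor") candidates.
*/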
void doFinalEvictions(HashMap<IRmJob, Integer> needy)
{
String methodName = "doFinalEvictions";
for ( IRmJob j : needy.keySet() ) {
logger.debug(methodName, j.getId(), "Will attempt to have space made for", needy.get(j), "processes");
}
//
// Search for candidate donors and order by "most able to be generous". Nodepools must be compatible.
//
// If the priority of the needy job is the same or better, the candidates must not be needy, must be
// initialized already, and must have sufficient shares to give.
//
// If the priority of the needy job is better, we keep track of the rich vs the poor jobs and possibly
// perform a second pass that includes poor jobs, if we can't get enough from the rich.
//
for ( IRmJob nj : needy.keySet() ) {
int priority_needy = nj.getSchedulingPriority();
TreeMap<IRmJob, IRmJob> rich_candidates = new TreeMap<IRmJob, IRmJob>(new FragmentationSorter()); // first class candidates, they're rich and available
TreeMap<IRmJob, IRmJob> poor_candidates = new TreeMap<IRmJob, IRmJob>(new FragmentationSorter()); // clearing for better priority job, we only use this if it's
// impossible to clear from the rich candidates
for ( ResourceClass rc : resourceClasses.values() ) {
if ( rc.getPolicy() == Policy.RESERVE ) continue; // exempt from preemption
if ( rc.getPolicy() == Policy.FIXED_SHARE ) continue; // exempt from preemption
if ( ! compatibleNodepools(rc, nj) ) {
logger.debug(methodName, nj.getId(), "Skipping class", rc.getName(), "vs job class", nj.getResourceClass().getName(), "because of incompatible nodepools.");
continue;
}
int priority_candidate = rc.getPriority();
boolean use_expanded_pool = false; // a better-priority job is allowed to look at poor jobs if it can't be satisfied from the rich
if ( priority_needy > priority_candidate ) { // Greater means worse
logger.debug(methodName, nj.getId(), "Jobs in class", rc.getName(), "are not candidates because better priority: [",
priority_candidate, "vs", priority_needy, "]");
continue;
}
if ( priority_needy < priority_candidate ) { // less means better
logger.debug(methodName, nj.getId(), "Needy job has better priority than jobs in class", rc.getName(), "[",
priority_candidate, "vs", priority_needy, "]. Using expanded pool.");
use_expanded_pool = true;
}
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
for ( IRmJob j : jobs.values() ) {
int nshares = j.countNShares();
int qshares = nshares * j.getShareOrder();
if ( nshares == 0 ) {
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is not a candidate because it has no share.");
continue;
}
if ( needy.containsKey(j) ) {
if ( use_expanded_pool ) {
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is a backup candidate because it's needy.");
poor_candidates.put(j, j);
} else {
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is not a candidate because it's needy.");
}
continue;
}
if ( ! j.isInitialized() ) {
if ( use_expanded_pool ) {
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is a backup candidate because it's not initialized yet.");
poor_candidates.put(j, j);
} else {
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is not a candidate because it's not initialized yet.");
}
continue;
}
if ( nshares < fragmentationThreshold ) {
if ( use_expanded_pool ) {
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is a backup candidate because below frag threshold. nshares[", nshares, "] qshares[", qshares, "] threshold[", fragmentationThreshold, "]");
poor_candidates.put(j, j);
} else {
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is not a candidate because below frag threshold. nshares[", nshares, "] qshares[", qshares, "] threshold[", fragmentationThreshold, "]");
}
continue;
}
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is a candidate with processes[", nshares, "] qshares[", qshares, "]");
rich_candidates.put(j, j);
}
} // End search for candidate donors
//
// Here we start looking at the 'needy' jobs and try to match them against the candidates
//
HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user = new HashMap<User, TreeMap<IRmJob, IRmJob>>(); // use this to track where the wealth originates
TreeMap<User, User> users_by_wealth = new TreeMap<User, User>(new UserByWealthSorter()); // orders users by wealth
collectWealth(rich_candidates, users_by_wealth, jobs_by_user);
int needed = needy.get(nj); // this was adjusted to a reasonable level in the caller
logger.debug(methodName, nj.getId(), "Needy job looking for", needed, "more processes of O[", nj.getShareOrder(), "]");
//
// Try stealing shares from the "rich" candidates first.
//
needed -= takeFromTheRich(nj, needed, users_by_wealth, jobs_by_user);
if ( needed <= 0 ) {
// This can go <0 if total space freed + unused space on a node adds up to >1 share.
// It's simplest to just not sweat it and call it satisfied.
logger.info(methodName, nj.getId(), "Satisfied needs of job by taking from the rich.");
continue;
}
//
// The needy job had sufficient priority that we built up a list of emergency-backup jobs to evict.
//
if ( poor_candidates.size() > 0) {
logger.info(methodName, nj.getId(), "Could not clear sufficient space from rich candidates. Retrying with all candidates.");
jobs_by_user.clear();
users_by_wealth.clear();
rich_candidates.putAll(poor_candidates);
collectWealth(rich_candidates, users_by_wealth, jobs_by_user);
needed -= takeFromTheRich(nj, needed, users_by_wealth, jobs_by_user);
if ( needed <= 0 ) {
// This can go <0 if total space freed + unused space on a node adds up to >1 share.
// It's simplest to just not sweat it and call it satisfied.
logger.info(methodName, nj.getId(), "Satisfied needs of job by taking from all candidates.");
continue;
}
}
logger.info(methodName, nj.getId(), "Could not get enough from the rich. Asked for", needy.get(nj), "still needing", needed);
nj.setReason("Waiting for defragmentation.");
}
}
void collectWealth(TreeMap<IRmJob, IRmJob> candidates, TreeMap<User, User> users_by_wealth, HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user)
{
// Candidates are ordered by the FragmentationSorter
// - most over pure fair share
// - then most currently allocated
// user_by_wealth is ordered by the UserByWealthSorter
// - ordered by most wealth - actual qshares over all jobs
//
// Collect total wealth and order the wealthy by spondulix
//
HashMap<User, Integer> shares_by_user = new HashMap<User, Integer>(); // use this to track user's wealth
for ( IRmJob j : candidates.values() ) {
User u = j.getUser();
if ( shares_by_user.get(u) == null ) {
shares_by_user.put(u, 0);
}
shares_by_user.put(u, shares_by_user.get(u) + (j.countNShares() * j.getShareOrder()));
TreeMap<IRmJob, IRmJob> ujobs = jobs_by_user.get(u);
if ( ujobs == null ) {
ujobs = new TreeMap<IRmJob, IRmJob>(new JobByShareSorter()); // orders by largest number of assigned shares
jobs_by_user.put(u, ujobs);
}
ujobs.put(j, j);
}
// Now record each user's total wealth and order the users by it
for ( User u : shares_by_user.keySet() ) {
u.setShareWealth(shares_by_user.get(u)); // qshares
users_by_wealth.put(u, u);
}
}
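/**
* Depth-first collection of the nodepool tree into 'nodepools': children are added before their
* parents, so the deepest pools come first and 'top' is last.
*/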
void getNodepools(NodePool top, List<NodePool> nodepools)
{
for ( NodePool np : top.getChildren().values()) {
getNodepools(np, nodepools);
}
nodepools.add(top);
}
/**
* Check to see if there are jobs whose counts indicate they should be getting shares, but which
* can't be satisfied either from free shares or from pending evictions because of
* fragmentation.
*
* If there's frag, we return a map that shows how much each under-allocated job needs, and
* a map of the pending evictions.
*/
void detectFragmentation(HashMap<IRmJob, Integer> needed_by_job)
{
String methodName = "detectFragmentation";
if ( logger.isDebug() ) {
logger.debug(methodName, null, "vMachines:", fmtArray(globalNodepool.cloneVMachinesByOrder()));
}
List<NodePool> poollist = new ArrayList<NodePool>();
getNodepools(globalNodepool, poollist);
NodePool[] allPools = poollist.toArray(new NodePool[poollist.size()]);
if ( logger.isDebug() ) {
// make sure the node pool list is built correctly
StringBuffer sb = new StringBuffer("Nodepools:");
for ( NodePool np : allPools ) {
sb.append(np.getId());
sb.append(" ");
}
logger.debug(methodName, null, sb.toString());
}
// These next two maps are built lazily, once per call to this routine
Map<String, int[]> vshares = new HashMap<String, int[]>(); // Virtual shares of each order in each nodepool.
// This gets enhanced with pending evictions so we
// can tell whether enough space is accounted for
// for each job.
Map<String, int[]> nshares = new HashMap<String, int[]>(); // For each order, the number of shares available,
// either directly, or through splits from higher-order
// space, enhanced with pending evictions and purges.
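// Potentially needy jobs per nodepool: job -> number of processes it still appears to need.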
Map<String, Map<IRmJob, Integer>> jobs = new HashMap<String, Map<IRmJob, Integer>>();
for ( int npi = 0; npi < allPools.length; npi++ ) { // Turns out to be depth-first traversal !
// First pass, init the structures, including any free space that may have been unusable
NodePool np = allPools[npi];
String id = np.getId();
int[] vmach = globalNodepool.makeArray();
int[] nmach = globalNodepool.makeArray();
Map<IRmJob, Integer> jobmap = new HashMap<IRmJob, Integer>();
vshares.put(id, vmach);
nshares.put(id, nmach);
jobs.put(id, jobmap);
}
boolean must_defrag = false;
String headerfmt = "%14s %20s %6s %4s %7s %6s %2s";
String datafmt = "%14s %20s %6d %4d %7d %6d %2d";
for ( ResourceClass rc : resourceClasses.values() ) {
// Next: Look at every job and work out its "need". Collect jobs by nodepool into the jobmaps.
Map<IRmJob, IRmJob> allJobs = rc.getAllJobs();
String npn = rc.getNodepoolName();
Map<IRmJob, Integer> jobmap = jobs.get(npn);
if ( allJobs.size() == 0 ) continue;
logger.info(methodName, null, String.format(headerfmt, "Nodepool", "User", "PureFS", "NSh", "Counted", "Needed", "O"), "Class:", rc.getName());
for ( IRmJob j : allJobs.values() ) {
if ( j.isRefused() ) {
continue;
}
if ( j.isDeferred() ) {
continue;
}
int counted = j.countNSharesGiven(); // accounting for ramp-up, various caps, etc.
int current = j.countNShares(); // currently allocated, plus pending, less those removed by earlier preemption
int needed = counted - current; // could go negative if its evicting
int order = j.getShareOrder();
if ( j.getSchedulingPolicy() == Policy.FAIR_SHARE ) { // cap on frag threshold
if ( current >= fragmentationThreshold ) {
needed = 0;
} else if ( current >= j.getPureFairShare() ) { // more than our pure share, we're not needy
needed = 0;
} else if ( needed < 0 ) { // more than our count, likely evicting
needed = 0;
} else if ( needed > 0) {
needed = Math.min(needed, fragmentationThreshold);
jobmap.put(j, needed); // we'll log this in a minute, not here
must_defrag = true;
}
} else { // if not fair-share, must always try to defrag if needed: non-preemptable work is entitled to
// its full allocation, and therefore cannot be left needy
// UIMA-4275 We rely on quotas to keep 'needed' under control
if ( needed > 0 ) {
jobmap.put(j, needed);
must_defrag = true;
}
}
logger.info(methodName, j.getId(),
String.format(datafmt,
npn,
j.getUser().getName(),
j.getPureFairShare(),
current,
counted,
needed,
order),
(needed > 0) ? "POTENTIALLY NEEDY" : ""
);
// String.format("NP: %10s User %8s Pure fs[%3d] Nshares[%3d] AsgnNshares[%3d] needed[%3d] O[%d] %s",
// npn,
// j.getUser().getName(),
// j.getPureFairShare(),
// current,
// counted,
// needed,
// order,
// (needed > 0) ? "POTENTIALLY NEEDY" : ""
// ));
}
}
if ( ! must_defrag ) return; // the rest of this is expensive, let's bypass if we can
for ( int npi = 0; npi < allPools.length; npi++ ) { // Turns out to be depth-first traversal !
// Next pass, find all the open and potentially open spots in each nodepool. We need to coalesce
// evicted shares with each other and with open space on each machine in order to know whether the
// space is potentially usable by jobs.
NodePool np = allPools[npi];
Map<Node, Machine> machs = np.getAllMachinesForPool();
for ( Machine m : machs.values() ) {
int free = m.countFreedUpShares(); // free space plus evicted shares - eventual space
if ( free != 0 ) {
logger.trace(methodName, null, "Freed shares", free, "on machine", m.getId(), "in nodepool", np.getId());
for ( NodePool npj = np; npj != null; npj = npj.getParent() ) { // must propagate up because of how these tables work
String id_j = npj.getId();
int[] vmach_j = vshares.get(id_j);
if (free >= vmach_j.length) { // UIMA-5605 Avoid out-of-bounds exception
logger.warn(methodName, null, "Nodepool",id_j,"has a maximum size of", vmach_j.length-1,
"but the computed free space is", free);
free = vmach_j.length - 1;
}
logger.trace(methodName, null, "Update v before: NP[", id_j, "] v:", fmtArray(vmach_j));
vmach_j[free]++; // This is the largest potential share that can be made on this machine,
// after evictions starting 'here' and propagating up
logger.trace(methodName, null, "Update v after : NP[", id_j, "] v:", fmtArray(vmach_j));
}
}
}
}
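// At this point vshares[np][o] counts the machines in or below nodepool 'np' whose free-plus-evicted
// space is exactly 'o' quantum shares; the next pass folds these into cumulative availability by order.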
for ( int npi = 0; npi < allPools.length; npi++ ) {
// Next pass, create the cumulative "n" style table from the "v" table of holes
String id = allPools[npi].getId();
int[] vmach = vshares.get(id);
int[] nmach = nshares.get(id);
reworknShares(vmach, nmach); // Populate nmach from vmach for this np, with free or potentially free shares
if ( logger.isInfo() ) {
logger.info(methodName, null, "NP", id, "After check: virtual free Space", fmtArray(vmach));
logger.info(methodName, null, "NP", id, "After check: cumulative free Space", fmtArray(nmach));
}
}
for ( int npi = 0; npi < allPools.length; npi++ ) {
// Last step, traverse jobs by nodepool and determine their need.
String id = allPools[npi].getId();
Map<IRmJob, Integer> jobmap = jobs.get(id);
if ( jobmap.size() == 0 ) continue; // only looking at potentially needy jobs
int[] nmach = nshares.get(id);
for ( IRmJob j : jobmap.keySet() ) {
int needed = jobmap.get(j);
int order = j.getShareOrder();
int available = nmach[order];
int to_remove = 0;
needyJobs.put(j, j);
if ( available >= needed ) {
to_remove = needed; // charge the job's full need against the availability tables
needed = 0; // ... and it is no longer needy
} else {
to_remove = available;
needed -= to_remove;
}
if ( to_remove > 0 ) {
NodePool np = allPools[npi];
for ( NodePool npj = np; npj != null; npj = npj.getParent() ) { // must propagate up because of how these tables work
String id_j = npj.getId();
int[] vmach_j = vshares.get(id_j);
int[] nmach_j = nshares.get(id_j);
removeSharesByOrder(vmach_j, nmach_j, to_remove, order);
}
}
if ( needed > 0 ) {
needed_by_job.put(j, needed);
logger.info(methodName, j.getId(),
String.format("NP: %10s User: %10s Pure fs: %3d needed %3d O[%d] %s",
id,
j.getUser().getName(),
j.getPureFairShare(),
needed,
order,
"ACTUALLY NEEDY"
));
}
}
}
}
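/**
* Drive defragmentation: find jobs whose counts say they deserve shares but which cannot get them
* because of fragmentation, then evict from over-allocated ("rich") jobs to make room for them.
*/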
void insureFullEviction()
{
String methodName = "insureFullEviction";
int jobcount = 0;
for ( ResourceClass rc : resourceClasses.values() ) {
if ( rc.getPolicy() != Policy.RESERVE ) {
jobcount += rc.countJobs();
}
}
if ( jobcount == 0 ) return;
HashMap<IRmJob, Integer> needy = new HashMap<IRmJob, Integer>();
detectFragmentation(needy);
if ( needy.size() == 0 ) {
logger.info(methodName, null, "No needy jobs, defragmentation bypassed.");
return;
}
logger.info(methodName, null, "NEEDY JOBS DETECTED");
doFinalEvictions(needy);
}
/**
* The third stage - map the counts onto actual resources
*/
protected void findWhatOf(ArrayList<ResourceClass> rcs)
{
switch ( rcs.get(0).getPolicy() ) {
case FAIR_SHARE:
whatOfFairShare(rcs);
break;
case FIXED_SHARE:
whatOfFixedShare(rcs);
break;
case RESERVE:
whatOfReserve(rcs);
break;
}
}
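/**
* Sort each job into the scheduling update according to how this cycle changed it: expanded,
* shrunken, stable, dormant, or a reservation. Refused jobs are skipped.
*/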
void setSchedulingUpdate(ArrayList<ResourceClass> rcs)
{
for ( ResourceClass rc : rcs ) {
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
for ( IRmJob j : jobs.values() ) {
if ( j.isRefused() ) {
continue;
}
if ( j.isExpanded() ) {
schedulingUpdate.addExpandedJob(j);
}
if ( j.isShrunken() ) {
schedulingUpdate.addShrunkenJob(j);
}
if ( j.isStable() && ( ! j.isReservation() ) ) {
schedulingUpdate.addStableJob(j);
}
if ( j.isDormant() ) {
schedulingUpdate.addDormantJob(j);
}
if ( j.isReservation() ) {
schedulingUpdate.addReservation(j);
}
}
}
}
/**
* Figure out what we have to give away. Called on entry, and after any kind of
* action that takes machines out of the schedulable pool ( e.g. reservation ).
*/
private void resetNodepools()
{
//String methodName = "countResources";
int maxorder = 0;
for ( ResourceClass rc: resourceClasses.values() ) {
maxorder = Math.max(maxorder, rc.getMaxJobOrder());
}
globalNodepool.reset(maxorder);
// logger.info(methodName, null, "Scheduling Tables at Start of Epoch:\n", globalNodepool.toString());
}
/**
* IScheduler entry point for the fairShare calculation.
*
* This implements the easy three step process described at the top of the file.
*/
public void schedule(SchedulingUpdate upd)
{
String methodName = "schedule";
int jobcount = 0;
for ( ResourceClass rc : resourceClasses.values() ) {
HashMap<IRmJob, IRmJob> allJobs = rc.getAllJobs();
jobcount += allJobs.size();
for ( IRmJob j : allJobs.values() ) {
j.initJobCap();
}
}
if ( jobcount == 0 ) {
logger.debug(methodName, null, "No jobs to schedule under nodepool", globalNodepool.getId());
return;
}
// UIMA-5190 Reduce info logging ... most of these details can be seen in the schedule sent later to the OR
if (logger.isDebug()) {
logger.info(methodName, null, "Machine occupancy before schedule");
globalNodepool.queryMachines();
}
this.schedulingUpdate = upd;
//
// Reset all counts, and adjust for the non-preemptable work. Nodepool counts end up reflecting all
// *potential* shares in the system.
//
resetNodepools();
accountForNonPreemptable();
//
// Walk through the classes starting with highest priority and just pass out the resources.
// On each iteration we pass the list of all classes at the same priority, to get scheduled
// together. We assume that we do NOT mix policy among priorities, enforced by the caller.
//
for ( int i = 0; i < classes.length; i++ ) {
@SuppressWarnings("unchecked")
ArrayList<ResourceClass> rcs = (ArrayList<ResourceClass>) classes[i];
findHowMuch(rcs);
}
//
// Counts are now placed into all the class, user, and job structures. We need to account for
// all *actual* resources, so we can find free things more gracefully.
//
resetNodepools();
accountForNonPreemptable();
accountForFairShare();
// ////////////////////////////////////////////////////////////////////////////////////////////////////
// And now, find real, physical resources.
//
// First pre-expansion of needy jobs, prioritized
// Second, normal expansion of the scheduled work.
//
if ( logger.isTrace() ) {
globalNodepool.queryMachines();
}
for ( int i = 0; i < classes.length; i++ ) {
@SuppressWarnings("unchecked")
ArrayList<ResourceClass> rcs = (ArrayList<ResourceClass>) classes[i];
expandNeedyJobs(globalNodepool, rcs);
}
for ( int i = 0; i < classes.length; i++ ) {
@SuppressWarnings("unchecked")
ArrayList<ResourceClass> rcs = (ArrayList<ResourceClass>) classes[i];
findWhatOf(rcs);
}
//
//
// ////////////////////////////////////////////////////////////////////////////////////////////////////
// //
// // Forced cleanup of squatter shares. We look at each subpool and force eviction of non-member
// // shares, strictly according to counts, to prevent squatters from locking out legitimate
// // members of the pool.
// //
// cleanNodepools(globalNodepool);
doEvictions(globalNodepool);
// doForcedEvictions(globalNodepool);
//
// If we've fragmented, we may not have been able to make enough space to get everybody's fair
// share started. Here we check to make sure everybody who is capped got at least what
// they deserved, and if not, we go in more aggressively against jobs that have more than
// their "absolute" fair share.
//
if ( do_defragmentation ) {
insureFullEviction();
}
//
// At last, walk through all the jobs and place them into the scheduling update appropriately
//
for ( int i = 0; i < classes.length; i++ ) {
@SuppressWarnings("unchecked")
ArrayList<ResourceClass> rcs = (ArrayList<ResourceClass>) classes[i];
setSchedulingUpdate(rcs);
}
globalNodepool.resetPreemptables(); // Reservations: preemptables are machines that are going to get cleared
// for pending reservations. Preemptable machines do not get reset
// normally so we can save state across the scheduling phases.
}
// //
// // Order entities by age, oldest first
// //
// static private class EntityByAgeSorter
// implements Comparator<IEntity>
// {
// public int compare(IEntity e1, IEntity e2)
// {
// if ( e1 == e2 ) return 0;
// // remember older has lower time
// return (int) (e1.getTimestamp() - e2.getTimestamp());
// }
// }
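//
// Order jobs by submission time, oldest first; ties are broken by share order, largest first.
//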
static private class JobByTimeSorter
implements Comparator<IRmJob>
{
public int compare(IRmJob j1, IRmJob j2)
{
if ( j1.equals(j2) ) return 0;
if ( j1.getTimestamp() == j2.getTimestamp() ) { // if tied on time (unlikely)
return (j2.getShareOrder() - j1.getShareOrder()); // break tie on share order, decreasing
}
return (int) (j1.getTimestamp() - j2.getTimestamp()); // increasing time order
}
}
//
// Order classes by share weight, descending
//
static private class ClassByWeightSorter
implements Comparator<ResourceClass>
{
public int compare(ResourceClass r1, ResourceClass r2)
{
if ( r1 == r2 ) return 0;
return (r2.getShareWeight() - r1.getShareWeight());
}
}
//
// Treemap sorter, must never return 0
//
// Order users by Q share wealth, descending
//
static private class UserByWealthSorter
implements Comparator<User>
{
public int compare(User u1, User u2)
{
if ( u1.equals(u2) ) return 0;
int w1 = u1.getShareWealth();
int w2 = u2.getShareWealth();
if ( w1 == w2 ) return -1; // we don't ever want these guys to compare equal
// unless the instances are the same, checked above
return w2 - w1;
}
}
//
// Treemap sorter, must never return 0
//
// Order jobs by largest assigned q shares first
//
static private class JobByShareSorter
implements Comparator<IRmJob>
{
public int compare(IRmJob j1, IRmJob j2)
{
if ( j1.equals(j2) ) return 0; // same instances MUST return equal,
int s1 = j1.countNShares() * j1.getShareOrder();
int s2 = j2.countNShares() * j2.getShareOrder();
if ( s1 == s2 ) return -1; // similar share count but never equal
return ( s2 - s1 );
}
}
//
// Order jobs by "most able to be generous".
//
static private class FragmentationSorter
implements Comparator<IRmJob>
{
//
// Order by
// a) most over pure fair share
// b) most currently allocated
//
public int compare(IRmJob j1, IRmJob j2)
{
if ( j1.equals(j2) ) return 0;
// pure fair-share
int p1 = j1.getPureFairShare(); // qshares
int p2 = j2.getPureFairShare();
// actual current allocation
int c1 = j1.countNShares() * j1.getShareOrder(); // to qshares
int c2 = j2.countNShares() * j2.getShareOrder();
// overage ...
int o1 = Math.max(0, c1 - p1);
int o2 = Math.max(0, c2 - p2);
if ( o1 < o2 ) return 1; // largest overage first
if ( o1 > o2 ) return -1;
if ( c1 < c2 ) return 1; // largest allocation first
if ( c1 > c2 ) return -1;
// and the tie breaker, these guys are never allowed to compare equal unless the
// instances are the same, and we don't care about the order if we get this far
return -1;
}
}
//
// Order shares by
// - pending (can be reassigned or dropped)
// - investment
//
// Pending always sorts before not-pending.
//
// This is a sorter for a tree map so we have to be sure not to return equality unless the objects
// are the same objects.
//
// static private class FinalEvictionSorter
// implements Comparator<Share>
// {
//
// public int compare(Share s1, Share s2)
// {
// if ( s1 == s2 ) return 0;
//
// // pending shares first, no point expanding them if we don't have to
// if ( s1.isPending() && s2.isPending() ) return -1;
// if ( s1.isPending() ) return -1;
// if (s2.isPending() ) return 1;
//
// // Shares on machines with more space first, deal with defrag, which is why we're here
// int vso1 = s1.getMachine().countFreedUpShares();
// int vso2 = s2.getMachine().countFreedUpShares();
//
// if ( vso1 != vso2 ) {
// return vso2 - vso1; // (more space first)
// }
//
// // All else being equal, use investment
// int inv = (int) (s1.getInvestment() - s2.getInvestment());
// if ( inv == 0 ) return -1; // careful not to return 0
// return inv;
// }
// }
//
// Sort machines for defrag.
// 1 any machine with free space F, and a candidate job j of order O(j) such
// that F + O(j) == O(nj)
// Tiebreaker on j is wealth W: W(j1) > W(j2)
// 2 second choice, any machine with a candidate job of the same order as the needy job
// Secondary sort, candidate job A is richer than candidate job B
//
// Tiebreak 1 ~ 2: W(j1) > W(j2) - choose host whose job is richest
//
// a) machines with richest users first
// b) largest machine second
//
static private class EligibleMachineSorter
implements Comparator<Machine>
{
public int compare(Machine m1, Machine m2)
{
if ( m1.equals(m2) ) return 0;
int m1wealth = 0;
int m2wealth = 0;
Map<Share, Share> sh1 = m1.getActiveShares();
for ( Share s : sh1.values() ) {
IRmJob j = s.getJob();
User u = j.getUser();
m1wealth = Math.max(m1wealth, u.getShareWealth());
}
Map<Share, Share> sh2 = m2.getActiveShares();
for ( Share s : sh2.values() ) {
IRmJob j = s.getJob();
User u = j.getUser();
m2wealth = Math.max(m2wealth, u.getShareWealth());
}
if ( m1wealth != m2wealth ) return m2wealth - m1wealth; // richest user first
long m1mem = m1.getMemory();
long m2mem = m2.getMemory();
if ( m1mem == m2mem ) return -1; // for tree map, must not return 0 unless same object
return (int) (m2mem - m1mem); // largest machine first.
}
}
//
// Order shares by most wealthy owner - defrag
// Orders by wealthiest owner first.
//
static private class ShareByWealthSorter
implements Comparator<Share>
{
public int compare(Share s1, Share s2)
{
if ( s1.equals(s2) ) return 0;
int s1wealth = 0;
int s2wealth = 0;
IRmJob j1 = s1.getJob();
User u1 = j1.getUser();
s1wealth = u1.getShareWealth();
IRmJob j2 = s2.getJob();
User u2 = j2.getUser();
s2wealth = u2.getShareWealth();
if ( s2wealth == s1wealth ) {
return RmJob.compareInvestment(s1, s2); // UIMA-4275
}
return s2wealth - s1wealth;
}
}
}