/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.rm.scheduler;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeConfiguration;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.Pair;
import org.apache.uima.ducc.common.admin.event.RmAdminQLoadReply;
import org.apache.uima.ducc.common.admin.event.RmAdminQOccupancyReply;
import org.apache.uima.ducc.common.admin.event.RmAdminReply;
import org.apache.uima.ducc.common.admin.event.RmAdminVaryReply;
import org.apache.uima.ducc.common.admin.event.RmQueriedClass;
import org.apache.uima.ducc.common.admin.event.RmQueriedMachine;
import org.apache.uima.ducc.common.admin.event.RmQueriedNodepool;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.persistence.rm.IRmPersistence;
import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
import org.apache.uima.ducc.common.utils.Version;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
/**
* This process orchestrates scheduling.
* - Receives requests from clients (job manager, service manager, etc.) for resources
* - Forwards requests and current state to pluggable scheduling implementation
* - Receives a schedule, updates state, sends responses to requestors
* - Maintains state as needed (work item life cycle etc)
*/
public class Scheduler
// extends Thread
implements ISchedulerMain,
SchedConstants
{
IJobManager jobManager;
static DuccLogger logger = DuccLogger.getLogger(Scheduler.class, COMPONENT_NAME);
boolean done = false;
// Boolean force_epoch = false;
String ducc_home;
// Integer epoch = 5; // scheduling epoch, seconds
NodeConfiguration configuration = null; // UIMA-4142 make it global
String defaultDomain = null; // UIMA-4142
boolean needRecovery = false; // UIMA-4142 tell outer layer that recovery is required
AbstractDuccComponent baseComponent; // UIMA-4142, pass in the base for reconfig - reread ducc.properties
NodePool[] nodepools; // top-level nodepools
int max_order = 0;
//
// Fair-share and fixed-share use shares only, not machines
//
Map<DuccId, Share> busyShares = new HashMap<DuccId, Share>(); // Running "fair" share jobs
// incoming reports of machines that are now free
Map<DuccId, Pair<IRmJob, Share>> vacatedShares= new HashMap<DuccId, Pair<IRmJob, Share>>();
// boolean growthOccurred = false; // don't care which grew, just that something grew
List<IRmJob> incomingJobs = new ArrayList<IRmJob>(); // coming in from external world but not yet added to our queues
List<IRmJob> recoveredJobs = new ArrayList<IRmJob>(); // coming in from external world but we don't know about them yet (hopefully
// because we crashed and not for more nefarious reasons)
List<IRmJob> completedJobs = new ArrayList<IRmJob>(); // signaled complete from outside but not yet dealt with
List<IRmJob> initializedJobs = new ArrayList<IRmJob>(); // Init is complete so we can begin full (un)fair share allocation
//HashMap<Node, Node> incomingNodes = new HashMap<Node, Node>(); // node updates
Map<Node, Node> deadNodes = new HashMap<Node, Node>(); // missed too many heartbeats
Map<Node, Integer> illNodes = new HashMap<Node, Integer>(); // starting to miss, keep track of how many for the db
// HashMap<Node, Node> allNodes = new HashMap<Node, Node>(); // the guys we know
Map<String, NodePool> nodepoolsByNode = new HashMap<String, NodePool>(); // all nodes, and their associated pool
Map<String, String> shortToLongNode = new HashMap<String, String>(); // short hostname -> fully-qualified hostname
Map<String, User> users = new HashMap<String, User>(); // Active users - has a job in the system
//HashMap<DuccId, IRmJob> runningJobs = new HashMap<DuccId, IRmJob>();
Map<DuccId, IRmJob> allJobs = new HashMap<DuccId, IRmJob>();
Map<ResourceClass, ResourceClass> resourceClasses = new HashMap<ResourceClass, ResourceClass>();
Map<String, ResourceClass> resourceClassesByName = new HashMap<String, ResourceClass>();
String defaultFairShareName = null;
String defaultReserveName = null;
int defaultNThreads = 1;
int defaultNTasks = 10;
int defaultMemory = 15;
// these two are initialized in constructor
String schedImplName;
IScheduler[] schedulers;
long share_free_dram = 0; // 0 GB in KB - minimum memory left free after shares are allocated
long dramOverride = 0; // if > 0, use this instead of amount reported by agents (modeling and testing)
int pending_evictions = 0; // for queries
int pending_expansions = 0; // for queries
EvictionPolicy evictionPolicy = EvictionPolicy.SHRINK_BY_MACHINE;
// int nodeMetricsUpdateRate = 30000;
// int startupCountdown = 0; // update each epoch. only schedule when it's > nodeStability
int nodeStability = 3;
boolean stability = false;
private static DuccIdFactory idFactory;
IRmPersistence persistence = null;
// static boolean expandByDoubling = true;
// static int initializationCap = 2; // Max allocation until we know initialization works in
// units of *processes*, not shares (i.e.N-shares).
//
// Version
// 0 - major version
// 6 - minor version
// 3 - ptf - forced eviction under fragmentation.
// 4 - defrag code complete
// beta - not yet "real"!
//
// Bring up to speed with rest of ducc version. 2013-03-06 jrc
//
// 1.0.1 - RM can purge non-preemptables except for Unmanaged Reservations. UIMA-3614
// 1.0.2 - vary-on, vary-off
// 1.0.3 - fix bad check in recursion in NodepoolScheduler.doEvictions
// 1.1.0 - Synchronize with release
final static int rmversion_major = 2;
final static int rmversion_minor = 0;
final static int rmversion_ptf = 0;
final static String rmversion_string = null;
boolean initialized = false; // we refuse nodeupdates until this is true
public Scheduler(AbstractDuccComponent baseComponent)
{
this.baseComponent = baseComponent; // UIMA-4142, pass in the base for reconfig
}
public synchronized void init()
throws Exception
{
String methodName = "init";
//setName("Scheduler");
DuccLogger.setUnthreaded();
String ep = SystemPropertyResolver.getStringProperty("ducc.rm.eviction.policy", "SHRINK_BY_MACHINE");
evictionPolicy = EvictionPolicy.valueOf(ep);
// nodepool = new NodePool(null, evictionPolicy, 0); // global nodepool
share_free_dram = SystemPropertyResolver.getLongProperty("ducc.rm.reserved.dram", share_free_dram) * 1024 * 1024; // GB -> KB
ducc_home = SystemPropertyResolver.getStringProperty("DUCC_HOME");
// some defaults, for jobs that don't specify them
defaultNTasks = SystemPropertyResolver.getIntProperty("ducc.rm.default.tasks", 10);
defaultNThreads = SystemPropertyResolver.getIntProperty("ducc.rm.default.threads", 1);
defaultMemory = SystemPropertyResolver.getIntProperty("ducc.rm.default.memory", 15); // in GB
// expandByDoubling = RmUtil.getBooleanProperty("ducc.rm.expand.by.doubling", true);
nodeStability = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", 3); // number of node metrics updates to wait for before scheduling
// 0 means, just jump right in and don't wait
dramOverride = SystemPropertyResolver.getLongProperty("ducc.rm.override.dram", 0);
if ( dramOverride > 0 ) {
dramOverride = dramOverride * (1024 * 1024); // convert to KB
}
if ( idFactory == null ) { // UIMA-4142 only remake it on first boot
idFactory = new DuccIdFactory(1);
}
// try {
// schedImplName = SystemPropertyResolver.getStringProperty("ducc.rm.scheduler", "org.apache.uima.ducc.rm.ClassBasedScheduler");
// @SuppressWarnings("unchecked")
// Class<IScheduler> cl = (Class<IScheduler>) Class.forName(schedImplName);
// scheduler = (IScheduler) cl.newInstance();
// } catch (ClassNotFoundException e) {
// throw new SchedulingException(null, "Cannot find class " + schedImplName);
// } catch (InstantiationException e) {
// throw new SchedulingException(null, "Cannot instantiate class " + schedImplName);
// } catch (IllegalAccessException e) {
// throw new SchedulingException(null, "Cannot instantiate class " + schedImplName + ": can't access constructor.");
// }
String class_definitions = SystemPropertyResolver
.getStringProperty(DuccPropertiesResolver
.ducc_rm_class_definitions, "scheduler.classes");
class_definitions = System.getProperty("DUCC_HOME") + "/resources/" + class_definitions;
try {
initClasses();
} catch ( Exception e ) {
logger.error(methodName, null, e);
throw e;
}
// we share most of the state with the actual scheduling code - no need to keep passing this around
// TODO: Make sure these are all initialized correctly
// scheduler.setEvictionPolicy(evictionPolicy);
// scheduler.setClasses(resourceClasses);
// scheduler.setNodePool(nodepools[0]);
logger.info(methodName, null, " reserved DRAM : ", (share_free_dram / (1024*1024)), " GB");
logger.info(methodName, null, " DRAM override : ", (dramOverride / (1024*1024)), " GB");
logger.info(methodName, null, " scheduler : ", schedImplName);
logger.info(methodName, null, " default threads : ", defaultNThreads);
logger.info(methodName, null, " default tasks : ", defaultNTasks);
logger.info(methodName, null, " default memory : ", defaultMemory);
logger.info(methodName, null, " default fairshare class : ", defaultFairShareName);
logger.info(methodName, null, " default reserve : ", defaultReserveName);
logger.info(methodName, null, " class definition file : ", class_definitions);
logger.info(methodName, null, " default domain : ", defaultDomain); // UIMA-4142
logger.info(methodName, null, " eviction policy : ", evictionPolicy);
logger.info(methodName, null, " database enabled : ", !System.getProperty("ducc.database.host").equals("--disabled--"));
logger.info(methodName, null, " database implementation : ", System.getProperty("ducc.rm.persistence.impl"));
logger.info(methodName, null, " use prediction : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.prediction", true));
logger.info(methodName, null, " prediction fudge factor : ", SystemPropertyResolver.getIntProperty("ducc.rm.prediction.fudge", 10000));
logger.info(methodName, null, " node stability : ", nodeStability);
logger.info(methodName, null, " init stability : ", SystemPropertyResolver.getIntProperty("ducc.rm.init.stability"));
logger.info(methodName, null, " fast recovery : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.fast.recovery", true));
logger.info(methodName, null, " metrics update rate : ", SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate",
DEFAULT_NODE_METRICS_RATE));
logger.info(methodName, null, " initialization cap : ", SystemPropertyResolver.getIntProperty("ducc.rm.initialization.cap"));
logger.info(methodName, null, " expand by doubling : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.expand.by.doubling", true));
logger.info(methodName, null, " fragmentation threshold : ", SystemPropertyResolver.getIntProperty("ducc.rm.fragmentation.threshold", 2));
logger.info(methodName, null, " do defragmentation : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.defragmentation", true));
logger.info(methodName, null, " DUCC home : ", System.getProperty("DUCC_HOME"));
logger.info(methodName, null, " ActiveMQ URL : ", SystemPropertyResolver.getStringProperty("ducc.broker.url"));
logger.info(methodName, null, " JVM : ", System.getProperty("java.vendor") +
" "+ System.getProperty("java.version"));
logger.info(methodName, null, " JAVA_HOME : ", System.getProperty("java.home"));
logger.info(methodName, null, " JVM Path : ", System.getProperty("ducc.jvm"));
logger.info(methodName, null, " JMX URL : ", System.getProperty("ducc.jmx.url"));
logger.info(methodName, null, " OS Architecture : ", System.getProperty("os.arch"));
logger.info(methodName, null, " OS Name : ", System.getProperty("os.name"));
logger.info(methodName, null, " DUCC Version : ", Version.version());
logger.info(methodName, null, " RM Version : ", ""+ rmversion_major + "."
+ rmversion_minor + "."
+ rmversion_ptf);
persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
persistence.clear();
initialized = true;
}
public RmAdminReply reconfigure() // UIMA-4142
{
String methodName = "reconfigure";
RmAdminReply ret = new RmAdminReply();
logger.info(methodName, null, "Reconfiguration starts.");
setInitialized(false); // stop receipt of OR and Agent publications
// First we run the logic that reads the configuration; if it fails, we abort the reconfig without crashing.
// The result is discarded because init(), called below, does the actual configuration, as if booting.
try {
readConfiguration(); // UIMA-4142
} catch (Throwable e) {
setInitialized(true);
logger.warn(methodName, null, "Reconfiguration aborted:", e.toString());
ret.setRc(false);
ret.setMessage("Reconfiguration failed: " + e.toString());
return ret;
}
HashMap<Node, Machine> offlineMachines = new HashMap<Node, Machine>();
for (NodePool np : nodepools) {
offlineMachines.putAll(np.getOfflineMachines());
}
// (be careful, don't use the value, that must be discarded as it points to the OLD np)
List<String> offlineHostnames = new ArrayList<String>();
for ( Machine m : offlineMachines.values()) {
logger.info(methodName, null, "Saving offline status of", m.getId());
offlineHostnames.add(m.getId());
}
offlineMachines = null;
this.configuration = null;
this.defaultDomain = null;
this.nodepools = null;
this.max_order = 0;
this.busyShares.clear();
this.vacatedShares.clear();
this.incomingJobs.clear();
this.recoveredJobs.clear();
this.initializedJobs.clear();
this.deadNodes.clear();
this.nodepoolsByNode.clear();
this.shortToLongNode.clear();
this.users.clear();
this.allJobs.clear();
this.resourceClasses.clear();
this.resourceClassesByName.clear();
try {
baseComponent.reloadProperties("ducc.deploy.configuration");
init();
if ( offlineHostnames.size() > 0 ) {
String[] offline = offlineHostnames.toArray(new String[offlineHostnames.size()]);
varyoff(offline);
}
} catch ( Throwable t ) {
// TODO do something more? If this fails it's pretty awful.
logger.error(methodName, null, t);
}
setRecovery(true); // signal to outer layer that full recovery is needed
setInitialized(true); // resume receipt of publications
logger.info(methodName, null, "Reconfiguration complete.");
ret.setMessage("Reconfiguration complete.");
return ret;
}
public synchronized void setRecovery(boolean v)
{
this.needRecovery = v;
}
public synchronized boolean mustRecover()
{
return this.needRecovery;
}
public synchronized boolean isInitialized()
{
return initialized;
}
public synchronized void setInitialized(boolean v)
{
this.initialized = v;
}
public Machine getMachine(Node n)
{
return getMachine(n.getNodeIdentity());
}
public Machine getMachine(NodeIdentity ni)
{
NodePool nodepool = getNodepoolByName(ni);
return nodepool.getMachine(ni);
}
public void setJobManager(IJobManager jobmanager)
{
this.jobManager = jobmanager;
}
public String getDefaultFairShareName()
{
return defaultFairShareName;
}
public String getDefaultReserveName()
{
return defaultReserveName;
}
public int getDefaultNThreads()
{
return defaultNThreads;
}
public int getDefaultNTasks()
{
return defaultNTasks;
}
public int getDefaultMemory()
{
return defaultMemory;
}
public ResourceClass getResourceClass(String name)
{
return resourceClassesByName.get(name);
}
public IRmJob getJob(DuccId id)
{
return allJobs.get(id);
}
public Share getShare(DuccId id)
{
return busyShares.get(id);
}
// public static int getInitializationCap()
// {
// return initializationCap;
// }
//
// public static boolean isExpandByDoubling()
// {
// return expandByDoubling;
// }
/**
* Calculate share order, given some memory size in GB (as in from a job spec)
*/
public int calcShareOrder(IRmJob j)
{
// Calculate its share order
long mem = j.getMemory() << 20 ; // to KB from GB
int share_quantum = j.getShareQuantum();
int share_order = (int) (mem / share_quantum); // integer division truncates ...
if ( (mem % share_quantum) > 0 ) { // ... so round UP when there's a remainder
share_order++;
}
return share_order;
}
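// Illustrative arithmetic (assumes the default 15GB share quantum): a job
// requesting 31GB has mem = 31 << 20 KB and quantum = 15 << 20 KB; the
// division gives 2 with a remainder, so the order rounds up to 3 quanta.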
/**
* Collect all the classes served by the indicated nodepool (property set). This fills
* in the 'ret' map from the parameter 'dp' and from recursive calls over the children of 'dp'.
* @param dp This is the properties object from the configurator for a top-level
* nodepool.
* @param ret This is the map to be filled in by this routine.
*/
void getClassesForNodepool(DuccProperties dp, Map<ResourceClass, ResourceClass> ret)
{
@SuppressWarnings("unchecked")
List<DuccProperties> class_set = (List<DuccProperties>) dp.get("classes");
if ( class_set != null ) {
for ( DuccProperties cl : class_set ) {
ResourceClass rc = resourceClassesByName.get(cl.getStringProperty("name"));
ret.put(rc, rc);
}
}
@SuppressWarnings("unchecked")
List<DuccProperties> children = (List<DuccProperties>) dp.get("children");
if ( children != null ) {
for (DuccProperties child : children ) {
getClassesForNodepool(child, ret);
}
}
}
/**
* Map each node by name into the nodepool it belongs to
*/
void mapNodesToNodepool(Map<String, String> nodes, NodePool pool)
{
if ( nodes == null ) return;
for ( String s : nodes.keySet() ) {
updateNodepoolsByNode(s, pool); // maps from both the fully-qualified name and the short name
}
}
/**
* (Recursively) build up the hierarchy under the parent nodepool.
*/
void createSubpools(NodePool parent, List<DuccProperties> children)
{
if ( children == null ) return;
for ( DuccProperties dp : children ) {
String id = dp.getStringProperty("name");
@SuppressWarnings("unchecked")
Map<String, String> nodes = (Map<String, String>) dp.get("nodes");
int search_order = dp.getIntProperty("search-order", 100);
NodePool child = parent.createSubpool(id, nodes, search_order);
mapNodesToNodepool(nodes, child);
@SuppressWarnings("unchecked")
List<DuccProperties> grandkids = (List<DuccProperties>) dp.get("children");
createSubpools(child, grandkids);
}
}
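// Sketch of the recursion over a hypothetical configuration:
//     Nodepool top { Nodepool gpu { Nodepool gpu-large { } } }
// createSubpools(top, [gpu]) creates 'gpu' under 'top', maps gpu's nodes into
// nodepoolsByNode, then recurses to create 'gpu-large' as a subpool of 'gpu'.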
// UIMA-4142 better modularize this code
NodeConfiguration readConfiguration()
throws Exception
{
String class_definitions = SystemPropertyResolver
.getStringProperty(DuccPropertiesResolver
.ducc_rm_class_definitions, "scheduler.classes");
String user_registry = SystemPropertyResolver
.getStringProperty(DuccPropertiesResolver
.ducc_rm_user_registry, "ducc.users");
class_definitions = System.getProperty("DUCC_HOME") + "/resources/" + class_definitions;
String me = Scheduler.class.getName() + ".Config";
DuccLogger initLogger = new DuccLogger(me, COMPONENT_NAME);
NodeConfiguration nc = new NodeConfiguration(class_definitions, null, user_registry, initLogger); // UIMA-4142 make the config global
nc.readConfiguration();
return nc; // UIMA-4142
}
// UIMA-4142, don't pass in class def file, instead use common readConfiguration
void initClasses()
{
String methodName = "initClasses";
try {
configuration = readConfiguration();
} catch (Throwable e) {
// RM boot. We must abort because we have no prior working configuration to fall back to.
logger.error(methodName, null, e);
logger.error(methodName, null, "Scheduler exits: unable to read configuration.");
System.out.println("Scheduler exits: unable to read configuration.");
e.printStackTrace();
System.exit(1);
}
defaultDomain = configuration.getDefaultDomain(); // UIMA-4142
configuration.printConfiguration();
DuccProperties[] nps = configuration.getToplevelNodepools();
Map<String, DuccProperties> cls = configuration.getClasses();
nodepools = new NodePool[nps.length]; // top-level nodepools
schedulers = new IScheduler[nps.length]; // a scheduler for each top-level nodepool
// Here build up the ResourceClass definitions
logger.info(methodName, null, "Classes:");
logger.info(methodName, null, ResourceClass.getHeader());
logger.info(methodName, null, ResourceClass.getDashes());
for ( DuccProperties props : cls.values() ) {
ResourceClass rc = new ResourceClass(props);
resourceClasses.put(rc, rc);
resourceClassesByName.put(rc.getName(), rc);
logger.info(methodName, null, rc.toString());
}
DuccProperties dc = configuration.getDefaultFairShareClass();
if ( dc != null ) {
defaultFairShareName = dc.getProperty("name");
}
dc = configuration.getDefaultReserveClass();
if ( dc != null ) {
defaultReserveName = dc.getProperty("name");
}
// Instantiate one scheduler per top-level nodepool
try {
schedImplName = SystemPropertyResolver.getStringProperty("ducc.rm.scheduler", "org.apache.uima.ducc.rm.ClassBasedScheduler");
@SuppressWarnings("unchecked")
Class<IScheduler> cl = (Class<IScheduler>) Class.forName(schedImplName);
for ( int i = 0; i < nps.length; i++ ) {
logger.info(methodName, null, "Rebuilding", schedImplName, "for top level nodepool", nps[i].get("name"));
schedulers[i] = (IScheduler) cl.newInstance();
schedulers[i].setEvictionPolicy(evictionPolicy);
}
} catch (ClassNotFoundException e) {
throw new SchedulingException(null, "Cannot find class " + schedImplName);
} catch (InstantiationException e) {
throw new SchedulingException(null, "Cannot instantiate class " + schedImplName);
} catch (IllegalAccessException e) {
throw new SchedulingException(null, "Cannot instantiate class " + schedImplName + ": can't access constructor.");
}
// Here create the nodepool configuration
for ( int i = 0; i < nps.length; i++ ) {
DuccProperties np = nps[i];
String id = np.getStringProperty("name");
@SuppressWarnings("unchecked")
Map<String, String> nodes = (Map<String, String>) np.get("nodes");
int search_order = np.getIntProperty("search-order", 100);
int q = np.getIntProperty("share-quantum", 15) << 20 ; // to KB, which is how the nodes report in
nodepools[i] = new NodePool(null, id, nodes, evictionPolicy, 0, search_order, q);
schedulers[i].setNodePool(nodepools[i]); // set its top-level nodepool
mapNodesToNodepool(nodes, nodepools[i]);
logger.info(methodName, null, "Created top-level nodepool", id);
@SuppressWarnings("unchecked")
List<DuccProperties> children = (List<DuccProperties>) np.get("children");
createSubpools(nodepools[i], children);
Map<ResourceClass, ResourceClass> classesForNp = new HashMap<ResourceClass, ResourceClass>();
getClassesForNodepool(np, classesForNp); // all classes served by this hierarchy - fills in classesForNp
for ( ResourceClass rc: classesForNp.values() ) { // UIMA-4065 tell each cl which np serves it
String rcid = rc.getNodepoolName();
if ( rcid != null ) {
// set the two-way pointers between rc and np
NodePool subpool = nodepools[i].getSubpool(rcid);
rc.setNodepool(subpool); // rc -> nodepool
logger.info(methodName, null, "Assign rc", rc.getName(), "to np", subpool.getId());
subpool.addResourceClass(rc); // nodepool -> rc
}
}
schedulers[i].setClasses(classesForNp);
}
// Here create or update Users with constraints from the registry
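// Illustrative registry entries (hypothetical user and class names):
//     max-allotment            = 360   -> overrideGlobalLimit(360)
//     max-allotment.background = 100   -> overrideLimit(<class background>, 100)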
Map<String, DuccProperties> usrs = configuration.getUsers(); // UIMA-4275
for ( Object o : usrs.keySet() ) { // iterate over users
String n = (String) o;
DuccProperties dp = usrs.get(n);
for ( Object l : dp.keySet() ) { // iterate over limits for the user
if ( !((String)l).startsWith("max-allotment")) continue; // only this is supported at this time
String val = ((String) dp.get(l)).trim();
int lim = Integer.parseInt( val ); // verified parsable int during parsing
User user = users.get(n);
if (user == null) {
user = new User(n);
users.put(n, user);
}
if ( ((String)l).contains(".") ) { // the key carries a class qualifier: max-allotment.classname
String[] tmp = ((String)l).split("\\."); // max-allotment.classname
ResourceClass rc = resourceClassesByName.get(tmp[1]);
user.overrideLimit(rc, lim); // constrain allotment for this class to value in l
} else {
user.overrideGlobalLimit(lim);
}
}
}
}
/**
* Called only from schedule, under the 'this' monitor.
*
* We then take the SchedulingUpdate from the IScheduler and dispatch orders to
* the world to make it happen.
*
* For jobs that lose resources, job manager is asked to stop execution in specific shares.
* For jobs that gain resources, job manager is asked to start execution in specific shares.
* Jobs that don't change are leftovers. If they're not running at all, they're in the pending
* list; they might also be in the running list but had no allocation changes in the current epoch.
*/
private JobManagerUpdate dispatch(SchedulingUpdate upd, JobManagerUpdate jmu)
{
String methodName = "dispatch";
HashMap<IRmJob, IRmJob> jobs;
pending_evictions = 0; // for queries
pending_expansions = 0; // for queries
// Go through shrunken jobs - if they are shrunken to 0, move to dormant
jobs = upd.getShrunkenJobs();
for (IRmJob j : jobs.values()) {
logger.trace(methodName, j.getId(), ">>>>>>>>>> SHRINK");
HashMap<Share, Share> sharesE = j.getAssignedShares();
HashMap<Share, Share> sharesR = j.getPendingRemoves();
logger.trace(methodName, j.getId(), "removing", sharesR.size(), "of existing", sharesE.size(), "shares.");
pending_evictions += (sharesR.size() * j.getShareOrder());
for ( Share s : sharesE.values() ) {
logger.trace(methodName, j.getId(), " current", s.toString());
}
for ( Share s : sharesR.values() ) {
logger.trace(methodName, j.getId(), " remove ", s.toString());
}
logger.trace(methodName, j.getId(), ">>>>>>>>>>");
jmu.removeShares(j, sharesR);
// jobManager.stopJob(j, shares); // stops job on everything on the pendingRemoves list
// j.clearPendingRemoves();
}
// Go through expanded jobs - if they are dormant, remove from dormant
// then add to running.
// Tell the server it needs to start some machines for the job
jobs = upd.getExpandedJobs();
for (IRmJob j : jobs.values() ) {
HashMap<Share, Share> sharesE = j.getAssignedShares();
HashMap<Share, Share> sharesN = j.getPendingShares();
logger.trace(methodName, j.getId(), "<<<<<<<<<< EXPAND");
logger.trace(methodName, j.getId(), "adding", sharesN.size(), "new shares to existing", sharesE.size(), "shares.");
pending_expansions += (sharesN.size() * j.getShareOrder());
for ( Share s : sharesE.values()) {
logger.trace(methodName, j.getId(), " existing ", s.toString());
}
for ( Share s : sharesN.values()) {
logger.trace(methodName, j.getId(), " expanding", s.toString());
}
logger.trace(methodName, j.getId(), "<<<<<<<<<<");
sharesN = j.promoteShares();
if ( sharesN.size() == 0 ) {
// internal error - should not be marked expanded if no machines
throw new SchedulingException(j.getId(), "Trying to execute expanded job but no pending machines.");
}
for ( Share s : sharesN.values()) { // update machine books
// Sanity checks on the bookkeeping
busyShares.put(s.getId(), s);
}
// DuccId id = j.getId(); // pull from dormant, maybe
// if ( dormantJobs .containsKey(id) ) {
// dormantJobs .remove(id);
// }
//runningJobs.put(id, j);
jmu.addShares(j, sharesN);
// jobManager.executeJob(j, shares); // will update job's pending lists
}
jobs = upd.getStableJobs(); // squirrel these away to try next time
for (IRmJob j: jobs.values()) {
if ( j.countNShares() < 0 ) {
throw new SchedulingException(j.getId(), "Share count went negative " + j.countNShares());
}
logger.trace(methodName, j.getId(), ".......... STABLE with ", j.countNShares(), " shares.");
}
jobs = upd.getDormantJobs(); // squirrel these away to try next time
for (IRmJob j: jobs.values()) {
logger.trace(methodName, j.getId(), ".......... DORMANT");
// dormantJobs .put(j.getId(), j);
}
jobs = upd.getReservedJobs();
for (IRmJob j: jobs.values()) {
logger.trace(methodName, j.getId(), "<<<<<<<<<< RESERVE");
HashMap<Share, Share> sharesE = j.getAssignedShares();
HashMap<Share, Share> sharesN = j.getPendingShares();
if ( sharesE.size() == j.getMaxShares() ) {
logger.trace(methodName, j.getId(), "reserve_stable", sharesE.size(), "machines");
} else if ( sharesN.size() == j.getMaxShares() ) { // reservation is complete but not yet confirmed?
logger.trace(methodName, j.getId(), "reserve_adding", sharesN.size(), "machines");
for ( Share s : sharesN.values()) {
logger.trace(methodName, j.getId(), " reserve_expanding ", s.toString());
}
jmu.addShares(j, sharesN);
j.promoteShares();
} else {
logger.trace(methodName, j.getId(), "reserve_pending", j.getMaxShares(), "machines");
}
logger.trace(methodName, j.getId(), "<<<<<<<<<<");
}
jmu.setAllJobs((HashMap<DuccId, IRmJob>)allJobs);
jobs = upd.getRefusedJobs();
Iterator<IRmJob> iter = jobs.values().iterator();
while ( iter.hasNext() ) {
IRmJob j = iter.next();
logger.trace(methodName, j.getId(), ".......... REFUSED");
}
return jmu;
}
/**
* We don't accept new work or even Orchestrator state updates until "ready". We do
* still want machine updates, but the internal structures must be protected.
*/
public synchronized boolean ready()
{
return stability;
}
public synchronized void start()
{
stability = true;
}
public void stop()
{
persistence.close();
}
protected void handleIllNodes()
{
String methodName = "handleIllNodes";
if ( ! isInitialized() ) {
logger.info(methodName, null, "Waiting for (re)initialization.");
return;
}
HashMap<Node, Integer> nodeUpdates = new HashMap<Node, Integer>();
synchronized(illNodes) {
nodeUpdates.putAll(illNodes);
illNodes.clear();
}
synchronized(this) {
for ( Node n : nodeUpdates.keySet() ) {
Machine m = getMachine(n);
if ( m == null ) {
logger.warn(methodName, null, "Cannot find any record of machine", n.getNodeIdentity().getName());
continue;
}
int count = nodeUpdates.get(n);
if ( count == 0 ) {
m.heartbeatArrives();
} else {
m.heartbeatMissed(count);
}
}
}
}
protected void handleDeadNodes()
{
String methodName = "handleDeadNodes";
if ( ! isInitialized() ) {
logger.info(methodName, null, "Waiting for (re)initialization.");
return;
}
HashMap<Node, Node> nodeUpdates = new HashMap<Node, Node>();
synchronized(deadNodes) {
nodeUpdates.putAll(deadNodes);
deadNodes.clear();
}
synchronized(this) {
for ( Node n : nodeUpdates.values() ) {
Machine m = getMachine(n);
if ( m == null ) {
// must have been removed because of earlier missed hb
continue;
}
logger.warn(methodName, null, "***Purging machine***", m.getId(), "due to missed heartbeats. THreshold:", nodeStability);
NodePool np = m.getNodepool();
np.nodeLeaves(m);
}
}
}
/**
* We first accept any changes and requests from the outside world and place them where they
* can be acted on in this epoch.
*
* We then pass all relevant requests and resources to the IScheduler. This returns a
* SchedulingUpdate which is passed to the dispatcher to be acted upon.
*/
public JobManagerUpdate schedule()
{
String methodName = "schedule";
// if ( startupCountdown++ < nodeStability ) {
// logger.info(methodName, null, "Startup countdown:", startupCountdown, "of", nodeStability);
// return null;
// }
if ( ! ready() ) {
return null;
}
if ( ! isInitialized() ) {
logger.info(methodName, null, "Waiting for (re)initialization.");
return null;
}
// tracking the OR hang problem - are topics being delivered?
logger.info("nodeArrives", null, "Total arrivals:", total_arrivals);
synchronized(this) {
handleIllNodes();
handleDeadNodes();
resetNodepools();
}
// TODO: Can we combine these two into one?
SchedulingUpdate upd = new SchedulingUpdate(); // state from internal scheduler
JobManagerUpdate jmu = new JobManagerUpdate(); // state we forward to job manager
// int nchanges = 0;
ArrayList<IRmJob> jobsToRecover = new ArrayList<IRmJob>();
synchronized(recoveredJobs) {
jobsToRecover.addAll(recoveredJobs);
recoveredJobs.clear();
// nchanges += jobsToRecover.size();
}
ArrayList<IRmJob> newJobs = new ArrayList<IRmJob>();
//
// If there are new jobs we need to init some things and start a scheduling cycle.
//
synchronized(incomingJobs) {
newJobs.addAll(incomingJobs);
incomingJobs.clear();
// nchanges += newJobs.size();
}
//
// If some jobs passed initialization we need to signal a scheduling cycle to get
// them their fair share
//
// synchronized(initializedJobs) {
// if ( initializedJobs.size() > 0 ) {
// nchanges++;
// }
// initializedJobs.clear();
// }
//
// If some jobs completed we need to clear them out and signal a
// scheduling cycle to try to reuse their resources.
//
ArrayList<IRmJob> doneJobs = new ArrayList<IRmJob>();
synchronized(completedJobs) {
doneJobs.addAll(completedJobs);
completedJobs.clear();
//nchanges += doneJobs.size();
}
//
// If some shares were vacated we need to clear them out and run a scheduling cycle.
//
ArrayList<Pair<IRmJob, Share>> doneShares= new ArrayList<Pair<IRmJob, Share>>();
synchronized(vacatedShares) {
doneShares.addAll(vacatedShares.values());
vacatedShares.clear();
//nchanges += doneShares.size();
// we use the vacatedShares object to control share growth as well
//if ( growthOccurred ) nchanges++;
//growthOccurred = false;
}
// boolean must_run = false;
// synchronized(force_epoch) {
// must_run = force_epoch;
// force_epoch = false;
// }
// if ( (nchanges == 0) && !must_run ) {
// jmu.setAllJobs(allJobs);
// return jmu;
// }
// TODO if we remove this code above be sure to clear out all the force_epoch nonsense
// TODO does this even use growthOccurred?
synchronized(this) {
// before looking at jobs, insure we're updated after a crash
for ( IRmJob j : jobsToRecover ) {
processRecovery(j);
}
// process these next to free up resources for the scheduling cycle
for (Pair<IRmJob, Share> p : doneShares) {
processCompletion(p.first(), p.second());
}
for (IRmJob j : doneJobs) {
processCompletion(j);
}
// update user records, "check in" new jobs
if ( newJobs.size() > 0 ) {
logger.info(methodName, null, "Jobs arrive:");
logger.info(methodName, null, "submit", RmJob.getHeader());
}
Iterator<IRmJob> iter = newJobs.iterator();
while ( iter.hasNext() ) {
IRmJob j = iter.next();
if ( j.isRefused() ) { // the JobManagerConverter has already refused it
logger.info(methodName, j.getId(), "Bypassing previously refused job.");
upd.refuse(j, j.getRefusalReason());
continue; // bypass: don't check the refused job into our records
}
String user = j.getUserName();
User u = users.get(user);
if ( u == null ) {
u = new User(user);
users.put(user, u);
}
j.setUser(u);
// Calculate its share order
int share_order = calcShareOrder(j);
j.setShareOrder(share_order);
// Assign it to its priority class
String clid = j.getClassName();
ResourceClass prclass = resourceClassesByName.get(clid);
u.addJob(j);
allJobs.put(j.getId(), j);
if ( prclass == null ) {
upd.refuse(j, "Cannot find priority class " + clid + " for job");
continue;
}
// UIMA-4275 never refuse impossible work, just let it hang out
// if ( share_order > max_order ) {
// upd.refuse(j, "Memory requested " + j.getMemory() + "GB exceeds the capacity of any machine in the cluster.");
// continue;
// }
/**
* We want to allow this - a normal job, submitted to a reservation class.
if ( (prclass.getPolicy() == Policy.RESERVE ) && ( ! j.isReservation() ) ) {
upd.refuse(j, "Reservaction class " +
prclass.getId() + " specified but work is not a reservation.");
continue;
}
*/
if ( ((prclass.getPolicy() != Policy.RESERVE ) && (prclass.getPolicy() != Policy.FIXED_SHARE)) && ( j.isReservation() ) ) {
upd.refuse(j, "Class " + prclass.getName() + " is policy " +
prclass.getPolicy() + " but the work is submitted as a reservation.");
continue;
}
prclass.addJob(j);
j.setResourceClass(prclass);
try {
persistence.addJob(j);
} catch (Exception e) {
logger.warn(methodName, j.getId(), "Cannot persist new job in database:", e);
}
logger.info(methodName, j.getId(), "submit", j.toString());
}
logger.info(methodName, null, "Scheduling " + newJobs.size(), " new jobs. Existing jobs: " + allJobs.size());
for ( int i = 0; i < schedulers.length; i++ ) {
logger.info(methodName, null, "Run scheduler", i, "with top-level nodepool", nodepools[i].getId());
schedulers[i].schedule(upd);
}
for ( IRmJob j : allJobs.values() ) { // UIMA-4577 persist 'demand'
try {
persistence.updateDemand(j);
} catch (Exception e) {
logger.warn(methodName, j.getId(), "Cannot update demand in database:", e);
}
}
logger.info(methodName, null, "--------------- Scheduler returns ---------------");
logger.info(methodName, null, "\n", upd.toString());
logger.info(methodName, null, "------------------------------------------------");
dispatch(upd, jmu); // my own job lists get updated by this
return jmu;
}
}
synchronized public void shutdown()
{
done = true;
}
// public void run()
// {
// String methodName = "run";
// while ( ! done ) {
// try { sleep(epoch); } catch (InterruptedException e) { }
// logger.info(methodName, null, "========================== Epoch starts ===========================");
// try {
// schedule();
// } catch ( SchedulingException e ) {
// logger.info(methodName, e.jobid, e);
// }
// logger.info(methodName, null, "========================== Epoch ends ===========================");
// }
// }
/**
* maps from both the fully-qualified name and the short name
*/
void updateNodepoolsByNode(String longname, NodePool np)
{
String methodName = "updateNodepoolsByNode";
String shortname = longname;
int ndx = longname.indexOf(".");
logger.info(methodName, null, "Map", longname, "to", np.getId());
nodepoolsByNode.put(longname, np);
if ( ndx >= 0 ) {
shortname = longname.substring(0, ndx);
nodepoolsByNode.put(shortname, np);
shortToLongNode.put(shortname, longname);
logger.info(methodName, null, "Map", shortname, "to", np.getId());
}
}
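// Illustrative mapping (hypothetical hostname): updateNodepoolsByNode("n1.example.com", np)
// indexes both "n1.example.com" and "n1" to np, and records "n1" -> "n1.example.com"
// in shortToLongNode for later resolution.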
//
// Return a nodepool by Node. If the node can't be associated with a nodepool, return the
// default nodepool, which is always the first one defined in the config file.
//
NodePool getNodepoolByName(NodeIdentity ni)
{
NodePool np = nodepoolsByNode.get( ni.getName() );
if ( np == null ) {
np = nodepoolsByNode.get( ni.getIp() );
}
if ( np == null ) {
np = nodepools[0];
updateNodepoolsByNode(ni.getName(), np); // assign this guy to the default np
// nodepoolsByNode.put( ni.getName(), np); // assign this guy to the default np
}
return np;
}
private int total_arrivals = 0;
public synchronized void nodeArrives(Node node)
{
String methodName = "nodeArrives";
if ( ! isInitialized() ) {
logger.info(methodName, null, "Waiting for (re)initialization; node = " + node.getNodeIdentity().getName());
return;
}
synchronized(illNodes) { // stop flagging it as a problem
illNodes.remove(node);
}
// String methodName = "nodeArrives";
// The first block ensures the node is in the scheduler's records as soon as possible
total_arrivals++; // report these in the main schedule loop
NodePool np = getNodepoolByName(node.getNodeIdentity()); // finds np assigned in ducc.nodes; if none, returns the default np
Machine m = np.getMachine(node);
int share_order = 0;
// let's always recalculate this in case it changes for whatever bizarre reason (reboot, or pinned process gone, or whatever)
long allocatable_mem = node.getNodeMetrics().getNodeMemory().getMemFree() - share_free_dram;
if ( dramOverride > 0 ) {
allocatable_mem = dramOverride;
}
share_order = (int) (allocatable_mem / np.getShareQuantum()); // conservative - rounds down (this will always cast ok)
// NOTE: we cannot set the order into the machine yet, in case it has changed, because NodePool needs to adjust based
// on current and new
max_order = Math.max(share_order, max_order);
m = np.nodeArrives(node, share_order); // announce to the nodepools
m.heartbeatArrives();
}
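// Illustrative sizing (assumes a 15GB quantum and no reserved DRAM): a node
// reporting 62GB free yields share_order = 62/15 = 4, rounded down; the
// leftover 2GB is never scheduled.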
public void nodeHb(Node n, int count)
{
synchronized(illNodes) {
illNodes.put(n, count);
}
}
public void nodeDeath(Map<Node, Node> nodes)
{
synchronized(deadNodes) {
deadNodes.putAll(nodes);
}
}
/**
* User passed us a node by name, which may or may not be fully qualified.
* The node itself may or may not have checked in with a qualified name. Here we try
* to find something that matches.
* UIMA-4142. Technically a bug on vary-on and vary-off but found and fixed as part of
* the indicated Jira.
*/
synchronized String resolve(String node)
{
NodePool np = nodepoolsByNode.get(node);
if ( np == null ) return null; // indexed by long and short so if not found we're stuck
if ( np.hasNode(node) ) return node; // it knows the node by this name; we're done
int ndx = node.indexOf(".");
if ( ndx > 0 ) {
// np MUST know it by either long or short or it wouldn't be in nodepoolsByNode
// so it must be short
return node.substring(0, ndx);
} else {
// and vice-versa, it must be the long
return shortToLongNode.get(node);
}
}
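// Illustrative resolution (hypothetical hostname): if "n1.example.com" checked in
// fully qualified, resolve("n1.example.com") returns it directly via hasNode(), while
// resolve("n1") falls through to shortToLongNode and also yields "n1.example.com".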
public synchronized RmAdminReply varyon(String[] nodes)
{
String methodName = "varyon";
RmAdminVaryReply ret = new RmAdminVaryReply();
StringBuffer sb = new StringBuffer();
for (String n : nodes ) {
String rn = resolve(n);
if ( rn == null ) {
ret.setRc(false);
ret.addFailedHost(n);
sb.append("VaryOn: " + n + " cannot be found in the RM.\n");
} else {
NodePool np = nodepoolsByNode.get(rn); // if null, resolve will fail
if ( np == null ) {
ret.setRc(false);
ret.addFailedHost(rn);
sb.append("VaryOn: " + n + " cannot find associated nodepool.\n");
} else {
String repl = np.varyon(rn);
logger.info(methodName, null, repl);
sb.append(repl);
sb.append("\n");
}
}
}
ret.setMessage(sb.toString());
return ret;
}
public synchronized RmAdminReply varyoff(String[] nodes)
{
String methodName = "varyoff";
RmAdminVaryReply ret = new RmAdminVaryReply();
StringBuffer sb = new StringBuffer();
for (String n : nodes ) {
String rn = resolve(n);
if ( rn == null ) {
ret.setRc(false);
ret.addFailedHost(n);
sb.append("VaryOff: " + n + " cannot be found in the RM.\n");
} else {
NodePool np = nodepoolsByNode.get(rn); // if null, resolve will fail
if ( np == null ) {
ret.setRc(false);
ret.addFailedHost(rn);
sb.append("VaryOff: " + n + " cannot find associated nodepool.\n");
} else {
String repl = np.varyoff(rn);
logger.info(methodName, null, repl);
sb.append(repl);
sb.append("\n");
}
}
}
ret.setMessage(sb.toString());
return ret;
}
RmQueriedNodepool getNpStats(NodePool np)
{
RmQueriedNodepool ret = new RmQueriedNodepool();
ret.setName(np.getId());
ret.setOnline(np.countLocalMachines());
ret.setDead(np.countLocalUnresponsiveMachines());
ret.setOffline(np.countLocalOfflineMachines());
ret.setSharesAvailable(np.countLocalShares());
ret.setSharesFree(np.countLocalQShares());
ret.setAllMachines(np.countAllLocalMachines());
int[] onlineMachines = np.makeArray();
int[] freeMachines = np.makeArray();
for ( int i = 1; i < freeMachines.length; i++ ) {
freeMachines[i] += np.countFreeMachines(i); // (these are local, as we want)
}
//np.getLocalOnlineByOrder(onlineMachines);
ret.setOnlineMachines(onlineMachines);
ret.setFreeMachines(freeMachines);
ret.setVirtualMachines(np.countLocalVMachinesByOrder());
// logger.info(methodName, null, np.getId() + ": online", online, "dead", dead, "offline", offline, "shares_available", shares_available, "shares_free", shares_free);
// logger.info(methodName, null, np.getId() + ": allMachines ", Arrays.toString(allMachines));
// logger.info(methodName, null, np.getId() + ": onlineByOrder ", Arrays.toString(onlineMachines));
// logger.info(methodName, null, np.getId() + "------- freeMachines should match free -------");
// logger.info(methodName, null, np.getId() + ": freeMachines ", Arrays.toString(freeMachines));
// logger.info(methodName, null, np.getId() + ": free ", Arrays.toString(free));
// logger.info(methodName, null, np.getId() + "----------------------------------------------");
// logger.info(methodName, null, np.getId() + ": virtualMachines", Arrays.toString(virtualMachines));
return ret;
}
void calculateLoad(RmAdminQLoadReply reply)
{
for ( ResourceClass cl : resourceClasses.values() ) {
RmQueriedClass qcl = new RmQueriedClass();
switch ( cl.getPolicy() ) {
case FAIR_SHARE:
qcl.setPolicy("FAIR_SHARE");
break;
case FIXED_SHARE:
qcl.setPolicy("FIXED_SHARE");
break;
case RESERVE:
qcl.setPolicy("RESERVE");
break;
}
// TODO MUST FIX THIS
// int[] demanded = NodePool.makeArray();
// int[] awarded = NodePool.makeArray();
// HashMap<IRmJob, IRmJob> jobs = cl.getAllJobs();
// for ( IRmJob j : jobs.values() ) {
// int o = j.getShareOrder();
// demanded[o] += j.queryDemand();
// awarded[o] += j.countNShares();
// }
// qcl.setName(cl.getName());
// qcl.setDemanded(demanded);
// qcl.setAwarded(awarded);
// reply.addClass(qcl);
}
}
void listAllNodepools(NodePool parent, ArrayList<NodePool> list)
{
list.add(parent);
for (NodePool np : parent.getChildren().values() ) {
listAllNodepools(np, list);
}
}
public synchronized RmAdminQLoadReply queryLoad()
{
RmAdminQLoadReply ret = new RmAdminQLoadReply();
if ( ! ready() ) {
ret.notReady();
return ret;
}
calculateLoad(ret);
ArrayList<NodePool> allpools = new ArrayList<NodePool>();
for ( NodePool np : nodepools ) {
listAllNodepools(np, allpools);
}
for ( NodePool np : allpools ) {
ret.addNodepool(getNpStats(np));
}
return ret;
}
public synchronized RmAdminQOccupancyReply queryOccupancy()
{
RmAdminQOccupancyReply ret = new RmAdminQOccupancyReply();
if ( ! ready() ) {
ret.notReady();
return ret;
}
//
// iterate top-level nodepools to get all their subpools
// iterate the subpools to get all their machines
// iterate the machines and request a query object
// add query object to ret
// return ret
// We want to be dependent on common project, not the other way around, so
// we keep the query objects in common and put knowledge of how to construct
// them into rm's Machine class.
//
// The alternative, passing RM's Machine to the query object creates a circular
// dependency with RM depending on common and common depending on RM.
//
//
// Not a cheap query, by the way.
//
// NOTE: No longer used by the rm_qoccupancy script which now goes directly to the database
//
for ( NodePool np : nodepools ) {
// NOTE: The offline & dead nodes are also in the AllMachines list so must be removed
Map<Node, Machine> allMachs = np.getAllMachines();
Map<Node, Machine> offline = np.getOfflineMachines(); // UIMA-4234
Map<Node, Machine> unresponsive = np.getUnresponsiveMachines(); // UIMA-4234
for ( Node n : offline.keySet() ) {
Machine m = offline.get(n);
RmQueriedMachine qm = m.queryMachine();
qm.setOffline();
if ( unresponsive.containsKey(n) ) {
unresponsive.remove(n);
qm.setUnresponsive();
}
ret.addMachine(qm);
allMachs.remove(n);
}
for ( Node n : unresponsive.keySet() ) {
Machine m = unresponsive.get(n);
RmQueriedMachine qm = m.queryMachine();
qm.setUnresponsive();
ret.addMachine(qm);
allMachs.remove(n);
}
for ( Node n : allMachs.keySet() ) {
Machine m = allMachs.get(n);
ret.addMachine(m.queryMachine());
}
}
return ret;
}
public synchronized void signalState(DuccId jobid, String state)
{
IRmJob j = allJobs.get(jobid);
if ( j != null ) { // might not be here yet, we'll get it later
j.setState(state);
}
}
/**
* Callback from job manager, need shares for a new fair-share job.
*/
public void signalNewWork(IRmJob job)
{
// We'll synchronize only on the incoming job list
synchronized(incomingJobs) {
incomingJobs.add(job);
}
}
// public void signalForceEpoch()
// {
// synchronized( force_epoch ) {
// force_epoch = true;
// }
// }
public void signalInitialized(IRmJob job)
{
// We'll synchronize only on the incoming job list
synchronized(initializedJobs) {
initializedJobs.add(job);
}
}
public void signalRecovery(IRmJob job)
{
synchronized(recoveredJobs) {
recoveredJobs.add(job);
}
}
public void jobCancelled(DuccId id)
{
// TODO Fill this in.
}
/**
* Callback from job manager when a job completes. We just believe him, no sanity checks or other such stuff.
*/
public void signalCompletion(DuccId id)
{
String methodName = "signalCompletion";
synchronized(completedJobs) {
try {
IRmJob job = allJobs.get(id);
if ( job == null ) {
logger.warn(methodName, id, "Job completion signal: early termination; nothing to complete.");
return; // canceled or terminated very soon.
}
logger.info(methodName, id, "Job completion signal.");
completedJobs.add(job);
} catch (Throwable t) {
logger.warn(methodName, id, t);
}
}
}
/**
* Callback from job manager when a specific share exits but the job is still alive.
*/
public void signalCompletion(IRmJob job, Share share)
{
String methodName = "signalCompletion";
synchronized(vacatedShares) {
logger.info(methodName, job.getId(), "Job vacate signal share: ", share.toString());
vacatedShares.put(share.getId(), new Pair<IRmJob, Share>(job, share));
}
}
/**
* Callback from job manager when a specific share gets a process associated.
*/
// public void signalGrowth(DuccId jobid, Share share)
// {
// String methodName = "signalGrowth";
// synchronized(vacatedShares) {
// logger.info(methodName, jobid, "Job growth signal share: ", share.toString());
// growthOccurred = true;
// }
// }
/**
* Called in scheduling cycle, to actually complete the job - avoids deadlock
*/
private synchronized void processCompletion(IRmJob job)
{
String methodName = "processCompletion";
logger.info(methodName, job.getId(), "Job completes.");
try {
persistence.deleteJob(job); // UIMA-4577
} catch (Exception e) {
logger.warn(methodName, job.getId(), "Cannot delete job from database:", e);
}
// -- clean up the running jobs list
IRmJob j = allJobs.remove(job.getId());
if ( j == null ) {
logger.info(methodName, job.getId(), "Job is not in run list!"); // can happen if job is refused very early
return;
}
j.markComplete();
// -- clean up user list
User user = users.get(j.getUserName());
user.remove(job); // UIMA-4275 don't clean up users list because it may have registry things in it
ResourceClass rc = job.getResourceClass();
if ( rc != null ) {
rc.removeJob(j); // also clears it if it's a reservation
} else if ( !j.isRefused() ) {
throw new SchedInternalError(j.getId(), "Job exits from class " + job.getClassName() + " but we cannot find the priority class definition.");
}
// -- clean up machine lists
HashMap<Share, Share> shares= job.getAssignedShares();
for (Share s: shares.values()) {
purgeShare(s, job);
}
job.removeAllShares();
}
/**
* Called from scheduling cycle - a specific share has run out of work for the given job (but the
* job is not done yet).
*/
private synchronized void processCompletion(IRmJob job, Share share)
{
String methodName = "processCompletion";
logger.debug(methodName, job.getId(), "Job vacates share ", share.toString());
//share.removeJob();
job.removeShare(share);
purgeShare(share, job);
}
/**
* Log following / reconstruction, needed to init before recovery.
*/
public void resetNodepools()
{
for ( NodePool np : nodepools ) {
np.reset(np.getMaxOrder());
}
}
/**
* Determine if the given share is in a nodepool that this job is allowed to be scheduled over.
* You can get a mismatch if the classes or nodepools are reconfigured and RM is restarted
* with jobs still in the system.
*
* UIMA-4142
*
* @param s The share to validate.
* @param j The job to validate against.
* @return true if s and j are compatible, false otherwise.
*/
boolean compatibleNodepool(Share s, IRmJob j)
{
// cut to the chase and ask the NP directly if this dude is allowed
NodePool np = s.getNodepool();
ResourceClass rc = j.getResourceClass();
Policy p = rc.getPolicy();
return np.compatibleNodepool(p, rc);
}
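// Illustrative scenario (assumes a reconfiguration moved the node): a FAIR_SHARE
// share recovered onto a node now owned by a reserve-only nodepool fails this
// check, and processRecovery() queues it on sharesToShrink for eviction.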
/**
* Make this public for log following.
*/
public synchronized void processRecovery(IRmJob j)
{
String methodName = "processRecovery";
ResourceClass rc = resourceClassesByName.get(j.getClassName());
j.setResourceClass(rc);
int share_order = calcShareOrder(j);
j.setShareOrder(share_order);
HashMap<Share, Share> shares = j.getRecoveredShares();
List<Share> sharesToShrink = new ArrayList<Share>(); // UIMA-4142
StringBuffer sharenames = new StringBuffer();
for ( Share s : shares.values() ) {
sharenames.append(s.toString());
sharenames.append(" ");
switch ( rc.getPolicy() ) {
case FAIR_SHARE:
s.setShareOrder(share_order);
if ( !compatibleNodepool(s, j) ) { // UIMA-4142
sharesToShrink.add(s);
break;
}
break;
case FIXED_SHARE:
logger.info(methodName, j.getId(), "Set fixed bit for FIXED job");
s.setShareOrder(share_order);
s.setFixed();
if ( !compatibleNodepool(s, j) ) { // UIMA-4142
if ( j.isService() ) {
sharesToShrink.add(s); // nodepool reconfig snafu, SM will reallocate the process
} else {
logger.warn(methodName, j.getId(), "Share is in incompatible nodepool but cannot be evicted:", s);
}
}
break;
case RESERVE:
logger.info(methodName, j.getId(), "Set fixed bit for RESERVE job");
s.setFixed();
if ( j.isService() && !compatibleNodepool(s, j) ) { // UIMA-4142
sharesToShrink.add(s); // nodepool reconfig snafu, SM will reallocate the process
}
break;
}
// if ( rc.getPolicy() != Policy.RESERVE ) { // if it's RESERVE, the share order is already set from
// // the machine when the job arrives.
// s.setShareOrder(share_order);
// }
Machine m = s.getMachine();
NodePool np = m.getNodepool();
np.connectShare(s, m, j, s.getShareOrder());
busyShares.put(s.getId(), s);
}
String username = j.getUserName();
User user = users.get(username);
if ( user == null ) {
user = new User(username);
users.put(username, user);
logger.info(methodName, j.getId(), "&&&&&&&&&&&&&&&& new user", user.toString(), "-------------------");
}
j.setUser(user);
user.addJob(j);
j.promoteShares(); // NOT expanded, just recovered, promote them right away
j.clearRecoveredShares();
String clid = j.getClassName();
ResourceClass prclass = resourceClassesByName.get(clid);
allJobs.put(j.getId(), j);
prclass.addJob(j);
j.setResourceClass(prclass);
logger.info(methodName, j.getId(), "Recovered job:", j.toString());
logger.info(methodName, j.getId(), "Recovered shares:", sharenames.toString());
try {
persistence.addJob(j);
} catch (Exception e) {
logger.warn(methodName, j.getId(), "Cannot persist recovered job in database:", j);
}
// After a reconfig/restart the share may be in the wrong place, in which case it
// needs to be removed. We have to wait until it is fully hooked into the structures
// before scheduling for removal because it could take a while to go away and
// we have to be careful not to overcommit.
// UIMA-4142
for ( Share s : sharesToShrink ) {
logger.info(methodName, j.getId(), "Recovery - Removing share from wrong nodepool after reconfiguration:", s);
j.shrinkByOne(s);
}
}
/**
* The share is gone, purge from our structures.
*/
private void purgeShare(Share s, IRmJob j)
{
busyShares.remove(s.getId()); // so long, and thanks for all the fish
Machine m = s.getMachine();
m.removeShare(s);
}
public synchronized static DuccId newId()
{
return idFactory.next();
}
public synchronized static DuccId newId(long id)
{
return idFactory.next(id);
}
public void queryMachines()
{
for ( NodePool np : nodepools ) {
np.queryMachines();
}
}
class MachineByOrderSorter
implements Comparator<Machine>
{
public int compare(Machine m1, Machine m2)
{
if ( m1.equals(m2) ) return 0;
if (m1.getShareOrder() == m2.getShareOrder()) { // tie-break on machine id for a stable order
return (m1.getId().compareTo(m2.getId()));
}
return Long.compare(m1.getShareOrder(), m2.getShareOrder()); // ascending by share order, overflow-safe
}
}
}