blob: f8d619b4d2e1a7136d57e9ec346a57c0930f1617 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.rm.scheduler;
import java.util.HashMap;
import java.util.Map;
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.admin.event.RmQueriedMachine;
import org.apache.uima.ducc.common.admin.event.RmQueriedShare;
import org.apache.uima.ducc.common.persistence.rm.IRmPersistence;
import org.apache.uima.ducc.common.persistence.rm.IRmPersistence.RmNodes;
import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
public class Machine
implements SchedConstants
{
// Logger shared by every Machine instance.
private static DuccLogger logger = DuccLogger.getLogger(Machine.class, COMPONENT_NAME);
// Node name; initialized in the constructor from the node's identity.
private String id;
private long memory; // in Kb
// Number of shares this machine is divided into; changed through setShareOrder().
private int share_order = 1;
// Last recorded count of missed heartbeats; 0 once a heartbeat has arrived again.
private int heartbeats = 0;
// Node pool this machine belongs to; stays null until setNodepool() is called.
private NodePool nodepool;
//
// These are subtly different.
// - virtual_share_order is reset to share_order at the start of every scheduling cycle. It
// represents the *potential* shares in this machine. As a rule, once we give out shares on
// this machine we'll try to not move them around but eviction happens, and this helps us
// keep track of what we *could* give away on this machine. It represents the logical capacity
// of the machine, that is, true capacity, less shares given to orchestrator, less shares that
// we might be giving away this scheduling cycle.
//
// - shares_left tracks exactly the number of shares that are physically available to give away
// without preemption. This is updated when a share is assigned, or when it is returned. It
// represents the true capacity of the machine at this moment, less the shares that have been
// given to the orchestrator.
//
// Throughout much of the scheduling cycle these guys will tend to track each other, and at the end
// of a cycle they should probably be the same, but they may diverge if shares are given out that we
// might want to preempt.
//
private int virtual_share_order = 0;
private int shares_left = 0;
// UIMA-4142
// count of shares unavailable because of blacklisting
private int blacklisted_shares = 0;
private Map<DuccId, Share> blacklistedWork = new HashMap<DuccId, Share>(); // id of process or reservation, share order
// The node this machine was built from; also returned by key() and getNode().
Node node;
// Shares currently assigned on this machine; each share is stored as both key and value.
private HashMap<Share, Share> activeShares = new HashMap<Share, Share>();
// Persistence handle used to record heartbeat counts and share assignments; set in the constructor.
private IRmPersistence persistence = null;
public Machine(Node node)
{
this.node = node;
this.memory = node.getNodeMetrics().getNodeMemory().getMemTotal();
this.id = node.getNodeIdentity().getName();
this.persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
}
// public Machine(String id, long memory)
// {
// this.id = id;
// this.memory = memory;
// }
/**
 * Returns the object used as the hashmap key for this machine, which is its node.
 *
 * @return the node this machine was constructed from
 */
public Node key()
{
    return this.node;
}
/**
 * Return the quantum for this machine - the actual amount allocated in a single share on this machine.
 *
 * The division is done in long arithmetic and only the quotient is narrowed to int.
 * Casting {@code memory} first (as {@code (int) memory / share_order} does) silently
 * truncates any memory value larger than Integer.MAX_VALUE Kb before dividing.
 *
 * @return memory (Kb) divided by the share order
 * @throws ArithmeticException if the share order is 0
 */
public int getQuantum()
{
    return (int) (memory / share_order);
}
/**
 * UIMA-4712
 * See if placing the subject job on this machine violates the vertical stacking constraint.
 *
 * A conflict exists when any active share, or any blacklisted share, belongs to a job
 * whose service id matches the service id of the subject job.
 *
 * NOTE(review): the ids are compared with ==; if getServiceId() returns an object type
 * rather than a primitive this is an identity comparison - confirm.
 *
 * @param j the job being considered for placement
 * @return true if a share with the same service id is already on this machine
 */
public synchronized boolean hasVerticalConflict(IRmJob j)
{
    for ( Share active : activeShares.values() ) {
        boolean sameService = ( active.getJob().getServiceId() == j.getServiceId() );
        if ( sameService ) {
            return true;                    // match service id, it violates
        }
    }

    for ( Share condemned : blacklistedWork.values() ) {   // just in case
        boolean sameService = ( condemned.getJob().getServiceId() == j.getServiceId() );
        if ( sameService ) {
            return true;
        }
    }

    return false;                           // nothing else violates
}
/**
 * UIMA-4142
 * Black list some number of shares for a specific job and proc. This reduces the number of
 * schedulable shares until they are whitelisted.
 *
 * The number of shares is {@code jobmem} divided by this machine's quantum, rounded up.
 * A process that is already blacklisted is not counted a second time. If the bookkeeping
 * drives shares_left below zero the condition is logged with a stack trace but not thrown.
 *
 * @param jobid  id of the job owning the process
 * @param procid id of the process (or reservation) being blacklisted
 * @param jobmem memory occupied by the work, presumably in the same units as the machine memory (Kb) - confirm
 * @throws ArithmeticException if the machine quantum is 0
 */
//public synchronized void blacklist(DuccId jobid, DuccId procid, int nshares)
public synchronized void blacklist(DuccId jobid, DuccId procid, long jobmem)
{
    String methodName = "blacklist";
    int q = getQuantum();

    // Divide in long arithmetic, then narrow the quotient. The previous form,
    // (int) jobmem / q, narrowed jobmem first and so truncated large values.
    int nshares = (int) (jobmem / q);
    if ( jobmem % q > 0 ) nshares++;

    if ( ! blacklistedWork.containsKey(procid) ) {            // already condemned?
        // NOTE(review): nshares is derived from jobmem above, so this sentinel test looks like
        // a leftover from the older nshares-based signature - confirm it is still reachable.
        if ( nshares == -1 ) nshares = share_order;           // whole machine - reservations

        Share s = new Share(procid, this, jobid, nshares);
        blacklistedWork.put(procid, s);
        shares_left -= nshares;
        blacklisted_shares += nshares;

        if ( shares_left < 0 ) {
            // The exception is created only to capture a stack trace for the log; it is never thrown.
            logger.error(methodName, null, new IllegalStateException("shares_left went negative"), "shares went negative on", this.id, "share", procid.toString(), "nshares", nshares, "shares_left", shares_left, "share_order", share_order);
        }
    }
    logger.info(methodName, null, this.id, procid.toString(), procid.getUUID(), "procs", blacklistedWork.size(), "shares left", shares_left, "/", share_order);
}
/**
 * UIMA-4142
 * Whitelist shares if they were previously blacklisted, making them available for scheduling.
 *
 * The shares recorded for the process are added back to shares_left and subtracted from the
 * blacklisted count. If shares_left then exceeds the share order, the condition is logged
 * with a stack trace. A process that is not blacklisted only produces the informational log line.
 *
 * @param procid id of the process (or reservation) to whitelist
 */
public synchronized void whitelist(DuccId procid)
{
    final String methodName = "whitelist";

    Share released = blacklistedWork.remove(procid);
    if ( released != null ) {
        int restored = released.getShareOrder();
        shares_left        += restored;
        blacklisted_shares -= restored;

        if ( shares_left > share_order ) {
            // Exception object exists purely to get a stack trace into the log.
            Exception trace = new IllegalStateException("shares_left exceeds share_order");
            logger.error(methodName, null , trace, "shares left exceeds share_order on", this.id, "share", procid.toString(), "nshares", restored, "shares_left", shares_left, "share_order", share_order);
        }
    }

    logger.info(methodName, null, this.id, procid.toString(), procid.getUUID(), "procs", blacklistedWork.size(), "shares left", shares_left, "/", share_order);
}
/**
 * UIMA-4142
 * Tells whether any work on this machine is currently blacklisted.
 *
 * @return true if at least one process or reservation is blacklisted here
 */
public synchronized boolean isBlacklisted()
{
    return !blacklistedWork.isEmpty();
}
/**
 * Records that a heartbeat has arrived from this machine.
 *
 * If missed heartbeats had been recorded, the counter is reset to 0 and the reset is
 * written to the persistence layer. If nothing was recorded as missed this is a no-op.
 * A persistence failure is logged as a warning and otherwise ignored.
 */
public synchronized void heartbeatArrives()
{
    String methodName = "heartbeatArrives";
    long now = System.currentTimeMillis();

    // Remember the old count before clearing it so the log reports the value we reset from
    // (logging the field after zeroing it always printed 0).
    int previous = heartbeats;
    if ( previous == 0 ) return;
    heartbeats = 0;

    try {
        logger.info(methodName, null, id, "Reset heartbeat to 0 from", previous);
        persistence.setNodeProperty(id, RmNodes.Heartbeats, 0);
        logger.info(methodName, null, id, "Time to reset heartbeat", System.currentTimeMillis() - now);
    } catch (Exception e) {
        logger.warn(methodName, null, id, "Cannot update heartbeat count in database:", e);
    }
}
/**
 * Records the number of consecutive heartbeats this machine has missed.
 *
 * Counts below 2 are ignored, because timing and races can create false negatives.
 * Otherwise the count is stored and written to the persistence layer; a persistence
 * failure is logged as a warning and otherwise ignored.
 *
 * @param c number of missed heartbeats
 */
public synchronized void heartbeatMissed(int c)
{
    final String methodName = "heartbeatMissed";
    final long start = System.currentTimeMillis();

    if ( c >= 2 ) {
        heartbeats = c;
        try {
            logger.info(methodName, null, id, "Missed heartbeat count", c);
            persistence.setNodeProperty(id, RmNodes.Heartbeats, c);
            logger.info(methodName, null, id, "Time to record misssed heartbeat", System.currentTimeMillis() - start);
        } catch (Exception e) {
            logger.warn(methodName, null, id, "Cannot update heartbeat count in database:", e);
        }
    }
}
/**
 * Returns the identity of the node behind this machine.
 *
 * @return the node's identity, as reported by the node itself
 */
public NodeIdentity getNodeIdentity()
{
    return this.node.getNodeIdentity();
}
/**
 * Attaches this machine to a node pool.
 *
 * @param np the node pool this machine now belongs to
 */
public void setNodepool(NodePool np)
{
    nodepool = np;
}
/**
 * Returns the node pool this machine is attached to.
 *
 * @return the node pool, or null if none has been set
 */
public NodePool getNodepool()
{
    return this.nodepool;
}
/**
 * Tells whether this machine is completely unused.
 *
 * A reservation might temporarily pull this machine out of the pool by setting the virtual
 * share order to 0 without yet assigning it to a job. So both conditions are checked: no
 * shares have been given out, and the virtual share order is still pristine (equal to the
 * share order). This trick lets reservations use the "normal" allocation bookkeeping
 * without being special-cased.
 *
 * UIMA-4920, called only if isSchedulable is true
 *
 * @return true if there are no active shares and the virtual share order is untouched
 */
public boolean isFree()
{
    boolean nothingAssigned = activeShares.isEmpty();
    boolean stillPristine   = ( virtual_share_order == share_order );
    return nothingAssigned && stillPristine;
}
/**
 * Can preemption free this machine?  It can, unless at least one active share is fixed.
 * UIMA-4920, called only if isSchedulable is true
 *
 * @return false if any active share is fixed, true otherwise
 */
public boolean isFreeable()
{
    boolean pinned = false;
    for ( Share share : activeShares.values() ) {
        if ( share.isFixed() ) {
            pinned = true;
            break;
        }
    }
    return !pinned;
}
/**
 * Adds up the share order of every fixed share active on this machine.
 *
 * @return total order of the fixed active shares
 */
public int countNpShares()
{
    int fixedTotal = 0;
    for ( Share share : activeShares.values() ) {
        fixedTotal += share.isFixed() ? share.getShareOrder() : 0;
    }
    return fixedTotal;
}
/**
 * Counts the shares currently active on this machine.
 *
 * @return number of entries in the active share map
 */
public int countProcesses()
{
    final int active = activeShares.size();
    return active;
}
/**
 * Gives package-level access to the active shares.
 *
 * @return the live internal map itself (not a copy); changes made to it affect this machine
 */
HashMap<Share, Share> getActiveShares()
{
    return this.activeShares;
}
/**
 * Returns the node this machine was built from.
 *
 * @return the underlying node
 */
public Node getNode()
{
    return this.node;
}
/**
 * Returns this machine's id.
 *
 * @return the id, which is the node name unless replaced through setId()
 */
public String getId()
{
    return this.id;
}
/**
 * Returns the IP recorded in the node's identity.
 *
 * @return the node's IP as a string
 */
public String getIp()
{
    NodeIdentity identity = node.getNodeIdentity();
    return identity.getIp();
}
/**
 * Replaces this machine's id.
 *
 * @param id the new id
 */
public void setId(String id)
{
    this.id = id;
}
/**
 * Returns the memory recorded for this machine.
 *
 * @return memory, in Kb
 */
public long getMemory()
{
    return this.memory;
}
/**
 * Replaces the memory recorded for this machine.
 *
 * @param memory the new memory size, in Kb
 */
public void setMemory(long memory)
{
    this.memory = memory;
}
/**
 * UIMA-4275
 * Returns the depth reported by this machine's node pool.
 *
 * @return the node pool's depth
 * @throws NullPointerException if no node pool has been set
 */
public int getNodepoolDepth()
{
    final int depth = nodepool.getDepth();
    return depth;
}
/**
 * Returns the share order of this machine.
 *
 * @return the share order
 */
public int getShareOrder()
{
    return this.share_order;
}
/**
 * Returns the virtual share order of this machine.
 *
 * @return the current virtual share order
 */
public int getVirtualShareOrder()
{
    return this.virtual_share_order;
}
/**
 * Sets the share order of this machine.
 *
 * shares_left is set to the same value, and the virtual share order is recomputed
 * through resetVirtualShareOrder().
 *
 * @param o the new share order
 */
public void setShareOrder(int o)
{
    share_order = o;
    shares_left = o;
    resetVirtualShareOrder();     // UIMA-4142 use common code to calculate this
}
/**
 * Overrides the virtual share order of this machine.
 *
 * @param o the new virtual share order
 */
public void setVirtualShareOrder(int o)
{
    virtual_share_order = o;
}
/**
 * Recomputes the virtual share order as the share order minus the blacklisted shares.
 * UIMA-4142, include blacklist considerations
 */
public void resetVirtualShareOrder()
{
    final int schedulable = share_order - blacklisted_shares;
    virtual_share_order = schedulable;
}
/**
 * Moves a share to a different job.
 *
 * The share is first removed from this machine, then pointed at the new job,
 * and finally assigned to this machine again.
 *
 * @param s      the share to move
 * @param newjob the job that takes over the share
 */
public void reassignShare(Share s, IRmJob newjob)
{
    this.removeShare(s);       // take it off the books under the old job
    s.reassignJob(newjob);
    this.assignShare(s);       // put it back under the new job
}
/**
 * Assigns a share to this machine.
 *
 * The share is added to the active shares and its order is subtracted from shares_left.
 * A negative result is logged as a warning. The node properties and the new assignment
 * are then written to the persistence layer; a failure there is logged and otherwise ignored.
 *
 * @param s the share being assigned
 */
public void assignShare(Share s)
{
    final String methodName = "assignShare";
    final long start = System.currentTimeMillis();

    activeShares.put(s, s);
    shares_left = shares_left - s.getShareOrder();

    if ( shares_left < 0 ) {
        logger.warn(methodName, s.getJob().getId(), "Node", this.id, "must have shrunk as it has",
                    -shares_left, "more slots in use than than its current size of", share_order);
    }

    try {
        // Not transactional. If this turns into a problem we'll have to find a way
        persistence.setNodeProperties(id,
                                      RmNodes.Assignments,   activeShares.size(),
                                      RmNodes.NPAssignments, countNpShares(),
                                      RmNodes.SharesLeft,    shares_left);
        // update jobs on machine and specific shares
        persistence.addAssignment(id, s.getJob().getId(), s, getQuantum(), s.getJob().getShortType());
        logger.debug(methodName, s.getJob().getId(), "Time to assign share", s.getId(), "in db", System.currentTimeMillis() - start);
    } catch (Exception e) {
        logger.warn(methodName, s.getJob().getId(), "Cannot save state for share", s.getId(), "shares_left", shares_left, e);
    }
}
/**
 * Removes a share from this machine.
 *
 * The share is dropped from the active shares and from the node pool, and its order is
 * added back to shares_left. The node properties and the removed assignment are then
 * written to the persistence layer; a failure there is logged and otherwise ignored.
 *
 * @param s the share being removed
 */
public void removeShare(Share s)
{
    String methodName = "removeShare";
    long now = System.currentTimeMillis();

    activeShares.remove(s);
    nodepool.removeShare(s);
    shares_left += s.getShareOrder();

    try {
        // Not transactional. If this turns into a problem we'll have to find a way
        persistence.setNodeProperties(id, RmNodes.Assignments, activeShares.size(), RmNodes.NPAssignments, countNpShares(), RmNodes.SharesLeft, shares_left);
        persistence.removeAssignment(id, s.getJob().getId(), s);  // update jobs on machine and specific shares
        logger.debug(methodName, s.getJob().getId(), "Time to remove share", s.getId(), "in db", System.currentTimeMillis() - now);
    } catch (Exception e) {
        // Pass the exception to the logger, as assignShare does, so the cause of the failure is not lost.
        logger.warn(methodName, s.getJob().getId(), "Cannot save state for share", s.getId(), "shares_left", shares_left, e);
    }
}
/**
 * How many shares of the given order can I support without preemption?
 *
 * The capacity used is the share order, less the order of every active share,
 * less the blacklisted shares (UIMA-4142).
 *
 * @param order the share order being asked about
 * @return the number of whole shares of that order that fit in the remaining capacity
 */
public int countFreeShares(int order)
{
    int available = share_order;
    for ( Share share : activeShares.values() ) {
        available -= share.getShareOrder();
    }
    available -= blacklisted_shares;
    return available / order;
}
/**
 * Returns the number of shares left on this machine.
 *
 * @return the current value of shares_left
 */
public int countFreeShares()
{
    return this.shares_left;
}
/**
 * How much is left unused, plus the shares that are marked for eviction.
 *
 * @return shares_left plus the order of every evicted active share
 */
public int countFreedUpShares()
{
    int total = shares_left;
    for ( Share share : activeShares.values() ) {
        total += share.isEvicted() ? share.getShareOrder() : 0;
    }
    return total;
}
/**
 * Builds the query response describing this machine.
 *
 * The response carries the id, node pool id, memory, share order and blacklist status
 * (UIMA-4142), followed by one entry per active share and one entry per blacklisted share.
 *
 * @return the populated query record
 */
RmQueriedMachine queryMachine()
{
    // UIMA-4142, include blacklist considerations
    RmQueriedMachine result = new RmQueriedMachine(id, nodepool.getId(), memory, share_order, isBlacklisted());

    // Shares that are actually assigned, with their state flags.
    for ( Share active : activeShares.values() ) {
        RmQueriedShare entry = new RmQueriedShare(active.getJob().getId().getFriendly(),
                                                  active.getId().getFriendly(),
                                                  active.getShareOrder(),
                                                  active.getInitializationTime(),
                                                  active.getInvestment());
        entry.setFixed(active.isFixed());
        entry.setPurged(active.isPurged());
        entry.setEvicted(active.isEvicted());
        entry.setInitialized(active.isInitialized());
        result.addShare(entry);
    }

    // Blacklisted work is reported under its blacklist job id and flagged as blacklisted.
    for ( Share condemned : blacklistedWork.values() ) {
        RmQueriedShare entry = new RmQueriedShare(condemned.getBlJobId().getFriendly(),
                                                  condemned.getId().getFriendly(),
                                                  condemned.getShareOrder(),
                                                  condemned.getInitializationTime(),
                                                  condemned.getInvestment());
        entry.setBlacklisted();
        result.addShare(entry);
    }

    return result;
}
// RmQueriedMachine queryOfflineMachine() // UIMA-4234
// {
// RmQueriedMachine ret = queryMachine();
// ret.setOffline();
// return ret;
// }
//
// RmQueriedMachine queryUnresponsiveMachine() // UIMA-4234
// {
// RmQueriedMachine ret = queryMachine();
// ret.setUnresponsive();
// return ret;
// }
/**
 * A machine's investment is the sum of its shares' investments.
 *
 * @return the total investment over all active shares
 */
public int getInvestment()
{
    int total = 0;
    for ( Share share : activeShares.values() ) {
        total = total + share.getInvestment();
    }
    return total;
}
/**
 * Produces the separator line, using the same column layout as the header.
 *
 * @return one run of dashes per column
 */
public static String getDashes()
{
    final String layout = "%20s %12s %5s %13s %13s %11s %s";
    return String.format(layout,
                         "--------------------",
                         "------------",
                         "-----",
                         "-------------",
                         "-------------",
                         "-----------",
                         "------ ...");
}
/**
 * Produces the column header line for a listing of machines.
 * UIMA-4142, include blacklist considerations
 *
 * @return the column titles, laid out with the same widths as the separator line
 */
public static String getHeader()
{
    final String layout = "%20s %12s %5s %13s %13s %11s %s";
    return String.format(layout,
                         "Name",
                         "Blacklisted",
                         "Order",
                         "Active Shares",
                         "Unused Shares",
                         "Memory (MB)",
                         "Jobs");
}
/**
 * Does this machine match the thing specified in the input string 's'?
 *
 * A match is either an exact match on the node name, or a match after stripping the domain
 * from whichever of the two names is qualified. If both are qualified and differ, or neither
 * is qualified and they differ, there is no match.
 *
 * @param s the host name to compare against, qualified or not
 * @return true if 's' names this machine
 */
public boolean matches(String s)
{
    String name = getNodeIdentity().getName();
    if ( s.equals(name) ) return true;                  // try for a perfect match

    // see if s is qualified with the domain name
    int ndx1 = s.indexOf(".");
    int ndx2 = name.indexOf(".");

    if ( (ndx1 > 0) && (ndx2 > 0) ) return false;       // both qualified, so no match

    if ( ndx1 == -1 ) {
        // Neither name is qualified and they already failed the exact comparison.
        // Without this guard name.substring(0, -1) throws StringIndexOutOfBoundsException.
        if ( ndx2 == -1 ) return false;
        return s.equals(name.substring(0, ndx2));       // s not qualified, so strip name
    } else {
        return s.substring(0, ndx1).equals(name);       // name not qualified, so strip s
    }
}
/**
 * Hashes on the node identity, the same property equals() compares.
 *
 * @return the hash code of this machine's node identity
 */
@Override
public int hashCode()
{
    NodeIdentity identity = getNodeIdentity();
    return identity.hashCode();
}
/**
 * Two machines are equal when they are of the same class and their node identities are equal.
 *
 * @param o the object to compare with
 * @return true if 'o' is a Machine with an equal node identity
 */
@Override
public boolean equals(Object o)
{
    if ( this == o ) return true;
    if ( (o == null) || (this.getClass() != o.getClass()) ) return false;

    Machine other = (Machine) o;
    return this.getNodeIdentity().equals(other.getNodeIdentity());
}
/**
 * Formats this machine as one row of the machine listing, in the column layout of getHeader():
 * name, blacklist status, share order, active shares, unused shares, memory (memory / 1024)
 * and the ids of the jobs holding shares here.
 *
 * @return the formatted row; the job column reads "&lt;none&gt;" when no shares are active
 */
@Override
public String toString()
{
    int qshares = 0;
    int unused = share_order;
    String jobs = "<none>";

    if ( activeShares.size() > 0 ) {
        // The buffer never leaves this method, so the unsynchronized StringBuilder is enough.
        StringBuilder buf = new StringBuilder();
        for ( Share s : activeShares.values() ) {
            qshares += s.getShareOrder();
            buf.append(s.getJob().getId());
            buf.append(" ");
        }
        jobs = buf.toString();
        unused = share_order - qshares;
    }

    // UIMA-4142, include blacklist considerations
    return String.format("%20s %12s %5d %13d %13d %11d %s", id, isBlacklisted(), share_order, qshares, unused, (memory/1024), jobs);
}
}