| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.uima.ducc.rm; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| |
| import org.apache.uima.ducc.common.Node; |
| import org.apache.uima.ducc.common.NodeIdentity; |
| import org.apache.uima.ducc.common.utils.DuccCollectionUtils; |
| import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference; |
| import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference; |
| import org.apache.uima.ducc.common.utils.DuccLogger; |
| import org.apache.uima.ducc.common.utils.SystemPropertyResolver; |
| import org.apache.uima.ducc.common.utils.id.DuccId; |
| import org.apache.uima.ducc.rm.scheduler.IJobManager; |
| import org.apache.uima.ducc.rm.scheduler.IRmJob; |
| import org.apache.uima.ducc.rm.scheduler.ISchedulerMain; |
| import org.apache.uima.ducc.rm.scheduler.JobManagerUpdate; |
| import org.apache.uima.ducc.rm.scheduler.Machine; |
| import org.apache.uima.ducc.rm.scheduler.ResourceClass; |
| import org.apache.uima.ducc.rm.scheduler.RmJob; |
| import org.apache.uima.ducc.rm.scheduler.SchedConstants; |
| import org.apache.uima.ducc.rm.scheduler.SchedulingException; |
| import org.apache.uima.ducc.rm.scheduler.Share; |
| import org.apache.uima.ducc.transport.event.RmStateDuccEvent; |
| import org.apache.uima.ducc.transport.event.common.DuccWorkMap; |
| import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcess; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcessMap; |
| import org.apache.uima.ducc.transport.event.common.IDuccReservation; |
| import org.apache.uima.ducc.transport.event.common.IDuccReservationMap; |
| import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo; |
| import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo; |
| import org.apache.uima.ducc.transport.event.common.IDuccWork; |
| import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable; |
| import org.apache.uima.ducc.transport.event.common.IDuccWorkJob; |
| import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation; |
| import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState; |
| import org.apache.uima.ducc.transport.event.common.ITimeWindow; |
| import org.apache.uima.ducc.transport.event.rm.IResource; |
| import org.apache.uima.ducc.transport.event.rm.IRmJobState; |
| import org.apache.uima.ducc.transport.event.rm.Resource; |
| import org.apache.uima.ducc.transport.event.rm.RmJobState; |
| |
| |
| /** |
| * Convert the scheduler's structures into the events that get returned to the world. |
| */ |
| |
| public class JobManagerConverter |
| implements IJobManager, |
| SchedConstants |
| { |
    // Component logger; COMPONENT_NAME comes from the SchedConstants interface this class implements.
    DuccLogger logger = DuccLogger.getLogger(JobManagerConverter.class, COMPONENT_NAME);
    ISchedulerMain scheduler;                  // the scheduler proper: source of jobs, machines, shares
    NodeStability nodeStability = null;        // node stability tracker handed in at construction

    DuccWorkMap localMap = null;               // our local copy of the OR work map, used as the diff base
    JobManagerUpdate lastJobManagerUpdate = new JobManagerUpdate();

    // Jobs refused before they ever reached the scheduler; drained into the next published state.
    Map<IRmJob, IRmJob> refusedJobs = new HashMap<IRmJob, IRmJob>();

    // ducc.rm.fast.recovery: when true, start scheduling as soon as OR state is recovered.
    boolean recovery = false;
| |
    /**
     * Wire the converter to the scheduler and prepare an empty local work map.
     *
     * @param scheduler the scheduler this converter feeds work into and queries
     * @param ns        node stability tracker (retained for later use)
     */
    public JobManagerConverter(ISchedulerMain scheduler, NodeStability ns)
    {
        this.scheduler = scheduler;
        this.localMap = new DuccWorkMap();
        this.nodeStability = ns;

        // Switch the logger to unthreaded (synchronous) mode.
        DuccLogger.setUnthreaded();

        // Fast recovery: if enabled, the scheduler is started as soon as state is
        // recovered from the Orchestrator instead of waiting for init stability.
        recovery = SystemPropertyResolver.getBooleanProperty("ducc.rm.fast.recovery", true);
    }
| |
| int toInt(String s, int deflt) |
| { |
| try { |
| int val = Integer.parseInt(s); |
| return ( val == 0 ) ? deflt : val; |
| } catch ( Throwable t ) { |
| return deflt; |
| } |
| } |
| |
| void refuse(IRmJob j, String reason) |
| { |
| j.refuse(reason); |
| synchronized(refusedJobs) { |
| refusedJobs.put(j, j); |
| } |
| } |
| |
| String getElapsedTime(ITimeWindow w) |
| { |
| if ( w == null ) return "0"; |
| return w.getDiff(); |
| } |
| |
| // void formatSchedulingInfo(DuccId id, IDuccSchedulingInfo si, int remaining_work) |
| // { |
| // String methodName = "formatSchedulingInfo"; |
| // SynchronizedDescriptiveStatistics stats = si.getPerWorkItemProcessingTime(); |
| // double arith_mean = stats.getMean(); |
| // double geom_mean = stats.getGeometricMean(); |
| // double[] vals = stats.getSortedValues(); |
| // |
| // logger.info(methodName, null, id, "STATS: arithmetic mean:", arith_mean); |
| // logger.info(methodName, null, id, "STATS: geometric mean:", geom_mean); |
| // logger.info(methodName, null, id, "STATS: remaining work:", remaining_work); |
| // logger.info(methodName, null, id, "STATS: nvals :", vals.length); |
| // |
| // if ( vals.length > 0 ) { |
| // StringBuffer buf = new StringBuffer(); |
| // int cnt = 0; |
| // |
| // for ( int i = 0; i < vals.length; i++ ) { |
| // buf.append(Double.toString(vals[i])); |
| // if ( (++cnt) % 10 == 0 ) { |
| // buf.append("\n"); |
| // } else { |
| // buf.append(" "); |
| // } |
| // } |
| // logger.info(methodName, null, id, "STATS: vals:\n", buf.toString()); |
| // } |
| // |
| // } |
| |
| /** |
| * Update scheduler internal job structure with updated data from arriving job state. |
| */ |
| void jobUpdate(Object state, IDuccWork job) |
| { |
| String methodName = "jobUpate"; |
| IDuccSchedulingInfo si = job.getSchedulingInfo(); |
| |
| DuccId jobid = job.getDuccId(); |
| IRmJob j = scheduler.getJob(jobid); |
| if ( j == null ) { |
| // this can happen right when the job is submitted, if we haven't yet called |
| // the scheduler to deal with it. just ignore, but take note. |
| // logger.info(methodName, jobid, "**** Cannot find job to update! ****"); |
| return; |
| } else { |
| int total_work = toInt(si.getWorkItemsTotal(), scheduler.getDefaultNTasks()); |
| int completed_work = toInt(si.getWorkItemsCompleted(), 0) + toInt(si.getWorkItemsError(), 0)+ toInt(si.getWorkItemsLost(), 0); |
| |
| int max_shares = toInt(si.getSharesMax(), Integer.MAX_VALUE); |
| int existing_max_shares = j.getMaxShares(); |
| |
| int remaining_work = Math.max(total_work - completed_work, 0); |
| |
| double arith_mean = Double.NaN; |
| IDuccPerWorkItemStatistics stats = si.getPerWorkItemStatistics(); |
| if(stats != null) { |
| arith_mean = stats.getMean(); |
| } |
| |
| logger.info(methodName, job.getDuccId(), |
| String.format("tot: %d %s -> %s compl: %s err: %s rem: %d mean: %f", |
| total_work, |
| state, |
| job.getStateObject(), |
| si.getWorkItemsCompleted(), // note this comes in as string (!) from OR |
| si.getWorkItemsError(), // also string |
| remaining_work, |
| arith_mean |
| )); |
| |
| if ( max_shares != existing_max_shares ) { |
| j.setMaxShares(max_shares); |
| logger.info(methodName, job.getDuccId(), "Max shares adjusted from", existing_max_shares, "to", max_shares, "(incoming)", |
| si.getSharesMax()); |
| } |
| |
| j.setNQuestions(total_work, remaining_work, arith_mean); |
| |
| // formatSchedulingInfo(job.getDuccId(), si, remaining_work); |
| |
| if ( job instanceof IDuccWorkJob ) { |
| if ( j.setInitWait( ((IDuccWorkJob) job).isRunnable()) ) { |
| logger.info(methodName, jobid, "Set Initialized."); |
| scheduler.signalInitialized(j); |
| } |
| } else { |
| j.setInitWait(true); // pop is always ready to go |
| } |
| } |
| } |
| |
| /** |
| * NOTE: If this returns false, it maust also refuse(). |
| */ |
| private boolean receiveExecutable(IRmJob j, IDuccWork job) |
| { |
| String methodName = "receiveExecutable"; |
| IDuccWorkExecutable de = (IDuccWorkExecutable) job; |
| IDuccProcessMap pm = de.getProcessMap(); |
| |
| if ( (pm.size() > 0) && !job.isCompleted() ) { // need to recover, apparently RM crashed. hmph. |
| for ( IDuccProcess proc : pm.values() ) { // build up Shares from the incoming state |
| |
| ProcessState state = proc.getProcessState(); |
| String pid = proc.getPID(); |
| NodeIdentity ni = proc.getNodeIdentity(); |
| |
| if ( proc.isComplete() ) { |
| logger.debug(methodName, j.getId(), "Skipping process", pid, "on", ni.getName(), "beacause state is", state); |
| continue; |
| } |
| |
| Machine m = scheduler.getMachine(ni); |
| if ( m == null ) { // not known, huh? maybe next epoch it will have checked in |
| refuse(j, "Cannot restore job because node " + ni.getName() + " is unknown."); |
| return false; // so we don't add it to global tables |
| } else { |
| DuccId id = proc.getDuccId(); |
| Share s = new Share(id, m, j, m.getShareOrder()); // guess share order; scheduler will reset when it recovers job |
| long mem = proc.getResidentMemory(); |
| |
| logger.info(methodName, j.getId(), "Assigning share in state", state, "pid", pid, "for recovery", s.toString()); |
| j.recoverShare(s); |
| s.update(j.getId(), mem, state, proc.getTimeWindowInit(), proc.getTimeWindowRun(), pid); |
| } |
| } |
| logger.info(methodName, j.getId(), "Scheduling for recovery."); |
| scheduler.signalRecovery(j); |
| } else { |
| logger.info(methodName, j.getId(), "Scheduling as new."); |
| scheduler.signalNewWork(j); |
| } |
| return true; |
| } |
| |
    /**
     * Accept an incoming reservation: either recover its existing shares
     * (RM restarted underneath a live reservation) or schedule it as new.
     *
     * NOTE: If this returns false, it must also refuse().
     *
     * @param j   the RM-side job being built up
     * @param job the incoming OR-side work
     * @return true if accepted; false if refused (node unknown)
     */
    private boolean receiveReservation(IRmJob j, IDuccWork job)
    {
        String methodName = "receiveReservation";
        j.setReservation();

        IDuccWorkReservation dr = (IDuccWorkReservation) job;
        IDuccReservationMap rm = dr.getReservationMap();
        if ( (rm.size() > 0) && !job.isCompleted() ) {       // need to recover, apparently RM crashed. hmph.
            for ( IDuccReservation res : rm.values() ) {     // build up Shares from the incoming state
                NodeIdentity ni = res.getNodeIdentity();
                Machine m = scheduler.getMachine(ni);
                if ( m == null ) {                           // not known, huh? maybe next epoch it will have checked in
                    refuse(j, "Cannot restore reservation because node " + ni.getName() + " is unknown.");
                    return false;                            // so we don't add it to global tables
                } else {
                    DuccId id = res.getDuccId();
                    // Share order guessed from the machine; reservations occupy fixed shares.
                    Share s = new Share(id, m, j, m.getShareOrder());
                    s.setFixed();
                    j.recoverShare(s);
                    logger.debug(methodName, j.getId(), "Assigning share for recovery", s.toString());
                }
            }
            logger.info(methodName, j.getId(), "Scheduling for recovery.");
            scheduler.signalRecovery(j);
        } else {
            logger.info(methodName, j.getId(), "Scheduling as new.");
            scheduler.signalNewWork(j);
        }
        return true;
    }
| |
| /** |
| * Convert a JobManager Job into a ResourceManager RmJob. We assume this job is NOT in |
| * our lists. |
| * |
| * NOTE IMPORTANT NOTE |
| * |
| * Until Lou's job contains all required scheduling fields I do a conversion that enhances |
| * what I receive with defaults so the scheduler can actually schedule the job. |
| * |
| * NOTE IMPORTANT NOTE |
| * |
| * @param job |
| */ |
| boolean jobArrives(IDuccWork job) |
| { |
| String methodName = "jobArrives"; |
| logger.trace(methodName, job.getDuccId(), "Job arives"); |
| logger.trace(methodName, job.getDuccId(), "Job is of type", job.getDuccType()); |
| |
| // Properties props = new Properties(); |
| |
| // Set<String> keys = props.stringPropertyNames(); |
| // for ( String k : keys ) { |
| // logger.debug("methodName", job.getDuccId(), "Next property [", k, ", ", props.getProperty(k), "]"); |
| // } |
| |
| // Properties rmProps = new DuccProperties(); |
| // for ( int i = 0; i < requiredProperties.length; i++ ) { |
| // String v = props.getProperty(requiredProperties[i]); |
| // if ( v == null ) { |
| // v = defaultValues[i]; |
| // } |
| // rmProps.setProperty(rmProperties[i], v); |
| // } |
| // IRmJob j = new RmJob(job.getDuccId(), rmProps); |
| |
| // Convert Lou's structure into mine. |
| IRmJob j = new RmJob(job.getDuccId()); |
| |
| IDuccSchedulingInfo si = job.getSchedulingInfo(); |
| IDuccStandardInfo sti = job.getStandardInfo(); |
| |
| String name = sti.getDescription(); |
| if ( name == null ) { |
| name = "A Job With No Name."; |
| } |
| String user_name = sti.getUser(); |
| j.setUserName(user_name); |
| j.setJobName(name); |
| |
| int min_shares = toInt(si.getSharesMin(), 0); |
| int threads = toInt(si.getThreadsPerShare(), scheduler.getDefaultNThreads()); |
| int user_priority = toInt(si.getSchedulingPriority(), 100); |
| |
| int total_work = toInt(si.getWorkItemsTotal(), scheduler.getDefaultNTasks()); |
| int completed_work = toInt(si.getWorkItemsCompleted(), 0); |
| int remaining_work = Math.max(total_work - completed_work, 1); // never let this go 0 or negative - both cases |
| // are (probably user) errors. |
| |
| logger.info(methodName, job.getDuccId(), "total_work", total_work, "completed_work", completed_work,"remaining_work", remaining_work); |
| |
| int memory = toInt(si.getShareMemorySize(), scheduler.getDefaultMemory()); |
| String className = si.getSchedulingClass(); |
| if ( className == null ) { |
| switch ( job.getDuccType() ) { |
| case Job: |
| className = scheduler.getDefaultFairShareName(); |
| break; |
| case Service: |
| case Pop: |
| case Reservation: |
| className = scheduler.getDefaultReserveName(); |
| break; |
| } |
| if ( className == null ) { |
| j.refuse("No scheduling class defined and no default class configured."); |
| return false; |
| } |
| } |
| |
| j.setMinShares(min_shares); |
| j.setThreads(threads); |
| j.setUserPriority(user_priority); |
| j.setNQuestions(total_work, remaining_work, 0.0); |
| j.setClassName(className); |
| |
| switch (si.getShareMemoryUnits()) { |
| case GB: |
| break; |
| default: |
| logger.warn(methodName, job.getDuccId(), "Memory units other than GB are not currently supported. Job returned."); |
| break; |
| } |
| j.setMemory(memory); |
| j.init(); |
| |
| j.setTimestamp(Long.parseLong(sti.getDateOfSubmission())); |
| // logger.info(methodName, j.getId(), "SUBMISSION DATE:", subd, (new Date(subd)).toString()); |
| |
| if ( job instanceof IDuccWorkJob ) { |
| j.setInitWait( ((IDuccWorkJob) job).isRunnable()); |
| } else { |
| j.setInitWait(true); // pop is always ready to go |
| } |
| |
| j.setDuccType(job.getDuccType()); // ugly and artificial but ... not going to rant here |
| // it's needed so messages can be made legible |
| |
| // |
| // Now: must either create a new job, or recover one that we didn't know about, on the assumption that we |
| // have just crashed and are recovering. |
| // |
| // Be SURE that if status is turned false for any reason, or if you exit early with false, that you |
| // refuse() the job. |
| // |
| boolean status = true; |
| |
| int max_processes = 0; |
| int max_machines = 0; |
| ResourceClass rescl = scheduler.getResourceClass(className); |
| j.setResourceClass(rescl); |
| |
| if ( rescl == null ) { |
| // ph darn, we can't continue past this point |
| refuse(j, "Cannot find priority class " + className + " for job"); |
| return false; |
| } |
| |
| // if ( logger.isDebug() ) { |
| // logger.debug(methodName, j.getId(),"sharesMax", si.getSharesMax()); |
| // logger.debug(methodName, j.getId(),"getInstancesCount", si.getInstancesCount()); |
| // logger.debug(methodName, j.getId(), "rescl.getMaxProcesses", rescl.getMaxProcesses()); |
| // logger.debug(methodName, j.getId(), "rescl.getMaxMachines", rescl.getMaxMachines()); |
| // } |
| |
| switch ( job.getDuccType() ) { |
| case Service: |
| case Pop: |
| case Job: |
| // instance and share count are a function of the class |
| switch ( rescl.getPolicy() ) { |
| case FAIR_SHARE: |
| max_processes = toInt(si.getSharesMax(), DEFAULT_PROCESSES); |
| max_processes = Math.min(rescl.getMaxProcesses(), max_processes); |
| j.setMaxShares(max_processes); |
| j.setNInstances(-1); |
| break; |
| |
| case FIXED_SHARE: |
| max_processes = toInt(si.getSharesMax(), DEFAULT_INSTANCES); |
| j.setMaxShares(max_processes); |
| j.setNInstances(max_processes); |
| break; |
| |
| case RESERVE: |
| max_machines = toInt(si.getSharesMax(), DEFAULT_INSTANCES); |
| j.setMaxShares(max_machines); |
| j.setNInstances(max_machines); |
| break; |
| } |
| |
| status = receiveExecutable(j, job); |
| logger.trace(methodName, j.getId(), "Serivce, Pop, or Job arrives, accepted:", status); |
| break; |
| case Reservation: |
| switch ( rescl.getPolicy() ) { |
| case FIXED_SHARE: |
| max_machines = toInt(si.getInstancesCount(), DEFAULT_INSTANCES); |
| break; |
| case RESERVE: |
| max_machines = toInt(si.getInstancesCount(), DEFAULT_INSTANCES); |
| break; |
| } |
| |
| j.setMaxShares(-1); |
| j.setNInstances(max_machines); |
| |
| status = receiveReservation(j, job); |
| logger.trace(methodName, j.getId(), "Reservation arrives, accepted:", status); |
| break; |
| default: |
| refuse(j, "Unknown job type: " + job.getDuccType()); |
| status = false; |
| break; |
| } |
| |
| // logger.debug(methodName, j.getId(), "Max_processes:", max_processes); |
| // logger.debug(methodName, j.getId(), "Max_machines:", max_machines); |
| |
| return status; |
| } |
| |
| /** |
| * Our records indicate that we know about this job but JM doesn't so we purge |
| * it from the scheduler |
| * @param job |
| */ |
| void jobRemoved(DuccId id) |
| { |
| String methodName = "jobRemoved"; |
| logger.trace(methodName, id, "Signalling removal"); |
| scheduler.signalCompletion(id); |
| localMap.removeDuccWork(id); |
| logger.trace(methodName, id, "Remove signalled"); |
| } |
| |
| public void reconcileProcesses(DuccId jobid, IDuccWork l, IDuccWork r) |
| { |
| String methodName = "reconcileProcess"; |
| IDuccProcessMap lpm = ((IDuccWorkJob )l).getProcessMap(); |
| IDuccProcessMap rpm = ((IDuccWorkJob)r).getProcessMap(); |
| |
| @SuppressWarnings("unchecked") |
| DuccMapDifference<DuccId, IDuccProcess> diffmap = DuccCollectionUtils.difference(lpm, rpm); |
| |
| // new stuff in in the left side of the map |
| Map<DuccId, IDuccProcess> lproc = diffmap.getLeft(); |
| |
| for ( IDuccProcess p : lproc.values() ) { |
| // look up share, update resident memory, process state, investment (eventually), maybe pid? |
| // simply update the share with the information. we pass in the jobid as a sanity check so |
| // we can crash or at least complain loudly on mismatch. |
| |
| Share s = scheduler.getShare(p.getDuccId()); |
| long mem = p.getResidentMemory(); |
| ProcessState state = p.getProcessState(); |
| String pid = p.getPID(); |
| |
| logger.info(methodName, jobid, "New process ", s.toString(), mem, state, pid); |
| if ( ! s.update(jobid, mem, state, p.getTimeWindowInit(), p.getTimeWindowRun(), pid) ) { |
| // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter |
| throw new SchedulingException(jobid, "Process assignemnt arrives for share " + s.toString() + |
| " but jobid " + jobid + " does not match share " + s.getJob().getId()); |
| } |
| //scheduler.signalGrowth(jobid, s); |
| // sadly, the pid is almost always null here |
| //logger.info(methodName, jobid, |
| // "New process arrives for share", s.toString(), "PID", pid); |
| } |
| |
| // gone stuff in in the right side of the map |
| Map<DuccId, IDuccProcess> rproc = diffmap.getRight(); |
| for ( IDuccProcess p : rproc .values()) { |
| // these processes are done. look up the job and tell it process complete. |
| Share s = scheduler.getShare(p.getDuccId()); |
| IRmJob j = scheduler.getJob(jobid); |
| if ( j == null ) { |
| throw new SchedulingException(jobid, "Process completion arrives for share " + s.toString() + |
| " but job " + jobid + "cannot be found."); |
| } |
| scheduler.signalCompletion(j, s); |
| logger.info(methodName, jobid, |
| String.format("Process %5s", p.getPID()), |
| "Completion:", s.toString()); |
| } |
| |
| for( DuccMapValueDifference<IDuccProcess> pd: diffmap ) { |
| IDuccProcess pl = pd.getLeft(); |
| IDuccProcess pr = pd.getRight(); |
| |
| Share sl = scheduler.getShare(pl.getDuccId()); |
| Share sr = scheduler.getShare(pr.getDuccId()); |
| |
| String shareL = ( sl == null ) ? "<none>" : sl.toString(); |
| String shareR = ( sr == null ) ? "<none>" : sr.toString(); |
| |
| ITimeWindow initL = pl.getTimeWindowInit(); |
| ITimeWindow initR = pr.getTimeWindowInit(); |
| long init_timeL = (initL == null) ? 0 : initL.getElapsedMillis(); |
| long init_timeR = (initR == null) ? 0 : initR.getElapsedMillis(); |
| |
| /** extreme debugging only*/ |
| if ( logger.isTrace() ) { |
| logger.trace(methodName, jobid, |
| "\n\tReconciling. incoming.(pid, mem, state, share, initTime)", |
| pl.getPID(), |
| pl.getResidentMemory(), |
| pl.getProcessState(), |
| shareL, |
| init_timeL, |
| "\n\tReconciling. existing.(pid, mem, state, share, initTime)", |
| pr.getPID(), |
| pr.getResidentMemory(), |
| pr.getProcessState(), |
| shareR, |
| init_timeR |
| ); |
| } else { |
| if ( (pr.getPID() == null) && (pl.getPID() != null) ) { |
| logger.trace(methodName, jobid, |
| String.format("Process %5s", pl.getPID()), |
| "PID assignement for share", shareL); |
| } |
| if ( pl.getProcessState() != pr.getProcessState() ) { |
| logger.info(methodName, jobid, |
| String.format("Process %5s", pl.getPID()), sl.toString(), |
| "State:", pr.getProcessState(), "->", pl.getProcessState(), |
| getElapsedTime(pr.getTimeWindowInit()), getElapsedTime(pr.getTimeWindowRun())); |
| } |
| } |
| |
| long mem = pl.getResidentMemory(); |
| ProcessState state = pl.getProcessState(); |
| String pid = pl.getPID(); |
| Share s = scheduler.getShare(pl.getDuccId()); |
| if ( pl.isActive() ) { |
| |
| if ( s == null ) { |
| // this can happen if a node dies and the share is purged so it's ok. |
| logger.warn(methodName, jobid, "Update for share from process", pl.getPID(), pl.getDuccId(), "but cannot find share."); |
| continue; |
| } |
| |
| if ( s.isPurged() ) { |
| IRmJob j = scheduler.getJob(jobid); |
| scheduler.signalCompletion(j, s); |
| logger.info(methodName, jobid, "Process", pl.getPID(), "marked complete because it is purged. State:", state); |
| } |
| |
| if ( ! s.update(jobid, mem, state, pl.getTimeWindowInit(), pl.getTimeWindowRun(), pid) ) { |
| // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter |
| throw new SchedulingException(jobid, "Process update arrives for share " + s.toString() + |
| " but jobid " + jobid + " does not match job in share " + s.getJob().getId()); |
| } |
| // logger.debug(methodName, jobid, "Process update to process ", pid, "mem", mem, "state", state, "is assigned for share", s.toString()); |
| |
| } else if ( pl.isComplete() ) { |
| if ( s != null ) { // in some final states the share is already gone, not an error (e.g. Stopped) |
| IRmJob j = scheduler.getJob(jobid); |
| scheduler.signalCompletion(j, s); |
| logger.info(methodName, jobid, "Process", pl.getPID(), " completed due to state", state); |
| } |
| } else { |
| logger.info(methodName, jobid, "Process", pl.getPID(), "ignoring update because of state", state); |
| } |
| |
| } |
| |
| } |
| |
    // True until the first usable OR publication arrives; gates the one-time recovery pass.
    boolean first_or_state = true;

    /**
     * Entry point for an Orchestrator state publication: recover state on the first
     * usable map, then diff the incoming map against our local copy and dispatch
     * new, departed, and changed work.
     *
     * @param jobMap the full work map published by the Orchestrator
     */
    public void eventArrives(DuccWorkMap jobMap)
    {
        String methodName = "eventArrives";

        if ( jobMap.size() == 0 ) {
            logger.debug(methodName, null, "No state from Orchestrator");
            return;
        }

        // The init file is read and configured ?
        if ( ! scheduler.isInitialized() ) return;

        // One-time recovery: build state from the first OR map that has active jobs.
        if ( first_or_state ) {
            first_or_state = false;
            if ( ! recoverFromOrchestrator(jobMap) ) {
                logger.info(methodName, null, "There are no active jobs in map so can't build up state.  Waiting for init stability.");
                return;
            }

            if ( recovery ) {
                logger.info(methodName, null, "Fast recovery is enabled: Recovered state from Orchestrator, starting scheduler.");
                scheduler.start();
            }
        }

        // scheduler is readied either by fast-recovery, or by init stability
        if ( !scheduler.ready() ) {
            logger.info(methodName, null, "Orchestrator event is discarded: waiting for init stability.");
            return;
        }

        @SuppressWarnings("unchecked")
        DuccMapDifference<DuccId, IDuccWork> diffmap = DuccCollectionUtils.difference(jobMap, localMap);

        for ( IDuccWork w : jobMap.values() ) {
            logger.trace(methodName, w.getDuccId(), "Arrives in JmStateEvent state =", w.getStateObject());
        }

        //
        // First handle new stuff (present only in the incoming map)
        //
        Map<DuccId, IDuccWork> jobs = diffmap.getLeft();
        for ( IDuccWork w : jobs.values() ) {

            if ( w.isSchedulable() ) {
                logger.info(methodName, w.getDuccId(), "Incoming, state = ", w.getStateObject());
                try {
                    if ( jobArrives(w) ) {          // if not ... something is fubar and we have to ignore it for now
                        localMap.addDuccWork(w);
                    }
                } catch ( Exception e ) {
                    logger.error(methodName, w.getDuccId(), "Can't receive job because of exception", e);
                }
            } else {
                logger.info(methodName, w.getDuccId(), "Received non-schedulable job, state = ", w.getStateObject());
            }
        }

        // Departed work (present only in our local map)
        jobs = diffmap.getRight();
        for ( IDuccWork w :jobs.values() ) {
            logger.info(methodName, w.getDuccId(), "Gone");
            jobRemoved(w.getDuccId());
        }

        //
        // Stuff on the left is incoming.  Stuff on the right is already in my map.
        //
        for( DuccMapValueDifference<IDuccWork> jd: diffmap ) {
            IDuccWork r = jd.getRight();
            IDuccWork l = jd.getLeft();

            if ( ! l.isSchedulable() ) {
                logger.info(methodName, l.getDuccId(), "Removing unschedulable:", r.getStateObject(), "->", l.getStateObject());
                jobRemoved(r.getDuccId());
            } else {

                localMap.addDuccWork(l);    // still schedulable, and we already know about it, just sync the state

                switch ( l.getDuccType() ) {
                  case Job:
                      jobUpdate(r.getStateObject(), l);
                      reconcileProcesses(l.getDuccId(), l, r);
                      break;
                  case Service:
                  case Pop:
                  case Reservation:
                      if ( r.getStateObject() != l.getStateObject() ) {
                          logger.info(methodName, l.getDuccId(), "[SPR] State: ", r.getStateObject(), "->", l.getStateObject());
                      }
                      // for the moment, these guys have nothing to reconcile.
                      break;
                  case Undefined:
                      throw new SchedulingException(l.getDuccId(), "Work arrives as type Undefined - should have been filtered out by now.");
                }
            }

        }

        logger.trace(methodName, null, "Done with JmStateDuccEvent with some jobs processed");

    }
| |
    /**
     * This is an ugly kludge because we discovered OR isn't doing map diffs!  So in the case
     * of lost messages, OR may not be able to discover that jobs actually have shares assigned.
     *
     * Here we look into the OR map, dig out the "work", and if the indicated share is not
     * there, forcibly add it to the expanded shares list.
     *
     * @param j        the RM job whose shares are being checked
     * @param shares   the shares RM believes are assigned to the job
     * @param expanded shares already scheduled for publication as expansions (may be null)
     * @return shares to redrive (missing from OR and not already being published), or null
     *         if the work is gone from our map or there is nothing to check
     */
    Map<Share, Share> sanityCheckForOrchestrator(IRmJob j, Map<Share, Share> shares, Map<Share, Share>expanded)
    {
        String methodName = "sanityCheckForOrchestrator";
        IDuccWork w = localMap.findDuccWork(j.getId());

        if ( w == null ) return null;      // deal with race - the job could have completed right as we are ready to
                                           // publish, in which case it's gone from localMap
        if ( shares == null ) return null; // no shares for whatever reason, we couldn't care less ...

        Map<Share, Share> ret = new HashMap<Share, Share>();
        switch ( w.getDuccType() ) {
          case Job:
          case Service:
              {
                  IDuccWorkExecutable de = (IDuccWorkExecutable) w;
                  IDuccProcessMap pm = de.getProcessMap();

                  // Any share the OR's process map doesn't know about must be re-published.
                  for ( Share s : shares.values() ) {
                      IDuccProcess p = pm.get(s.getId());
                      if ( p == null ) {
                          if ( (expanded == null) || (!expanded.containsKey(s)) ) {
                              logger.warn(methodName, j.getId(), "Redrive share assignment: ", s);
                              ret.put(s, s);
                          }
                      }
                  }
              }
              break;

          case Reservation:
              {
                  IDuccWorkReservation de = (IDuccWorkReservation) w;
                  IDuccReservationMap rm = de.getReservationMap();

                  // Same check against the OR's reservation map.
                  for ( Share s : shares.values() ) {
                      IDuccReservation r = rm.get(s.getId());
                      if ( r == null ) {
                          if ( (expanded == null) || (!expanded.containsKey(s)) ) {
                              logger.warn(methodName, j.getId(), "Redrive share assignment:", s);
                              ret.put(s, s);
                          }
                      }
                  }
              }
              break;
        }
        return ret;
    }
| |
| boolean isPendingNonPreemptable(IRmJob j) |
| { |
| String methodName = "isPendingNonPreemptable"; |
| // If fair share it definitely isn't any kind of preemptable |
| if ( j.getResourceClass().getPolicy() == Policy.FAIR_SHARE) return false; |
| |
| // otherwise, if the shares it has allocated is < the number it wants, it is in fact |
| // pending but not complete. |
| logger.info(methodName, j.getId(), "countNShares", j.countNShares(), "countInstances", j.countInstances(), "isComplete", j.isCompleted()); |
| |
| if ( j.isCompleted() ) { |
| return false; |
| } |
| |
| if ( j.countNShares() == j.countInstances() ) { |
| j.markComplete(); // non-preemptable, remember it finally got it's max |
| return false; |
| } |
| |
| return (j.countNShares() < j.countInstances()); |
| } |
| |
    /**
     * If no state has changed, we just resend the last one.
     */
    Map<DuccId, IRmJobState> previousJobState = new HashMap<DuccId, IRmJobState>();


    /**
     * Here's where we make a IRmStateEvent from the JobManagerUpdate so the caller can publish it.
     *
     * @param jmu the scheduler's latest update, or null to republish the previous state
     * @return the event to publish back to the Orchestrator
     */
    public RmStateDuccEvent createState(JobManagerUpdate jmu)
    {
        String methodName = "createState";
        Map<DuccId, IRmJobState> rmJobState = null;

        if ( jmu == null ) {                // no changes
            rmJobState = previousJobState;
        } else {
            rmJobState = new HashMap<DuccId, IRmJobState>();

            // Must handle all jobs that are refused here in JMC because nobody else knows about them.
            // Drain the refused set under the lock, then publish outside it.
            Map<IRmJob, IRmJob> refused = new HashMap<IRmJob, IRmJob>();
            synchronized(refusedJobs) {
                refused.putAll(refusedJobs);
                refusedJobs.clear();
            }

            for ( IRmJob j : refused.values() ) {
                RmJobState rjs = new RmJobState(j.getId(), j.getRefusalReason());
                rjs.setDuccType(j.getDuccType());
                rmJobState.put(j.getId(), rjs);
            }

            // Now handle the jobs that made it into the scheduler proper
            Map<DuccId, IRmJob> jobs = jmu.getAllJobs();
            Map<DuccId, HashMap<Share, Share>> shrunken = jmu.getShrunkenShares();
            Map<DuccId, HashMap<Share, Share>> expanded = jmu.getExpandedShares();

            /**
             * Convert RM internal state into the simplified externally published state.
             */
            for (IRmJob j : jobs.values()) {

                // Refusals discovered by the scheduler itself: publish the refusal and purge.
                if ( j.isRefused() ) {
                    RmJobState rjs = new RmJobState(j.getId(), j.getRefusalReason());
                    rjs.setDuccType(j.getDuccType());
                    rmJobState.put(j.getId(), rjs);
                    jobRemoved(j.getId());
                    logger.warn(methodName, j.getId(), "Refusal:", j.getRefusalReason());
                    continue;
                }

                Map<DuccId, IResource> all_shares      = new LinkedHashMap<DuccId, IResource>();
                Map<DuccId, IResource> shrunken_shares = new LinkedHashMap<DuccId, IResource>();
                Map<DuccId, IResource> expanded_shares = new LinkedHashMap<DuccId, IResource>();
                Map<Share, Share> shares = null;
                Map<Share, Share> redrive = null;

                if (isPendingNonPreemptable(j) ) {
                    // Non-preemptable work is published only once fully allocated.
                    logger.info(methodName, j.getId(), "Delaying publication of expansion because it's not yet complete.");
                } else {
                    // All currently assigned shares, ordered by investment.
                    shares = j.getAssignedShares();
                    if ( shares != null ) {
                        ArrayList<Share> sorted = new ArrayList<Share>(shares.values());
                        Collections.sort(sorted, new RmJob.ShareByInvestmentSorter());
                        for ( Share s : sorted ) {
                            Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), s.getInitializationTime());
                            all_shares.put(s.getId(), r);
                        }
                        // Shares the OR lost track of must be re-driven (see sanityCheckForOrchestrator).
                        redrive = sanityCheckForOrchestrator(j, shares, expanded.get(j.getId()));
                    }

                    // Shares removed since the last publication.
                    shares = shrunken.get(j.getId());
                    if ( shares != null ) {
                        for ( Share s : shares.values() ) {
                            Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
                            shrunken_shares.put(s.getId(), r);
                        }
                    }

                    // Shares added since the last publication.
                    shares = expanded.get(j.getId());
                    if ( shares != null ) {
                        for ( Share s : shares.values() ) {
                            Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
                            expanded_shares.put(s.getId(), r);
                        }
                    }

                    // Redriven shares are published as expansions so the OR picks them up.
                    if ( redrive != null ) {
                        for ( Share s : redrive.values() ) {
                            Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
                            expanded_shares.put(s.getId(), r);
                        }
                    }
                }

                RmJobState rjs = new RmJobState(j.getId(), all_shares, shrunken_shares, expanded_shares);
                rjs.setDuccType(j.getDuccType());
                rmJobState.put(j.getId(), rjs);
            }

            previousJobState = rmJobState;
        }

        RmStateDuccEvent response = new RmStateDuccEvent(rmJobState);
        try {
            logger.info(methodName, null, "Schedule sent to Orchestrator");
            logger.info(methodName, null, response.toString() );
        } catch (Exception e) {
            logger.error(methodName, null, e);
        }

        return response;

    }
| |
| /** |
| * Got an OR map and we're ok for fast recovery. If the map has no "live" jobs we just ignore it - that's first-time |
| * startup and OR will not start if there is no JD node, so we do normal init stability. Otherwise, we assume that the |
| * JD node is included, build the resource map, and allow scheduling to proceed. |
| */ |
| boolean recoverFromOrchestrator(DuccWorkMap jobmap) |
| { |
| String methodName = "recoverFromOrchestrator"; |
| Map<Node, Node> nodes = new HashMap<Node, Node>(); |
| for ( IDuccWork w : jobmap.values() ) { |
| String prefix = "?"; |
| switch ( w.getDuccType() ) { |
| case Job: |
| prefix = "JOB"; |
| break; |
| case Service: |
| prefix = "SVC"; |
| break; |
| case Reservation: |
| prefix = "RES"; |
| break; |
| } |
| |
| if ( w.isCompleted() ) { |
| logger.info(methodName, w.getDuccId(), "Ignoring completed work:", w.getDuccType(), ":", w.getStateObject()); |
| continue; |
| } |
| |
| switch ( w.getDuccType() ) { |
| case Job: |
| case Service: |
| { |
| IDuccWorkExecutable de = (IDuccWorkExecutable) w; |
| IDuccProcessMap pm = de.getProcessMap(); |
| logger.info(methodName, w.getDuccId(), "Receive:", prefix, w.getDuccType(), w.getStateObject(), "processes[", pm.size(), "] Completed:", w.isCompleted()); |
| |
| for ( IDuccProcess proc : pm.values() ) { |
| String pid = proc.getPID(); |
| ProcessState state = proc.getProcessState(); |
| Node n = proc.getNode(); |
| if ( n == null ) { |
| logger.info(methodName, w.getDuccId(), " Process[", pid, "] state [", state, "] is complete[", proc.isComplete(), "] Node [N/A] mem[N/A"); |
| } else { |
| long mem = n .getNodeMetrics().getNodeMemory().getMemTotal(); |
| logger.info(methodName, w.getDuccId(), |
| " Process[", pid, |
| "] state [", state, |
| "] is complete [", proc.isComplete(), |
| "] Node [", n.getNodeIdentity().getName() + "." + proc.getDuccId(), |
| "] mem [", mem, "]"); |
| logger.info(methodName, w.getDuccId(), " Recover node[", n.getNodeIdentity().getName()); |
| // |
| // Note, not ignoring dead processes belonging to live jobs. Is this best or should we be |
| // more conservative and not use nodes that we don't know 100% for sure are ok? |
| // |
| nodes.put(n, n); |
| } |
| } |
| } |
| break; |
| |
| // case Service: |
| // { |
| // IDuccWorkExecutable de = (IDuccWorkExecutable) w; |
| // IDuccProcessMap pm = de.getProcessMap(); |
| // logger.info(methodName, w.getDuccId(), prefix, w.getDuccType(), "processes[", pm.size(), "]."); |
| // } |
| // break; |
| |
| case Reservation: |
| { |
| IDuccWorkReservation de = (IDuccWorkReservation) w; |
| IDuccReservationMap rm = de.getReservationMap(); |
| |
| logger.info(methodName, w.getDuccId(), "Receive:", prefix, w.getDuccType(), w.getStateObject(), "processes[", rm.size(), "] Completed:", w.isCompleted()); |
| |
| for ( IDuccReservation r: rm.values()) { |
| Node n = r.getNode(); |
| if ( n == null ) { |
| logger.info(methodName, w.getDuccId(), |
| " Node [N/A] mem[N/A"); |
| } else { |
| long mem = n .getNodeMetrics().getNodeMemory().getMemTotal(); |
| logger.info(methodName, w.getDuccId(), |
| " Node[", n.getNodeIdentity().getName(), |
| "] mem[", mem, "]"); |
| nodes.put(n, n); |
| } |
| } |
| } |
| break; |
| |
| default: |
| logger.info(methodName, w.getDuccId(), "Received work of type ?", w.getDuccType()); |
| break; |
| } |
| } |
| logger.info(methodName, null, "Recovered[", nodes.size(), "] nodes from OR state."); |
| for (Node n : nodes.values() ) { |
| nodeStability.nodeArrives(n); |
| } |
| |
| return (nodes.size() != 0); |
| } |
| |
| } |
| |