// blob: e44835a073122a4bcf8819f4b4c9c1189d2985dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.orchestrator;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.uima.ducc.common.internationalization.Messages;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.TimeStamp;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.orchestrator.user.UserLogging;
import org.apache.uima.ducc.orchestrator.utilities.TrackSync;
import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.common.TimeWindow;
import org.apache.uima.ducc.transport.event.jd.IDriverStatusReport;
public class ProcessAccounting {
// Logger shared by every instance of this class.
private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ProcessAccounting.class.getName());
// Orchestrator common area, obtained via getInstance(); source of the message catalog and work map below.
private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
// Message catalog used for log text lookups (fetch / fetchLabel).
private Messages messages = orchestratorCommonArea.getSystemMessages();
// Work map; also the monitor object for the synchronized sections in this class.
private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
// Maps a process id to the id of the work (job) that owns it.
private ConcurrentHashMap<DuccId,DuccId> processToJobMap = new ConcurrentHashMap<DuccId,DuccId>();
// Job state accounting, obtained via getInstance(); used to change a job's state.
private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
/**
 * Creates an instance that keeps the empty process-to-job map created by
 * the field initializer.
 */
public ProcessAccounting() {
}
/**
 * Creates an instance that uses the supplied process-to-job map instead of
 * a fresh empty one. The map is stored by reference, not copied.
 *
 * @param processToJobMap the map to use for process id to job id lookups
 */
public ProcessAccounting(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
    this.processToJobMap = processToJobMap;
}
/**
 * Returns the process-to-job map held by this instance (the map itself,
 * not a copy).
 *
 * @return the map from process id to owning job id
 */
public ConcurrentHashMap<DuccId,DuccId> getProcessToJobMap() {
    return processToJobMap;
}
/**
 * Replaces the process-to-job map held by this instance with the given map
 * (stored by reference).
 *
 * @param processToJobMap the map to use from now on
 */
private void setProcessToJobMap(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
this.processToJobMap = processToJobMap;
}
/**
 * Looks up the id of the job that owns the given process. The lookup runs
 * while holding the work map monitor and is bracketed by TrackSync calls.
 *
 * @param processId the process id to look up
 * @return the owning job id, or {@code null} when the map has no entry
 */
public DuccId getJobId(DuccId processId) {
    final String location = "getJobId";
    final DuccId jobId;
    TrackSync tracker = TrackSync.await(workMap, this.getClass(), location);
    synchronized(workMap) {
        tracker.using();
        jobId = processToJobMap.get(processId);
    }
    tracker.ended();
    return jobId;
}
/**
 * Reports how many processes are currently registered in the
 * process-to-job map. The size is read while holding the work map monitor.
 *
 * @return the number of entries in the process-to-job map
 */
public int processCount() {
    final String location = "processCount";
    final int count;
    TrackSync tracker = TrackSync.await(workMap, this.getClass(), location);
    synchronized(workMap) {
        tracker.using();
        count = processToJobMap.size();
    }
    tracker.ended();
    return count;
}
/**
 * Registers a process as belonging to a job.
 *
 * The map update runs while holding the work map monitor, like every other
 * accessor of the process-to-job map in this class.
 *
 * @param processId the process id to register
 * @param jobId the id of the job that owns the process
 * @return {@code true} when the process had no previous mapping;
 *         {@code false} when a mapping already existed (it is overwritten
 *         with {@code jobId} and a warning is logged)
 */
public boolean addProcess(DuccId processId, DuccId jobId) {
    String methodName = "addProcess";
    logger.trace(methodName, null, messages.fetch("enter"));
    boolean retVal = false;
    TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
    // Take the same monitor the sibling accessors use before reporting
    // "using" and touching the map.
    synchronized(workMap) {
        ts.using();
        // put() returns the previously mapped job id, or null when the
        // process was not registered yet.
        DuccId previousJobId = processToJobMap.put(processId, jobId);
        if(previousJobId == null) {
            retVal = true;
            logger.info(methodName, jobId, processId, messages.fetch("added"));
        }
        else {
            logger.warn(methodName, jobId, processId, messages.fetch("exists"));
        }
    }
    ts.ended();
    logger.trace(methodName, null, messages.fetch("exit"));
    return retVal;
}
/**
 * Removes a process from the process-to-job map.
 *
 * @param processId the process id to unregister
 * @return {@code true} when a mapping existed and was removed;
 *         {@code false} when the process was not registered (a warning is
 *         logged)
 */
public boolean removeProcess(DuccId processId) {
    String methodName = "removeProcess";
    logger.trace(methodName, null, messages.fetch("enter"));
    boolean retVal = false;
    TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
    synchronized(workMap) {
        ts.using();
        // A single remove() both tests for presence and removes the entry.
        // ConcurrentHashMap never stores null values, so a null result
        // means "no mapping". This avoids the containsKey/remove gap.
        DuccId jobId = processToJobMap.remove(processId);
        if(jobId != null) {
            retVal = true;
            logger.info(methodName, jobId, processId, messages.fetch("removed"));
        }
        else {
            logger.warn(methodName, null, processId, messages.fetch("not found"));
        }
    }
    ts.ended();
    logger.trace(methodName, null, messages.fetch("exit"));
    return retVal;
}
/**
 * Null-safe string equality.
 *
 * @param a first string, may be {@code null}
 * @param b second string, may be {@code null}
 * @return {@code true} when both are {@code null} or {@code a.equals(b)}
 */
private boolean compare(String a, String b) {
    if(a != null) {
        return a.equals(b);
    }
    return b == null;
}
/**
 * Null-safe comparison of two time windows by their start and end strings.
 *
 * @param a first window, may be {@code null}
 * @param b second window, may be {@code null}
 * @return {@code true} when both are {@code null}, or both are non-null
 *         and their start and end values compare equal
 */
private boolean compare(ITimeWindow a, ITimeWindow b) {
    if(a == null || b == null) {
        // At least one side is null: equal only when both are.
        return a == b;
    }
    return compare(a.getStart(), b.getStart()) && compare(a.getEnd(), b.getEnd());
}
/**
 * Records the PID reported by the inventory on the tracked process, but
 * only when the tracked process has no PID yet. A PID that is already
 * known is never overwritten; a differing (or missing) report is logged
 * as a warning and the old value is kept.
 *
 * @param dw the work owning the process (used for log context)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyInventoryPID(IDuccWork dw, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyInventoryPID";
    logger.trace(location, null, messages.fetch("enter"));
    String reportedPid = inventoryProcess.getPID();
    String knownPid = process.getPID();
    logger.debug(location, dw.getDuccId(), inventoryProcess.getDuccId(), ""+reportedPid);
    if(knownPid == null) {
        // First sighting of a PID: adopt it (nothing to do if none reported).
        if(reportedPid != null) {
            process.setPID(reportedPid);
        }
    }
    else if(!knownPid.equals(reportedPid)) {
        // Covers both a different PID and a report with no PID at all.
        logger.warn(location, dw.getDuccId(), inventoryProcess.getDuccId(), "PID old:"+knownPid+" new:"+reportedPid+" keeping old");
    }
    logger.trace(location, null, messages.fetch("exit"));
}
/**
 * Copies the reported swap usage onto the tracked process and raises the
 * tracked maximum when the new usage exceeds it.
 *
 * @param dw the work owning the process (used for log context)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyInventorySwapUsage(IDuccWork dw, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyInventorySwapUsage";
    logger.trace(location, null, messages.fetch("enter"));
    long reportedSwap = inventoryProcess.getSwapUsage();
    logger.trace(location, dw.getDuccId(), inventoryProcess.getDuccId(), "PID:"+process.getPID()+" swap:"+reportedSwap);
    process.setSwapUsage(reportedSwap);
    long previousMax = process.getSwapUsageMax();
    long currentSwap = process.getSwapUsage();
    if(previousMax < currentSwap) {
        process.setSwapUsageMax(currentSwap);
    }
    logger.trace(location, null, messages.fetch("exit"));
}
/**
 * Copies the reported major-fault value onto the tracked process.
 *
 * @param dw the work owning the process (unused here)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyInventoryMajorFaults(IDuccWork dw, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyInventoryMajorFaults";
    logger.trace(location, null, messages.fetch("enter"));
    process.setMajorFaults(
        inventoryProcess.getMajorFaults()
    );
    logger.trace(location, null, messages.fetch("exit"));
}
/**
 * Copies the reported resident-memory value onto the tracked process.
 *
 * @param dw the work owning the process (unused here)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyInventoryRss(IDuccWork dw, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyInventoryRss";
    logger.trace(location, null, messages.fetch("enter"));
    process.setResidentMemory(
        inventoryProcess.getResidentMemory()
    );
    logger.trace(location, null, messages.fetch("exit"));
}
/**
 * Decides whether newly reported GC statistics may replace the old ones.
 * With no old statistics anything is accepted. Otherwise the new
 * statistics must be present and neither the collection count nor the
 * collection time may go backwards; each violation is logged as a warning.
 *
 * @param jobid job id for log context
 * @param processId process id for log context
 * @param newGCS statistics reported by the inventory, may be {@code null}
 * @param oldGCS statistics currently held, may be {@code null}
 * @return {@code true} when the new statistics are acceptable
 */
private boolean validateGCStats(DuccId jobid, DuccId processId, ProcessGarbageCollectionStats newGCS, ProcessGarbageCollectionStats oldGCS) {
    final String location = "validateGCStats";
    if(oldGCS == null) {
        // Nothing to regress against.
        return true;
    }
    if(newGCS == null) {
        logger.warn(location, jobid, processId, "ProcessGarbageCollectionStats missing?");
        return false;
    }
    boolean valid = true;
    long newCount = newGCS.getCollectionCount();
    long oldCount = oldGCS.getCollectionCount();
    if(newCount < oldCount) {
        logger.warn(location, jobid, processId, "CollectionCount "+newCount+" < "+oldCount);
        valid = false;
    }
    // Checked even when the count already failed, so both problems get logged.
    long newTime = newGCS.getCollectionTime();
    long oldTime = oldGCS.getCollectionTime();
    if(newTime < oldTime) {
        logger.warn(location, jobid, processId, "CollectionTime "+newTime+" < "+oldTime);
        valid = false;
    }
    return valid;
}
/**
 * Copies the reported GC statistics onto the tracked process when
 * validateGCStats accepts them; otherwise the old statistics are kept.
 *
 * @param dw the work owning the process (used for log context)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyInventoryGCStats(IDuccWork dw, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyInventoryGCStats";
    logger.trace(location, null, messages.fetch("enter"));
    DuccId jobId = dw.getDuccId();
    DuccId processId = process.getDuccId();
    ProcessGarbageCollectionStats reported = inventoryProcess.getGarbageCollectionStats();
    ProcessGarbageCollectionStats current = process.getGarbageCollectionStats();
    boolean accepted = validateGCStats(jobId, processId, reported, current);
    if(accepted) {
        process.setGarbageCollectionStats(reported);
        ProcessGarbageCollectionStats stored = process.getGarbageCollectionStats();
        if(stored != null) {
            logger.trace(location, jobId, processId, "GC Stats Count:"+stored.getCollectionCount());
            logger.trace(location, jobId, processId, "GC Stats Time:"+stored.getCollectionTime());
        }
    }
    logger.trace(location, null, messages.fetch("exit"));
}
/**
 * Copies the reported overall CPU time and current CPU value onto the
 * tracked process and traces both.
 *
 * @param dw the work owning the process (used for log context)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyInventoryCpuTime(IDuccWork dw, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyInventoryCpuTime";
    logger.trace(location, null, messages.fetch("enter"));
    // Update both values first, then report what the process now holds.
    process.setCpuTime(inventoryProcess.getCpuTime());
    process.setCurrentCPU(inventoryProcess.getCurrentCPU());
    logger.trace(location, dw.getDuccId(), process.getDuccId(), "Cpu Time (overall):"+process.getCpuTime());
    logger.trace(location, dw.getDuccId(), process.getDuccId(), "Cpu Time (current):"+process.getCurrentCPU());
    logger.trace(location, null, messages.fetch("exit"));
}
/**
 * Copies the reported initialization time window onto the tracked process
 * when one is reported and it differs from the window already held, then
 * traces the formatted start and end values that are present.
 *
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyTimeInit(IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyTimeInit";
    logger.trace(location, null, messages.fetch("enter"));
    DuccId processId = inventoryProcess.getDuccId();
    DuccId jobId = getJobId(processId);
    ITimeWindow reported = inventoryProcess.getTimeWindowInit();
    if(reported != null && !compare(reported, process.getTimeWindowInit())) {
        process.setTimeWindowInit(reported);
        String startMillis = process.getTimeWindowInit().getStart();
        if(startMillis != null) {
            logger.trace(location, jobId, processId, messages.fetchLabel("initialization start")+TimeStamp.simpleFormat(startMillis));
        }
        String endMillis = process.getTimeWindowInit().getEnd();
        if(endMillis != null) {
            logger.trace(location, jobId, processId, messages.fetchLabel("initialization end")+TimeStamp.simpleFormat(endMillis));
        }
    }
    logger.trace(location, null, messages.fetch("exit"));
}
/**
 * Copies the reported run time window onto the tracked process when one
 * is reported and it differs from the window already held, then traces
 * the formatted start and end values that are present.
 *
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyTimeRun(IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyTimeRun";
    logger.trace(location, null, messages.fetch("enter"));
    DuccId processId = inventoryProcess.getDuccId();
    DuccId jobId = getJobId(processId);
    ITimeWindow reported = inventoryProcess.getTimeWindowRun();
    if(reported != null && !compare(reported, process.getTimeWindowRun())) {
        process.setTimeWindowRun(reported);
        String startMillis = process.getTimeWindowRun().getStart();
        if(startMillis != null) {
            logger.trace(location, jobId, processId, messages.fetchLabel("run start")+TimeStamp.simpleFormat(startMillis));
        }
        String endMillis = process.getTimeWindowRun().getEnd();
        if(endMillis != null) {
            logger.trace(location, jobId, processId, messages.fetchLabel("run end")+TimeStamp.simpleFormat(endMillis));
        }
    }
    logger.trace(location, null, messages.fetch("exit"));
}
/**
 * Reacts to an inventory report whose process state is one of LaunchFailed,
 * Stopped, Failed, FailedInitialization, InitializationTimeout or Killed.
 *
 * If the tracked process's resource is still Allocated it is marked
 * Deallocated, and for Stopped / LaunchFailed / Failed / Killed the reported
 * reason (when non-null) and a matching deallocation type are recorded.
 * For work of type Service the process's stop reason is also written to the
 * user's log. Any other reported process state is only logged at debug level.
 *
 * @param job the job owning the process
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void setResourceStateAndReason(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
String methodName = "setResourceStateAndReason";
logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
switch(inventoryProcess.getProcessState()) {
case LaunchFailed:
case Stopped:
case Failed:
case FailedInitialization:
case InitializationTimeout:
case Killed:
// Only a resource that is still Allocated is transitioned; any other
// resource state is just logged.
switch(process.getResourceState()) {
case Allocated:
OrUtil.setResourceState(job, process, ResourceState.Deallocated);
String reason = inventoryProcess.getReasonForStoppingProcess();
logger.info(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+inventoryProcess.getProcessState()+" => "+messages.fetchLabel("resource state")+process.getResourceState()+" : "+messages.fetchLabel("reason")+reason);
// Map the reported process state to a deallocation type. A null reason
// never overwrites a reason already stored on the process.
switch(inventoryProcess.getProcessState()) {
case Stopped:
if(reason != null) {
process.setReasonForStoppingProcess(reason);
}
process.setProcessDeallocationType(ProcessDeallocationType.AutonomousStop);
break;
case LaunchFailed:
case Failed:
if(reason != null) {
process.setReasonForStoppingProcess(reason);
}
process.setProcessDeallocationType(ProcessDeallocationType.Failed);
break;
/*
case FailedInitialization:
if(reason != null) {
process.setReasonForStoppingProcess(reason);
}
process.setProcessDeallocationType(ProcessDeallocationType.FailedInitialization);
break;
case InitializationTimeout:
if(reason != null) {
process.setReasonForStoppingProcess(reason);
}
process.setProcessDeallocationType(ProcessDeallocationType.InitializationTimeout);
break;
*/
case Killed:
if(reason != null) {
process.setReasonForStoppingProcess(reason);
}
process.setProcessDeallocationType(ProcessDeallocationType.Killed);
break;
default:
// FailedInitialization and InitializationTimeout land here: with the
// cases above commented out, no reason or deallocation type is set.
break;
}
break;
default:
logger.debug(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+inventoryProcess.getProcessState()+" => "+messages.fetchLabel("resource state")+process.getResourceState());
break;
}
// Runs for every stopped/failed report regardless of the resource state
// handled above: services get the stop reason in the user's log.
switch(job.getDuccType()) {
case Service:
IDuccWorkJob service = job;
String userName = service.getStandardInfo().getUser();
String userLogDir = service.getUserLogsDir();
UserLogging.error(userName, userLogDir, "reason for stopping service instance["+service.getDuccId().getFriendly()+"]: "+process.getReasonForStoppingProcess());
break;
default:
break;
}
break;
default:
logger.debug(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+inventoryProcess.getProcessState()+" => "+messages.fetchLabel("resource state")+process.getResourceState());
break;
}
logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
}
/**
 * Advances the tracked process to the process state reported by the
 * inventory when the two states differ (compared by their string form),
 * and adopts the reported JMX URL if the tracked process has none yet.
 *
 * @param job the job owning the process
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyInventoryProcessState(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
String methodName = "copyInventoryProcessState";
logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
if(!compare(inventoryProcess.getProcessState().toString(),process.getProcessState().toString())) {
// Only a default branch remains, so every job state is handled the same
// way; the switch still evaluates and casts the job's state object.
switch((JobState)job.getStateObject()) {
//case Initializing:
// logger.info(methodName, jobId, processId, messages.fetchLabel("process state ignored")+inventoryProcess.getProcessState());
// break;
default:
process.advanceProcessState(inventoryProcess.getProcessState());
logger.trace(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+process.getProcessState());
// The JMX URL is set once: a reported URL never replaces an existing one.
if ( inventoryProcess.getProcessJmxUrl() != null && process.getProcessJmxUrl() == null) {
process.setProcessJmxUrl(inventoryProcess.getProcessJmxUrl());
}
break;
}
}
logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
}
/**
 * For a stopped/failed-type inventory report, copies the reported stop
 * reason onto the tracked process when a reason is reported and it differs
 * from the one already held (including when none is held yet).
 *
 * @param job the job owning the process (used for log context)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyReasonForStoppingProcess(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyReasonForStoppingProcess";
    logger.trace(location, job.getDuccId(), messages.fetch("enter"));
    switch(inventoryProcess.getProcessState()) {
    case LaunchFailed:
    case Stopped:
    case Failed:
    case FailedInitialization:
    case InitializationTimeout:
    case Killed:
        String reported = inventoryProcess.getReasonForStoppingProcess();
        String current = process.getReasonForStoppingProcess();
        // equals(null) is false, so a missing current reason counts as "different".
        if(reported != null && !reported.equals(current)) {
            process.setReasonForStoppingProcess(reported);
            logger.info(location, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process reason code")+process.getReasonForStoppingProcess());
        }
        break;
    default:
        break;
    }
    logger.trace(location, job.getDuccId(), messages.fetch("exit"));
}
/**
 * For a stopped/failed-type inventory report, copies the reported exit
 * code onto the tracked process when it differs from the code held.
 *
 * @param job the job owning the process (used for log context)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyProcessExitCode(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyProcessExitCode";
    logger.trace(location, job.getDuccId(), messages.fetch("enter"));
    switch(inventoryProcess.getProcessState()) {
    case LaunchFailed:
    case Stopped:
    case Failed:
    case FailedInitialization:
    case InitializationTimeout:
    case Killed:
        int reportedCode = inventoryProcess.getProcessExitCode();
        int currentCode = process.getProcessExitCode();
        boolean changed = reportedCode != currentCode;
        if(changed) {
            process.setProcessExitCode(reportedCode);
            logger.info(location, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process exit code")+process.getProcessExitCode());
        }
        break;
    default:
        break;
    }
    logger.trace(location, job.getDuccId(), messages.fetch("exit"));
}
/**
 * Copies the reported list of UIMA pipeline components onto the tracked
 * process. A {@code null} report leaves the tracked list untouched.
 *
 * @param job the job owning the process (used for log context)
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void copyUimaPipelineComponentsState(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
    final String location = "copyUimaPipelineComponentsState";
    logger.trace(location, job.getDuccId(), messages.fetch("enter"));
    List<IUimaPipelineAEComponent> components = inventoryProcess.getUimaPipelineComponents();
    if(components == null) {
        logger.trace(location, job.getDuccId(), "size: null");
    }
    else {
        logger.trace(location, job.getDuccId(), "size: "+components.size());
        process.setUimaPipelineComponents(components);
    }
    logger.trace(location, job.getDuccId(), messages.fetch("exit"));
}
/**
 * Builds a time window whose start and end are both the given timestamp.
 *
 * @param ts the timestamp string to use for both start and end
 * @return a new window with start and end set to {@code ts}
 */
private ITimeWindow makeTimeWindow(String ts) {
    ITimeWindow window = new TimeWindow();
    window.setStart(ts);
    window.setEnd(ts);
    return window;
}
/**
 * Ensures the tracked process has an initialization window. When none
 * exists, a window with start and end set to the current time is created.
 * When one exists whose end is before its start, the end is set to the
 * current time.
 *
 * @param job the job owning the process (unused here)
 * @param process the tracked process to update
 */
private void initStop(IDuccWorkJob job, IDuccProcess process) {
    String now = TimeStamp.getCurrentMillis();
    ITimeWindow initWindow = process.getTimeWindowInit();
    if(initWindow == null) {
        process.setTimeWindowInit(makeTimeWindow(now));
        return;
    }
    long initStart = initWindow.getStartLong();
    long initEnd = initWindow.getEndLong();
    // end < start already implies the two differ.
    if(initEnd < initStart) {
        initWindow.setEnd(now);
    }
}
/**
 * Sets the tracked process's run window to a new window that starts and
 * ends at the end of its initialization window. The initialization window
 * must already exist.
 *
 * @param job the job owning the process (unused here)
 * @param process the tracked process to update
 */
private void runStart(IDuccWorkJob job, IDuccProcess process) {
    String initEnd = process.getTimeWindowInit().getEnd();
    process.setTimeWindowRun(makeTimeWindow(initEnd));
}
/**
 * Closes out the run window of a process that has stopped.
 *
 * Ensures both the initialization and the run windows exist: a missing
 * initialization window is created at the current time, and a missing run
 * window is created at the end of the initialization window. An existing
 * run window whose end is before its start gets its end set to the current
 * time. Finally the windows are reconciled by adjustWindows and the run
 * time is adjusted by adjustRunTime.
 *
 * @param job the job owning the process (used for log context downstream)
 * @param process the tracked process to update
 */
private void runStop(IDuccWorkJob job, IDuccProcess process) {
    String ts = TimeStamp.getCurrentMillis();
    ITimeWindow twi = process.getTimeWindowInit();
    if(twi == null) {
        twi = makeTimeWindow(ts);
        // Store the new window as the INIT window. Storing it as the run
        // window would leave init null (adjustWindows dereferences it) and
        // make the run-window handling below operate on the wrong window.
        process.setTimeWindowInit(twi);
    }
    ITimeWindow twr = process.getTimeWindowRun();
    if(twr == null) {
        // No run window yet: run starts and ends where init ended.
        twr = makeTimeWindow(twi.getEnd());
        process.setTimeWindowRun(twr);
    }
    else {
        long r0 = twr.getStartLong();
        long r1 = twr.getEndLong();
        // Only an end that precedes the start is replaced by "now".
        if(r1 < r0) {
            twr.setEnd(ts);
        }
    }
    adjustWindows(job, process);
    adjustRunTime(process);
}
// <uima-3351>
/**
 * Zeroes the run window of a process that was never assigned work: start
 * and end are both set to 0, creating the window first if it is missing.
 * Processes that were assigned work are left untouched.
 *
 * @param process the tracked process to update
 */
private void adjustRunTime(IDuccProcess process) {
    if(process.isAssignedWork()) {
        return;
    }
    ITimeWindow runWindow = process.getTimeWindowRun();
    if(runWindow == null) {
        runWindow = new TimeWindow();
        process.setTimeWindowRun(runWindow);
    }
    final long zero = 0;
    runWindow.setStartLong(zero);
    runWindow.setEndLong(zero);
}
// </uima-3351>
/**
 * Keeps the run window from starting before the initialization window
 * ends. Applies only when both windows have differing start and end
 * values: a run start earlier than the init end is moved to the init end,
 * and a run end that then precedes the new start is moved to it as well.
 * Each adjustment is logged as a warning.
 *
 * @param job the job owning the process (used for log context)
 * @param process the tracked process whose windows are reconciled
 */
private void adjustWindows(IDuccWorkJob job, IDuccProcess process) {
    final String location = "adjustWindows";
    ITimeWindow initWindow = process.getTimeWindowInit();
    long initStart = initWindow.getStartLong();
    long initEnd = initWindow.getEndLong();
    ITimeWindow runWindow = process.getTimeWindowRun();
    long runStart = runWindow.getStartLong();
    long runEnd = runWindow.getEndLong();
    boolean initSpans = initStart != initEnd;
    boolean runSpans = runStart != runEnd;
    if(initSpans && runSpans && runStart < initEnd) {
        logger.warn(location, job.getDuccId(), process.getDuccId(), "run-start: "+runStart+" -> "+initEnd);
        runWindow.setStartLong(initEnd);
        if(runEnd < initEnd) {
            logger.warn(location, job.getDuccId(), process.getDuccId(), "run-end: "+runEnd+" -> "+initEnd);
            runWindow.setEndLong(initEnd);
        }
    }
}
/**
 * Marks the tracked process as initialized once the inventory reports it
 * Running. If the owning work is a Service whose job state is still
 * Initializing, the job state is changed to Running as well.
 *
 * @param job the job owning the process, may be {@code null}
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void updateProcessInitilization(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
    switch(inventoryProcess.getProcessState()) {
    case Running:
        process.setInitialized();
        if(job == null) {
            break;
        }
        switch(job.getDuccType()) {
        case Service:
            switch(job.getJobState()) {
            case Initializing:
                // First running instance moves the service out of Initializing.
                stateJobAccounting.stateChange(job, JobState.Running);
                break;
            default:
                break;
            }
            break;
        default:
            break;
        }
        break;
    default:
        break;
    }
}
/**
 * Updates the tracked process's initialization and run time windows
 * according to the process state reported by the inventory.
 *
 * Starting / Initializing: copy the reported init window.
 * Running: copy the init window, close it (initStop), seed the run window
 * at the end of init (runStart), then copy the reported run window.
 * Stopped/failed-type states: copy both windows, then close init (initStop)
 * and run (runStop).
 * Other states: no change.
 *
 * @param job the job owning the process
 * @param inventoryProcess the process as reported by the inventory
 * @param process the tracked process to update
 */
private void updateProcessTime(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
String methodName = "updateProcessTime";
logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
switch(inventoryProcess.getProcessState()) {
case Starting: // Process Manager sent request to start the Process
case Initializing: // Process Agent is initializing process
copyTimeInit(inventoryProcess, process);
break;
case Running: // Process Agent is processing work items
copyTimeInit(inventoryProcess, process);
// initStop guarantees an init window exists before runStart reads its end.
initStop(job, process);
runStart(job, process);
// A reported run window, when present and different, replaces the seeded one.
copyTimeRun(inventoryProcess, process);
break;
case LaunchFailed: // Process Agent reports process launch failed
case Stopped: // Process Agent reports process stopped
case Failed: // Process Agent reports process failed
case FailedInitialization: // Process Agent reports process failed initialization
case InitializationTimeout: // Process Agent reports process initialization timeout
case Killed: // Agent forcefully killed the process
copyTimeInit(inventoryProcess, process);
copyTimeRun(inventoryProcess, process);
initStop(job, process);
runStop(job, process);
break;
case Undefined:
break;
default:
break;
}
logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
}
/**
 * Applies one inventory report for a process to the orchestrator's tracked
 * copy of that process.
 *
 * The process id is resolved to its owning work through the process-to-job
 * map and the work map. When the tracked process is found and is not yet
 * complete, its PID, resource state, process state, stop reason, exit code,
 * time windows, initialization flag, pipeline components, swap usage, major
 * faults, resident memory, GC stats and CPU time are updated from the
 * report. Every lookup failure is logged as a warning. Any Throwable raised
 * along the way is logged and not propagated.
 *
 * @param inventoryProcess the process as reported by the inventory
 */
public void setStatus(IDuccProcess inventoryProcess) {
String methodName = "setStatus";
logger.trace(methodName, null, messages.fetch("enter"));
try {
DuccId processId = inventoryProcess.getDuccId();
logger.debug(methodName, null, processId, messages.fetchLabel("node")+inventoryProcess.getNodeIdentity().getName()+" "+messages.fetchLabel("PID")+inventoryProcess.getPID());
TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
synchronized(workMap) {
ts.using();
if(processToJobMap.containsKey(processId)) {
DuccId jobId = getJobId(processId);
IDuccWork duccWork = WorkMapHelper.findDuccWork(workMap, jobId, this, methodName);
if(duccWork != null) {
if(duccWork instanceof IDuccWorkExecutable) {
IDuccWorkExecutable duccWorkExecutable = (IDuccWorkExecutable) duccWork;
// NOTE(review): job stays null when the work is executable but not an
// IDuccWorkJob; the helpers below dereference it (e.g. job.getDuccId()),
// so such a report would end in the catch(Throwable) — confirm whether
// that combination can occur.
IDuccWorkJob job = null;
if(duccWork instanceof IDuccWorkJob) {
job = (IDuccWorkJob)duccWork;
}
IDuccProcessMap processMap = duccWorkExecutable.getProcessMap();
IDuccProcess process = processMap.get(processId);
if(process == null) {
// Not in the work's own process map: fall back to the job's driver
// process map.
if(job != null) {
process = job.getDriver().getProcessMap().get(processId);
OrchestratorHelper.jdDeallocate(job, inventoryProcess);
}
}
if(process != null) {
// A process that is already complete is not updated any further.
if(process.isComplete()) {
logger.trace(methodName, jobId, process.getDuccId(), "finalized");
}
else {
logger.trace(methodName, jobId, process.getDuccId(), "active");
// PID
copyInventoryPID(job, inventoryProcess, process);
// Scheduler State
setResourceStateAndReason(job, inventoryProcess, process);
// Process State
copyInventoryProcessState(job, inventoryProcess, process);
// Process Reason
copyReasonForStoppingProcess(job, inventoryProcess, process);
// Process Exit code
copyProcessExitCode(job, inventoryProcess, process);
// Process Init & Run times
updateProcessTime(job, inventoryProcess, process);
// Process Initialization State
updateProcessInitilization(job, inventoryProcess, process);
// Process Pipeline Components State
copyUimaPipelineComponentsState(job, inventoryProcess, process);
// Process Swap Usage
copyInventorySwapUsage(job, inventoryProcess, process);
// Process Major Faults
copyInventoryMajorFaults(job, inventoryProcess, process);
// Process Rss
copyInventoryRss(job, inventoryProcess, process);
// Process GC Stats
copyInventoryGCStats(job, inventoryProcess, process);
// Process CPU Time
copyInventoryCpuTime(job, inventoryProcess, process);
}
}
else {
logger.warn(methodName, jobId, processId, messages.fetch("process not found job's process table"));
}
}
else {
logger.warn(methodName, jobId, processId, messages.fetch("not executable"));
}
}
else {
logger.warn(methodName, jobId, processId, messages.fetch("job ID not found"));
}
}
else {
logger.warn(methodName, null, processId, messages.fetch("ID not found in process map"));
}
}
// NOTE(review): not reached when the synchronized block throws, because
// control jumps straight to the catch below.
ts.ended();
}
catch(Throwable t) {
logger.error(methodName, null, t);
}
logger.trace(methodName, null, messages.fetch("exit"));
return;
}
/**
 * Copies work-item counters and statistics from a job driver status report
 * into the job's scheduling info. Each counter (total, completed,
 * dispatched, error, retry, preempt) is written only when its string form
 * differs from the stored value. When the total changes, the job's
 * max-processes is also capped at the number of processes that can
 * actually be used (total work items divided by threads per process,
 * rounded up).
 *
 * @param jdStatusReport the status report received from the job driver
 * @param duccWorkJob the job whose scheduling info is updated
 * @return always {@code false}; the result variable is never set otherwise
 */
public boolean setStatus(IDriverStatusReport jdStatusReport, DuccWorkJob duccWorkJob) {
    final String location = "setStatus";
    logger.trace(location, null, messages.fetch("enter"));
    // NOTE(review): nothing below assigns true to this flag.
    boolean retVal = false;

    String reportedTotal = ""+jdStatusReport.getWorkItemsTotal();
    if(!compare(reportedTotal, duccWorkJob.getSchedulingInfo().getWorkItemsTotal())) {
        duccWorkJob.getSchedulingInfo().setWorkItemsTotal(reportedTotal);
        // <uima-3533> Limit max-processes to most that can be used (relevant only for policy fixed)
        long totalWork = jdStatusReport.getWorkItemsTotal();
        int threadsPerProcess = duccWorkJob.getSchedulingInfo().getIntThreadsPerProcess();
        if(totalWork > 0 && threadsPerProcess > 0) {
            // Ceiling division: processes needed to give every work item a thread.
            long maxUsable = (totalWork + threadsPerProcess - 1) / threadsPerProcess;
            long maxProcesses = duccWorkJob.getSchedulingInfo().getLongProcessesMax();
            if(maxProcesses > maxUsable) {
                duccWorkJob.getSchedulingInfo().setLongProcessesMax(maxUsable);
                logger.info(location, duccWorkJob.getDuccId(), "reduced max-processes", maxProcesses, "to max-usable", maxUsable);
            }
        }
        // </uima-3353>
        // NOTE(review): opening and closing issue tags above disagree (3533 vs 3353).
    }

    String reportedCompleted = ""+jdStatusReport.getWorkItemsProcessingCompleted();
    if(!compare(reportedCompleted, duccWorkJob.getSchedulingInfo().getWorkItemsCompleted())) {
        duccWorkJob.getSchedulingInfo().setWorkItemsCompleted(reportedCompleted);
    }

    String reportedDispatched = ""+jdStatusReport.getWorkItemsDispatched();
    if(!compare(reportedDispatched, duccWorkJob.getSchedulingInfo().getWorkItemsDispatched())) {
        duccWorkJob.getSchedulingInfo().setWorkItemsDispatched(reportedDispatched);
    }

    String reportedError = ""+jdStatusReport.getWorkItemsProcessingError();
    if(!compare(reportedError, duccWorkJob.getSchedulingInfo().getWorkItemsError())) {
        duccWorkJob.getSchedulingInfo().setWorkItemsError(reportedError);
    }

    String reportedRetry = ""+jdStatusReport.getWorkItemsRetry();
    if(!compare(reportedRetry, duccWorkJob.getSchedulingInfo().getWorkItemsRetry())) {
        duccWorkJob.getSchedulingInfo().setWorkItemsRetry(reportedRetry);
    }

    String reportedPreempt = ""+jdStatusReport.getWorkItemsPreempted();
    if(!compare(reportedPreempt, duccWorkJob.getSchedulingInfo().getWorkItemsPreempt())) {
        duccWorkJob.getSchedulingInfo().setWorkItemsPreempt(reportedPreempt);
    }

    // These are copied unconditionally on every report.
    IDuccSchedulingInfo schedulingInfo = duccWorkJob.getSchedulingInfo();
    schedulingInfo.setMostRecentWorkItemStart(jdStatusReport.getMostRecentStart());
    schedulingInfo.setPerWorkItemStatistics(jdStatusReport.getPerWorkItemStatistics());
    schedulingInfo.setAvgTimeForWorkItemsSkewedByActive(jdStatusReport.getAvgTimeForWorkItemsSkewedByActive());

    logger.trace(location, null, messages.fetch("exit"));
    return retVal;
}
/**
 * Deallocates every process in the given process map. A process whose
 * resource is Allocated is marked Deallocated and given the supplied
 * deallocation type. When a target process state is supplied, Allocated
 * and already-Deallocated processes are both advanced to it. Processes in
 * any other resource state are skipped. A {@code null} map is a no-op.
 *
 * @param job the job owning the processes
 * @param processDeallocationType the deallocation type to record
 * @param processState the state to advance processes to, or {@code null}
 *        to leave process states alone
 * @param processMap the processes to deallocate, may be {@code null}
 * @param type label written to the log for each deallocated process
 */
private void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType, ProcessState processState, IDuccProcessMap processMap, String type) {
    final String location = "deallocate";
    logger.trace(location, job.getDuccId(), messages.fetch("enter"));
    if(processMap != null) {
        for(IDuccProcess candidate : processMap.values()) {
            switch(candidate.getResourceState()) {
            case Allocated:
                OrUtil.setResourceState(job, candidate, ResourceState.Deallocated);
                candidate.setProcessDeallocationType(processDeallocationType);
                logger.info(location, job.getDuccId(), candidate.getDuccId(), type);
                if(processState != null) {
                    logger.debug(location, job.getDuccId(), candidate.getProcessState()+" -> "+processState);
                    candidate.advanceProcessState(processState);
                }
                break;
            case Deallocated:
                if(processState != null) {
                    logger.debug(location, job.getDuccId(), candidate.getProcessState()+" -> "+processState);
                    candidate.advanceProcessState(processState);
                }
                break;
            default:
                break;
            }
        }
    }
    logger.trace(location, job.getDuccId(), messages.fetch("exit"));
}
/**
 * Deallocates the job's worker processes and, for work of type Job, its
 * driver processes as well. Other work types only have their worker
 * processes deallocated.
 *
 * @param job the job whose processes are deallocated
 * @param processDeallocationType the deallocation type to record
 * @param processState the state to advance processes to, or {@code null}
 *        to leave process states alone
 */
private void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType, ProcessState processState) {
    final String location = "deallocate";
    logger.trace(location, job.getDuccId(), messages.fetch("enter"));
    deallocate(job, processDeallocationType, processState, job.getProcessMap(), "worker");
    switch(job.getDuccType()) {
    case Job:
        // Only jobs have driver processes to deallocate here.
        deallocate(job, processDeallocationType, processState, job.getDriver().getProcessMap(), "driver");
        break;
    default:
        break;
    }
    logger.trace(location, job.getDuccId(), messages.fetch("exit"));
}
/**
 * Deallocates the job's processes with the given deallocation type,
 * without forcing a process state (a null target state is passed on).
 *
 * @param job the job whose processes are deallocated
 * @param processDeallocationType the deallocation type to record
 */
public void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType) {
deallocate(job,processDeallocationType,null);
}
/**
 * Deallocates the job's processes with the given deallocation type and
 * additionally advances them to ProcessState.Stopped.
 *
 * @param job the job whose processes are deallocated
 * @param processDeallocationType the deallocation type to record
 */
public void deallocateAndStop(IDuccWorkJob job, ProcessDeallocationType processDeallocationType) {
deallocate(job,processDeallocationType,ProcessState.Stopped);
}
}