blob: 719be01581eddda71f5495c26a4f0a7c0fb37af2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.orchestrator.maintenance;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import org.apache.uima.ducc.common.internationalization.Messages;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.orchestrator.Constants;
import org.apache.uima.ducc.orchestrator.OrUtil;
import org.apache.uima.ducc.orchestrator.OrchestratorCheckpoint;
import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
import org.apache.uima.ducc.orchestrator.OrchestratorHelper;
import org.apache.uima.ducc.orchestrator.StateManager;
import org.apache.uima.ducc.orchestrator.WorkMapHelper;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
import org.apache.uima.ducc.transport.event.common.IRationale;
import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
import org.apache.uima.ducc.transport.event.common.Rationale;
/**
 * Orchestrator maintenance component that walks the jobs and services held in
 * the orchestrator work map and terminates, or cleans up after, those whose
 * failure counters or driver processes indicate they should not continue.
 *
 * <p>A single shared instance is obtained through {@link #getInstance()};
 * callers trigger one full pass with {@link #ajudicate()}.
 */
public class HealthMonitor {

    private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(HealthMonitor.class.getName());

    private static final HealthMonitor healthMonitor = new HealthMonitor();

    /**
     * @return the shared {@code HealthMonitor} instance
     */
    public static HealthMonitor getInstance() {
        return healthMonitor;
    }

    private final OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
    private final Messages messages = orchestratorCommonArea.getSystemMessages();
    private final DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();

    /**
     * Terminates a not-yet-initialized job/service whose process initialization
     * failure count has reached its initialization failure limit.
     *
     * @param job the work to examine
     * @return {@code true} if the work was terminated here (state should be checkpointed)
     */
    private boolean isCancelJobExcessiveInitializationFailures(IDuccWorkJob job) {
        String methodName = "isCancelJobExcessiveInitializationFailures";
        logger.trace(methodName, null, messages.fetch("enter"));
        boolean ckpt = false;
        if (!job.isInitialized()) {
            long count = job.getProcessInitFailureCount();
            long limit = job.getProcessInitFailureLimit();
            if (count >= limit) {
                IRationale rationale = new Rationale("health monitor detected job initialization failures limit reached:"+limit);
                StateManager.getInstance().jobTerminate(job, JobCompletionType.ProcessInitializationFailure, rationale, ProcessDeallocationType.JobCanceled);
                logger.info(methodName, job.getDuccId(), JobCompletionType.ProcessInitializationFailure);
                ckpt = true;
            }
        }
        logger.trace(methodName, null, messages.fetch("exit"));
        return ckpt;
    }

    /**
     * Terminates a job whose initialization failure count has reached its cap
     * while it has no alive processes left.
     *
     * @param job the job to examine
     * @return {@code true} if the job was terminated here (state should be checkpointed)
     */
    private boolean isCancelJobCappedWithNoJobProcesses(IDuccWorkJob job) {
        String methodName = "isCancelJobCappedWithNoJobProcesses";
        logger.trace(methodName, null, messages.fetch("enter"));
        boolean ckpt = false;
        long count = job.getProcessInitFailureCount();
        long cap = job.getProcessInitFailureCap();
        long procs = job.getAliveProcessCount();
        logger.debug(methodName, null, "fail.count:"+count+" "+"fail.cap:"+cap+" "+"alive.procs:"+procs);
        // Decide on the same alive-process sample that was just logged, so the
        // debug line always explains the decision taken.
        if (count >= cap && procs == 0) {
            IRationale rationale = new Rationale("health monitor detected no resources assigned and job initialization failures cap reached:"+cap);
            StateManager.getInstance().jobTerminate(job, JobCompletionType.ProcessInitializationFailure, rationale, ProcessDeallocationType.JobCanceled);
            logger.info(methodName, job.getDuccId(), JobCompletionType.ProcessInitializationFailure);
            ckpt = true;
        }
        logger.trace(methodName, null, messages.fetch("exit"));
        return ckpt;
    }

    /**
     * Terminates a job/service because of process failures. Once initialized,
     * the work is terminated when its process failure count reaches its process
     * failure limit; before initialization, any process failure terminates it.
     *
     * @param job the work to examine
     * @return {@code true} if the work was terminated here (state should be checkpointed)
     */
    private boolean isCancelJobExcessiveProcessFailures(IDuccWorkJob job) {
        String methodName = "isCancelJobExcessiveProcessFailures";
        logger.trace(methodName, null, messages.fetch("enter"));
        boolean ckpt = false;
        long processFailureCount = job.getProcessFailureCount();
        if (processFailureCount > 0) {
            long limit = job.getProcessFailureLimit();
            if (!job.isInitialized()) {
                IRationale rationale = new Rationale("health monitor detected job process failure during initialization of first process");
                StateManager.getInstance().jobTerminate(job, JobCompletionType.ProcessInitializationFailure, rationale, ProcessDeallocationType.JobCanceled);
                logger.info(methodName, job.getDuccId(), JobCompletionType.ProcessInitializationFailure);
                ckpt = true;
            }
            else if (processFailureCount >= limit) {
                IRationale rationale = new Rationale("health monitor detected job process failures limit reached:"+limit);
                StateManager.getInstance().jobTerminate(job, JobCompletionType.ProcessFailure, rationale, ProcessDeallocationType.JobCanceled);
                logger.info(methodName, job.getDuccId(), JobCompletionType.ProcessFailure);
                ckpt = true;
            }
        }
        logger.trace(methodName, null, messages.fetch("exit"));
        return ckpt;
    }

    /**
     * For an unfinished job, terminates it when any of its driver processes is
     * already complete, and zeroes the dispatched work item count when the job
     * has no alive processes. For a finished job, makes sure the dispatched
     * work item count reads "0".
     *
     * @param job the job to examine
     * @return {@code true} if the job was terminated, or a finished job's
     *         dispatched count was reset (state should be checkpointed)
     */
    private boolean isCancelJobDriverProcessFailed(IDuccWorkJob job) {
        String methodName = "isCancelJobDriverProcessFailed";
        logger.trace(methodName, null, messages.fetch("enter"));
        boolean ckpt = false;
        if (!job.isFinished()) {
            DuccWorkPopDriver driver = job.getDriver();
            IDuccProcessMap processMap = driver.getProcessMap();
            if (processMap != null) {
                for (IDuccProcess process : processMap.values()) {
                    if (process.isComplete()) {
                        IRationale rationale = new Rationale("health monitor detected job driver failed unexpectedly");
                        StateManager.getInstance().jobTerminate(job, JobCompletionType.DriverProcessFailed, rationale, ProcessDeallocationType.JobCanceled);
                        logger.info(methodName, job.getDuccId(), JobCompletionType.DriverProcessFailed);
                        ckpt = true;
                        // one completed driver process is enough to terminate the job
                        break;
                    }
                }
            }
            // NOTE: this reset does not request a checkpoint on its own, unlike
            // the finished-job branch below; kept as in the original behavior.
            if (job.getProcessMap().getAliveProcessCount() == 0) {
                job.getSchedulingInfo().setWorkItemsDispatched("0");
            }
        }
        else {
            // Constant-first comparison: an unset (null) dispatched value is
            // treated as "not zero" and reset, instead of throwing.
            if (!"0".equals(job.getSchedulingInfo().getWorkItemsDispatched())) {
                job.getSchedulingInfo().setWorkItemsDispatched("0");
                logger.info(methodName, job.getDuccId(), "dispatched set to 0");
                ckpt = true;
            }
        }
        logger.trace(methodName, null, messages.fetch("exit"));
        return ckpt;
    }

    /**
     * For a finished job with no alive processes, winds down its driver
     * processes: a process not yet deallocated is marked deallocated with type
     * {@code JobCompleted}; a deallocated but incomplete process whose node is
     * no longer alive is advanced to {@code Stopped}. If anything changed, the
     * job driver is deallocated through {@code OrchestratorHelper.jdDeallocate}.
     *
     * @param job the job to examine
     * @return {@code true} if any driver process was updated (state should be checkpointed)
     */
    private boolean isDriverCompleted(IDuccWorkJob job) {
        String methodName = "isDriverCompleted";
        logger.trace(methodName, null, messages.fetch("enter"));
        boolean ckpt = false;
        if (job.isFinished() && !job.hasAliveProcess()) {
            DuccWorkPopDriver driver = job.getDriver();
            IDuccProcessMap processMap = driver.getProcessMap();
            if (processMap != null) {
                for (IDuccProcess process : processMap.values()) {
                    if (!process.isDeallocated()) {
                        OrUtil.setResourceState(job, process, ResourceState.Deallocated);
                        process.setProcessDeallocationType(ProcessDeallocationType.JobCompleted);
                        logger.info(methodName, job.getDuccId(), process.getDuccId(), ProcessDeallocationType.JobCompleted);
                        ckpt = true;
                    }
                    else if (!process.isComplete()) {
                        String nodeName = process.getNodeIdentity().getName();
                        // the process cannot report completion itself if its node is gone
                        if (!NodeAccounting.getInstance().isAlive(nodeName)) {
                            process.advanceProcessState(ProcessState.Stopped);
                            logger.info(methodName, job.getDuccId(), process.getDuccId(), ProcessState.Stopped);
                            ckpt = true;
                        }
                    }
                }
            }
        }
        if (ckpt) {
            OrchestratorHelper.jdDeallocate(job);
        }
        logger.trace(methodName, null, messages.fetch("exit"));
        return ckpt;
    }

    /**
     * Stops further process expansion for a job/service whose initialization
     * failure count exceeds its initialization failure cap. A cap that is not
     * greater than zero is treated as "no cap specified" and is ignored.
     *
     * @param work the job or service to examine
     */
    private void stopExpansionIfInitFailureCapExceeded(IDuccWorkJob work) {
        long cap = work.getProcessInitFailureCap();
        // only when a cap was specified and the failure count has gone past it
        if (cap > 0 && work.getProcessInitFailureCount() > cap) {
            // max processes of -1 indicates "stop process expansion" to RM
            work.getSchedulingInfo().setLongProcessesMax(-1);
        }
    }

    /**
     * Emits a debug message with the time elapsed since {@code t0} when it
     * exceeds {@code Constants.SYNC_LIMIT}.
     *
     * @param methodName the logging location to report under
     * @param t0 start time, as returned by {@code System.currentTimeMillis()}
     */
    private void logElapsedIfExcessive(String methodName, long t0) {
        long elapsed = System.currentTimeMillis() - t0;
        if (elapsed > Constants.SYNC_LIMIT) {
            logger.debug(methodName, null, "elapsed msecs: "+elapsed);
        }
    }

    /**
     * Runs the health checks over every job in the work map and saves an
     * orchestrator checkpoint if any check changed state. A failure while
     * examining one job is logged against that job's id and does not prevent
     * the remaining jobs from being examined.
     */
    private void ajudicateJobs() {
        String methodName = "ajudicateJobs";
        logger.trace(methodName, null, messages.fetch("enter"));
        long t0 = System.currentTimeMillis();
        try {
            Set<DuccId> jobKeySet = workMap.getJobKeySet();
            boolean ckpt = false;
            for (DuccId jobId : jobKeySet) {
                try {
                    IDuccWorkJob job = (IDuccWorkJob) WorkMapHelper.findDuccWork(workMap, jobId, this, methodName);
                    // driver wind-down is always evaluated, independently of the cancel checks
                    if (isDriverCompleted(job)) {
                        ckpt = true;
                    }
                    // at most one cancel reason is acted upon per pass
                    if (isCancelJobExcessiveProcessFailures(job)) {
                        ckpt = true;
                    }
                    else if (isCancelJobCappedWithNoJobProcesses(job)) {
                        ckpt = true;
                    }
                    else if (isCancelJobDriverProcessFailed(job)) {
                        ckpt = true;
                    }
                    stopExpansionIfInitFailureCapExceeded(job);
                }
                catch (Exception e) {
                    // report which job failed rather than an anonymous error
                    logger.error(methodName, jobId, e);
                }
            }
            if (ckpt) {
                OrchestratorCheckpoint.getInstance().saveState();
            }
        }
        catch (Throwable t) {
            logger.error(methodName, null, t);
        }
        logElapsedIfExcessive(methodName, t0);
        logger.trace(methodName, null, messages.fetch("exit"));
    }

    /**
     * Runs the health checks over every service in the work map and saves an
     * orchestrator checkpoint if any check changed state. A failure while
     * examining one service is logged against that service's id and does not
     * prevent the remaining services from being examined.
     */
    private void ajudicateServices() {
        String methodName = "ajudicateServices";
        logger.trace(methodName, null, messages.fetch("enter"));
        long t0 = System.currentTimeMillis();
        try {
            Set<DuccId> serviceKeySet = workMap.getServiceKeySet();
            boolean ckpt = false;
            for (DuccId serviceId : serviceKeySet) {
                try {
                    IDuccWorkJob service = (IDuccWorkJob) WorkMapHelper.findDuccWork(workMap, serviceId, this, methodName);
                    // at most one cancel reason is acted upon per pass
                    if (isCancelJobExcessiveProcessFailures(service)) {
                        ckpt = true;
                    }
                    else if (isCancelJobExcessiveInitializationFailures(service)) {
                        ckpt = true;
                    }
                    stopExpansionIfInitFailureCapExceeded(service);
                }
                catch (Exception e) {
                    // report which service failed rather than an anonymous error
                    logger.error(methodName, serviceId, e);
                }
            }
            if (ckpt) {
                OrchestratorCheckpoint.getInstance().saveState();
            }
        }
        catch (Throwable t) {
            logger.error(methodName, null, t);
        }
        logElapsedIfExcessive(methodName, t0);
        logger.trace(methodName, null, messages.fetch("exit"));
    }

    /**
     * Performs one full health-monitoring pass: jobs first, then services.
     */
    public void ajudicate() {
        ajudicateJobs();
        ajudicateServices();
    }
}