blob: 0c00a0b8c73fc7c33723517f28722d297d321f59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.ws.server;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.uima.ducc.common.CancelReasons.CancelReason;
import org.apache.uima.ducc.common.json.MonitorInfo;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
import org.apache.uima.ducc.transport.event.cli.SpecificationProperties;
import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccWorkMap;
import org.apache.uima.ducc.transport.event.common.IRationale;
import org.apache.uima.ducc.ws.DuccData;
import org.apache.uima.ducc.ws.authentication.DuccAsUser;
public class DuccWebMonitorReservation {
private static DuccLogger duccLogger = DuccLoggerComponents.getWsLogger(DuccWebMonitorReservation.class.getName());
private static DuccId jobid = null;
private ConcurrentHashMap<DuccId,MonitorInfo> mMap = new ConcurrentHashMap<DuccId,MonitorInfo>();
private ConcurrentHashMap<DuccId,TrackingInfo> tMap = new ConcurrentHashMap<DuccId,TrackingInfo>();
private ConcurrentHashMap<DuccId,Long> cMap = new ConcurrentHashMap<DuccId,Long>();
private long millisPerMinute = 60*1000;
private long timeoutMillis;
protected DuccWebMonitorReservation(long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
protected void monitor(OrchestratorStateDuccEvent duccEvent) {
String location = "monitor";
duccLogger.trace(location, jobid, "enter");
IDuccWorkMap dwm = duccEvent.getWorkMap();
int size = dwm.getReservationKeySet().size();
duccLogger.debug(location, jobid, "reservations: "+size);
Iterator<DuccId> iterator;
ArrayList<DuccId> gone = new ArrayList<DuccId>();
iterator = mMap.keySet().iterator();
while( iterator.hasNext() ) {
DuccId duccId = iterator.next();
gone.add(duccId);
}
long expiryMillis = System.currentTimeMillis()+timeoutMillis+1;
iterator = dwm.getReservationKeySet().iterator();
while( iterator.hasNext() ) {
DuccId duccId = iterator.next();
IDuccWork dw = (IDuccWork)dwm.findDuccWork(duccId);
gone.remove(duccId);
if(!mMap.containsKey(duccId)) {
MonitorInfo monitorInfo = new MonitorInfo();
mMap.putIfAbsent(duccId, monitorInfo);
duccLogger.info(location, duccId, "monitor start");
if(!tMap.containsKey(duccId)) {
if(dw.isCancelOnInterrupt()) {
TrackingInfo ti = new TrackingInfo();
ti.time = expiryMillis;
ti.user = dw.getStandardInfo().getUser();
tMap.putIfAbsent(duccId,ti);
duccLogger.info(location, duccId, "auto-cancel on");
}
else {
duccLogger.info(location, duccId, "auto-cancel off");
}
}
}
DuccWorkReservation dwr = (DuccWorkReservation) dw;
MonitorInfo monitorInfo = mMap.get(duccId);
ArrayList<String> stateSequence = monitorInfo.stateSequence;
String state = dwr.getReservationState().toString();
if(!stateSequence.contains(state)) {
duccLogger.info(location, duccId, "state: "+state);
stateSequence.add(state);
}
String text = null;
String rmReason = dwr.getRmReason();
if(rmReason != null) {
text = rmReason;
}
IRationale rationale = dwr.getCompletionRationale();
if (rationale != null && rationale.isSpecified()) {
text = rationale.getText();
}
if (text != null) {
monitorInfo.rationale = text;
}
}
iterator = gone.iterator();
while( iterator.hasNext() ) {
DuccId duccId = iterator.next();
mMap.remove(duccId);
tMap.remove(duccId);
duccLogger.info(location, duccId, "monitor stop");
}
duccLogger.trace(location, jobid, "exit");
}
protected String getCode(Map<DuccId, IDuccProcess> map) {
String code = "?";
if(map != null) {
Iterator<DuccId> iterator = map.keySet().iterator();
while(iterator.hasNext()) {
DuccId key = iterator.next();
IDuccProcess process = map.get(key);
code = ""+process.getProcessExitCode();
break;
}
}
return code;
}
protected DuccId getKey(String jobId) {
DuccId retVal = null;
Enumeration<DuccId> keys = mMap.keys();
while(keys.hasMoreElements()) {
DuccId duccId = keys.nextElement();
String mapId = ""+duccId.getFriendly();
if(mapId.equals(jobId)) {
retVal = duccId;
break;
}
}
return retVal;
}
public MonitorInfo renew(String jobId, AtomicInteger updateCounter) {
String location = "renew";
duccLogger.trace(location, jobid, "enter");
MonitorInfo monitorInfo = new MonitorInfo();
int countAtArrival = updateCounter.get();
int countAtPresent = countAtArrival;
int sleepSecondsMax = 3*60;
DuccId duccId = getKey(jobId);
if(duccId == null) {
int sleepSeconds = 0;
duccLogger.info(location, duccId, "Waiting for update...");
while(duccId == null) {
try {
duccLogger.debug(location, duccId, "Waiting continues...");
Thread.sleep(1000);
sleepSeconds += 1;
if(sleepSeconds > sleepSecondsMax) {
break;
}
countAtPresent = updateCounter.get();
if((countAtPresent-countAtArrival) > 2) {
break;
}
duccId = getKey(jobId);
}
catch(Exception e) {
}
}
duccLogger.info(location, duccId, "Waiting complete.");
duccId = getKey(jobId);
}
if(duccId != null) {
monitorInfo = mMap.get(duccId);
if(tMap.containsKey(duccId)) {
long expiryMillis = System.currentTimeMillis()+timeoutMillis+1;
TrackingInfo ti = tMap.get(duccId);
ti.time = expiryMillis;
duccLogger.info(location, duccId, "auto-cancel expiry extended");
}
}
else {
try {
int iJobId = Integer.parseInt(jobId);
duccId = new DuccId(iJobId);
duccLogger.info(location, duccId, "not found");
}
catch(Exception e) {
duccLogger.error(location, jobid, e);
}
}
DuccData duccData = DuccData.getInstance();
IDuccWork dw = duccData.getReservation(duccId);
DuccWorkReservation dwr = (DuccWorkReservation) dw;
ArrayList<String> stateSequence = monitorInfo.stateSequence;
ReservationState reservationState = dwr.getReservationState();
if(reservationState != null) {
String state = reservationState.toString();
if(!stateSequence.contains(state)) {
duccLogger.info(location, duccId, "state: "+state);
stateSequence.add(state);
}
}
monitorInfo.nodes = dwr.getNodes();
if(monitorInfo.nodes != null) {
StringBuffer sb = new StringBuffer();
for(String node : monitorInfo.nodes) {
sb.append(node);
sb.append(" ");
}
duccLogger.debug(location, duccId, "nodes: "+sb);
}
String text = null;
String rmReason = dwr.getRmReason();
if(rmReason != null) {
text = rmReason;
}
IRationale rationale = dwr.getCompletionRationale();
if (rationale != null && rationale.isSpecified()) {
text = rationale.getText();
}
if (text != null) {
monitorInfo.rationale = text;
}
duccLogger.trace(location, jobid, "exit");
return monitorInfo;
}
protected Long getExpiry(DuccId duccId) {
String location = "getExpiry";
duccLogger.trace(location, duccId, "enter");
Long retVal = null;
if(!isCanceled(duccId)) {
if(isCancelable(duccId)) {
ConcurrentHashMap<DuccId,Long> eMap = getExpiryMap();
if(eMap.containsKey(duccId)) {
retVal = eMap.get(duccId);
}
}
}
duccLogger.trace(location, duccId, "exit");
return retVal;
}
public ConcurrentHashMap<DuccId,Long> getExpiryMap() {
String location = "getExpiryMap";
duccLogger.trace(location, jobid, "enter");
ConcurrentHashMap<DuccId,Long> eMap = new ConcurrentHashMap<DuccId,Long>();
long nowMillis = System.currentTimeMillis();
Enumeration<DuccId> keys = tMap.keys();
while(keys.hasMoreElements()) {
long minutesLeft = 0;
DuccId duccId = keys.nextElement();
TrackingInfo ti = tMap.get(duccId);
long expiryMillis = ti.time;
if(nowMillis < expiryMillis) {
minutesLeft = (expiryMillis - nowMillis) / millisPerMinute;
}
eMap.put(duccId, minutesLeft);
}
duccLogger.trace(location, jobid, "exit");
return eMap;
}
protected boolean isCanceled(DuccId duccId) {
return cMap.containsKey(duccId);
}
private boolean isCancelable(DuccId duccId) {
String location = "isCancelable";
duccLogger.trace(location, duccId, "enter");
boolean retVal = false;
if(!cMap.containsKey(duccId)) {
MonitorInfo monitorInfo = mMap.get(duccId);
if(monitorInfo != null) {
ArrayList<String> stateSequence = monitorInfo.stateSequence;
if(stateSequence != null) {
if(stateSequence.contains(JobState.Completing.toString())) {
duccLogger.debug(location, duccId, "state: <uncancelable> "+stateSequence);
}
else if(stateSequence.contains(JobState.Completed.toString())) {
duccLogger.debug(location, duccId, "state: <uncancelable> "+stateSequence);
}
else {
duccLogger.debug(location, duccId, "state: <cancelable> "+stateSequence);
retVal = true;
}
}
else {
duccLogger.warn(location, duccId, "stateSequence: <null>");
}
}
else {
duccLogger.warn(location, duccId, "monitorInfo: <null>");
}
}
else {
duccLogger.debug(location, duccId, "already canceled");
}
duccLogger.trace(location, duccId, "exit");
return retVal;
}
protected void cancel(DuccId duccId, String userId) {
String location = "cancel";
duccLogger.trace(location, jobid, "enter");
duccLogger.info(location, duccId, userId);
String java = "/bin/java";
String jhome = System.getProperty("java.home");
String cp = System.getProperty("java.class.path");
String jclass = "org.apache.uima.ducc.cli.DuccReservationCancel";
String arg1 = "--"+JobRequestProperties.key_id;
String arg2 = ""+duccId;
String arg3 = "--"+SpecificationProperties.key_reason;
String reason = CancelReason.MonitorPingOverdue.getText();
String arg4 = "\""+reason+"\"";
String[] arglistUser = { "-u", userId, "--", jhome+java, "-cp", cp, jclass, arg1, arg2, arg3, arg4 };
String result = DuccAsUser.duckling(userId, arglistUser);
duccLogger.warn(location, duccId, result);
cMap.put(duccId, new Long(System.currentTimeMillis()));
tMap.remove(duccId);
duccLogger.trace(location, jobid, "exit");
}
protected void canceler(long nowMillis) {
String location = "canceler";
duccLogger.trace(location, jobid, "enter");
Enumeration<DuccId> keys = tMap.keys();
while(keys.hasMoreElements()) {
DuccId duccId = keys.nextElement();
TrackingInfo ti = tMap.get(duccId);
long expiryMillis = ti.time;
if(nowMillis > expiryMillis) {
if(isCancelable(duccId)) {
cancel(duccId, ti.user);
}
else {
duccLogger.debug(location, duccId, "not cancelable");
}
}
}
duccLogger.trace(location, jobid, "exit");
}
}