blob: a30cf364fcc9ab1c7d6b18b35fd25253cb940c51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.ws;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.TimeStamp;
import org.apache.uima.ducc.transport.event.AbstractDuccEvent;
import org.apache.uima.ducc.transport.event.DuccEvent.EventType;
import org.apache.uima.ducc.ws.server.DuccWebProperties;
public class DuccDaemonsData {
private static DuccLogger duccLogger = DuccLoggerComponents.getWsLogger(DuccDaemonsData.class.getName());
private static DuccDaemonsData duccDaemonsData = new DuccDaemonsData();
private static ConcurrentHashMap<DaemonName,String> mapCurr = new ConcurrentHashMap<DaemonName,String>();
private static ConcurrentHashMap<DaemonName,String> mapPrev = new ConcurrentHashMap<DaemonName,String>();
private static ConcurrentHashMap<DaemonName,String> mapMax = new ConcurrentHashMap<DaemonName,String>();
private static ConcurrentHashMap<DaemonName,String> mapMaxTOD = new ConcurrentHashMap<DaemonName,String>();
private static ConcurrentHashMap<DaemonName,Long> eventSize = new ConcurrentHashMap<DaemonName,Long>();
private static ConcurrentHashMap<DaemonName,Long> eventSizeMax = new ConcurrentHashMap<DaemonName,Long>();
private static ConcurrentHashMap<DaemonName,String> eventSizeMaxTOD = new ConcurrentHashMap<DaemonName,String>();
public static DuccDaemonsData getInstance() {
return duccDaemonsData;
}
public void put(AbstractDuccEvent duccEvent) {
EventType eventType = duccEvent.getEventType();
DaemonName key;
switch(eventType) {
case ORCHESTRATOR_STATE:
key = DaemonName.Orchestrator;
putHeartbeat(key);
putEventSize(key, duccEvent);
break;
case PM_STATE:
key = DaemonName.ProcessManager;
putHeartbeat(key);
putEventSize(key, duccEvent);
break;
case RM_STATE:
key = DaemonName.ResourceManager;
putHeartbeat(key);
putEventSize(key, duccEvent);
break;
case SM_STATE:
key = DaemonName.ServiceManager;
putHeartbeat(key);
putEventSize(key, duccEvent);
break;
case WEBSERVER_STATE:
key = DaemonName.Webserver;
putHeartbeat(key);
putEventSize(key, duccEvent);
break;
default:
break;
}
}
public void putEventSize(DaemonName key, AbstractDuccEvent duccEvent) {
Long size = duccEvent.getEventSize();
eventSize.put(key, size);
Long prev;
if(eventSizeMax.containsKey(key)) {
prev = eventSizeMax.get(key);
}
else {
prev = new Long(0);
}
if(size > prev) {
eventSizeMax.put(key, size);
String timestamp = TimeStamp.getCurrentMillis();
eventSizeMaxTOD.put(key, timestamp);
}
}
public Long getEventSize(DaemonName key) {
Long retVal = new Long(0);
if(eventSize.containsKey(key)) {
retVal = eventSize.get(key);
}
return retVal;
}
public Long getEventSizeMax(DaemonName key) {
Long retVal = new Long(0);
if(eventSizeMax.containsKey(key)) {
retVal = eventSizeMax.get(key);
}
return retVal;
}
public String getEventSizeMaxTOD(DaemonName key) {
String retVal = "";
if(mapMaxTOD.containsKey(key)) {
retVal = eventSizeMaxTOD.get(key);
}
return retVal;
}
public void putHeartbeat(DaemonName key) {
String timestamp = TimeStamp.getCurrentMillis();
if(mapPrev.containsKey(key)) {
String t0 = mapPrev.get(key);
String t1 = timestamp;
long millis = TimeStamp.diffMillis(t1, t0);
if(mapMax.containsKey(key)) {
long max = Long.parseLong(mapMax.get(key));
if(millis > max) {
mapMax.put(key, ""+millis);
mapMaxTOD.put(key, t1);
}
}
else {
mapMax.put(key, ""+millis);
mapMaxTOD.put(key, t1);
}
}
if(mapCurr.containsKey(key)) {
mapPrev.put(key, mapCurr.get(key));
}
mapCurr.put(key, timestamp);
}
public String getHeartbeat(DaemonName key) {
String retVal = "";
if(mapCurr.containsKey(key)) {
String t1 = TimeStamp.getCurrentMillis();
String t0 = mapCurr.get(key);
long millis = TimeStamp.diffMillis(t1, t0);
retVal = ""+millis/1000;
}
return retVal;
}
public String getMaxHeartbeat(DaemonName key) {
String retVal = "";
if(mapMax.containsKey(key)) {
long max = Long.parseLong(mapMax.get(key));
retVal = ""+(max/1000);
}
return retVal;
}
public String getMaxHeartbeatTOD(DaemonName key) {
String retVal = "";
if(mapMaxTOD.containsKey(key)) {
retVal = mapMaxTOD.get(key);
}
return retVal;
}
/**
* @return number of millis since last receipt of self publication
*/
private long getWsTod() {
long retVal = Long.MAX_VALUE;
DaemonName key = DaemonName.Webserver;
if(mapCurr.containsKey(key)) {
String t1 = TimeStamp.getCurrentMillis();
String t0 = mapCurr.get(key);
long millis = TimeStamp.diffMillis(t1, t0);
retVal = millis;
}
return retVal;
}
/**
* @return number of millis after which webserver publication is considered late
*/
private long getMillisMIA() {
String methodName = "getMillisMIA";
long secondsMIA = -1;
Properties properties = DuccWebProperties.get();
String ws_rate = properties.getProperty("ducc.ws.state.publish.rate");
String ws_ratio = "1";
try {
long rate = Long.parseLong(ws_rate.trim());
long ratio = Long.parseLong(ws_ratio .trim());
secondsMIA = 3 * rate * ratio;
}
catch(Throwable t) {
duccLogger.debug(methodName, null, t);
}
return secondsMIA;
}
/**
*
* @return true if webserver self publications are on time, false otherwise
*/
public boolean isWsPublicationOntime() {
boolean retVal = true;
long expiry = getMillisMIA();
if(getWsTod() > expiry) {
retVal = false;
}
return retVal;
}
}