blob: 3a6a58984537f719b688d7bcd620723f9c627f52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.orchestrator.maintenance;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
public class NodeAccounting {
private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(NodeAccounting.class.getName());
private static final DuccId jobid = null;
private static NodeAccounting instance = new NodeAccounting();
public static NodeAccounting getInstance() {
return instance;
}
private ConcurrentHashMap<String, Long> timeMap = new ConcurrentHashMap<String, Long>();
private long inventoryRate = 30 * 1000;
private long inventorySkip = 0;
private long heartbeatMissingTolerance = 3;
private boolean inventoryRateMessage = false;
private boolean inventorySkipMessage = false;
private long getRate() {
String methodName = "getRate";
long retVal = inventoryRate;
try {
String property_inventory_rate = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_agent_node_inventory_publish_rate);
if(property_inventory_rate == null) {
property_inventory_rate = ""+inventoryRate;
}
long property_value = Long.parseLong(property_inventory_rate.trim());
if(property_value != inventoryRate) {
inventoryRate = property_value;
logger.info(methodName, jobid, "rate:"+inventoryRate);
}
retVal = property_value;
}
catch(Throwable t) {
if(!inventoryRateMessage) {
inventoryRateMessage = true;
logger.warn(methodName, jobid, t);
}
}
return retVal;
}
private long getSkip() {
String methodName = "getSkip";
long retVal = inventorySkip;
try {
String property_inventory_skip = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_agent_node_inventory_publish_rate_skip);
if(property_inventory_skip == null) {
property_inventory_skip = ""+inventorySkip;
}
long property_value = Long.parseLong(property_inventory_skip.trim());
if(property_value != inventorySkip) {
inventorySkip = property_value;
logger.info(methodName, jobid, "skip:"+inventorySkip);
}
retVal = property_value;
}
catch(Throwable t) {
if(!inventorySkipMessage) {
inventorySkipMessage = true;
logger.warn(methodName, jobid, t);
}
}
return retVal;
}
private long getNodeMissingTime() {
String methodName = "getNodeMissingTime";
long retVal = inventoryRate * heartbeatMissingTolerance;
try {
long rate = getRate();
long skip = getSkip();
if(skip > 0) {
rate = rate * skip;
}
retVal = rate * heartbeatMissingTolerance;
}
catch(Throwable t) {
logger.error(methodName, jobid, t);
}
return retVal;
}
public void heartbeat(HashMap<DuccId,IDuccProcess> processMap) {
String location = "heartbeat";
try {
Iterator<DuccId> iterator = processMap.keySet().iterator();
while(iterator.hasNext()) {
DuccId duccId = iterator.next();
IDuccProcess process = processMap.get(duccId);
NodeIdentity nodeIdentity = process.getNodeIdentity();
String nodeName = nodeIdentity.getName();
heartbeat(nodeName);
break;
}
}
catch(Throwable t) {
logger.error(location, jobid, "");
}
}
public void heartbeat(String nodeName) {
String location = "heartbeat";
record(nodeName);
logger.debug(location, jobid, nodeName);
}
private void record(String nodeName) {
if(nodeName != null) {
Long value = new Long(System.currentTimeMillis());
timeMap.put(nodeName, value);
}
}
public boolean isAlive(String nodeName) {
String location = "isAlive";
boolean retVal = true;
try {
if(!timeMap.containsKey(nodeName)) {
record(nodeName);
}
long heartbeatTime = timeMap.get(nodeName);
long currentTime = System.currentTimeMillis();
long elapsed = currentTime - heartbeatTime;
if( elapsed > getNodeMissingTime() ) {
retVal = false;
logger.info(location, jobid, "down:"+nodeName+" elapsed:"+elapsed);
}
}
catch(Throwable t) {
logger.error(location, jobid, nodeName);
}
return retVal;
}
}