blob: a8fc2a42eec580a42fc084b2cf51dfc51e775cc5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.ws;
import java.io.File;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelContext;
import org.apache.uima.ducc.common.IDuccEnv;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.internationalization.Messages;
import org.apache.uima.ducc.common.main.DuccService;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.NodeMetricsUpdateDuccEvent;
import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
import org.apache.uima.ducc.transport.event.PmStateDuccEvent;
import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
import org.apache.uima.ducc.transport.event.SmStateDuccEvent;
import org.apache.uima.ducc.transport.event.common.IDuccWorkMap;
import org.apache.uima.ducc.ws.registry.ServicesRegistry;
import org.apache.uima.ducc.ws.self.message.WebServerStateDuccEvent;
import org.apache.uima.ducc.ws.server.DuccListeners;
import org.apache.uima.ducc.ws.server.DuccWebServer;
import org.apache.uima.ducc.ws.utils.DatedNodeMetricsUpdateDuccEvent;
/**
 * DUCC web server component.
 *
 * Owns the embedded {@link DuccWebServer} instance and receives the state
 * events published by the other daemons (orchestrator, resource manager,
 * services manager, process manager, node agents and the web server's own
 * self-publication), storing them in the web server's data singletons.
 */
public class WebServerComponent extends AbstractDuccComponent
        implements IWebServer {

    private static final DuccLogger duccLogger = DuccLoggerComponents.getWsLogger(WebServerComponent.class.getName());
    // No job is associated with web server log records.
    private static final DuccId jobid = null;

    private final DuccWebServer duccWebServer;
    private final Messages duccMsg = Messages.getInstance();

    // Last work counts seen in an orchestrator publication; used only to
    // decide whether the summary line needs to be logged again.
    private static final AtomicInteger jobCount = new AtomicInteger(0);
    private static final AtomicInteger serviceCount = new AtomicInteger(0);
    private static final AtomicInteger reservationCount = new AtomicInteger(0);

    // Time (millis) of the most recent sorted-machines refresh.
    private static final AtomicLong updateLast = new AtomicLong(System.currentTimeMillis());

    // Refresh interval (seconds) used for the first updateIntervalLimit refreshes.
    public static long updateIntervalSecondsInitial = 5;
    // Refresh interval (seconds) used once the initial quick refreshes are done.
    public static long updateIntervalSecondsNormal = 60;
    // Number of sorted-machines refreshes performed so far.
    public static AtomicLong updateIntervalCount = new AtomicLong(0);
    // Number of refreshes that use the initial (short) interval.
    public static long updateIntervalLimit = 12;

    /**
     * Create the component, log boot information (classpath entries at trace
     * level, selected system properties at info level), construct the
     * embedded web server and start it.
     *
     * @param context the Camel context handed to the superclass
     * @param common  the common configuration handed to {@link DuccWebServer}
     */
    public WebServerComponent(CamelContext context, CommonConfiguration common) {
        super("WebServer", context);
        String methodName = "WebServerComponent";
        duccLogger.info(methodName, jobid, "##### boot #####");
        String cp = System.getProperty("java.class.path");
        // Use the platform separator rather than a hard-coded ":" so that
        // entries are split correctly on every platform.
        String[] cpArray = cp.split(File.pathSeparator);
        int lc = 0;
        for (String line : cpArray) {
            duccLogger.trace(methodName, jobid, "cp." + lc + " " + line);
            lc++;
        }
        String[] propertyNames = { "ducc.broker.url" };
        for (String property : propertyNames) {
            duccLogger.info(methodName, jobid, property + "=" + System.getProperty(property));
        }
        duccWebServer = new DuccWebServer(common);
        init();
    }

    /**
     * Start the component via the superclass, then register this daemon's
     * JMX URL with the runtime properties under {@link DaemonName#Webserver}.
     *
     * @param service the DUCC service passed through to the superclass
     * @param args    the arguments passed through to the superclass
     * @throws Exception if the superclass start or the registration fails
     */
    public void start(DuccService service, String[] args) throws Exception {
        super.start(service, args);
        DuccDaemonRuntimeProperties.getInstance().boot(DaemonName.Webserver, getProcessJmxUrl());
    }

    /**
     * @return the logger used by the web server component
     */
    public DuccLogger getLogger() {
        return duccLogger;
    }

    /**
     * Start the embedded web server. Any exception is logged, not propagated.
     */
    public void webServerStart() {
        String methodName = "webServerStart";
        try {
            duccWebServer.start();
            duccLogger.info(methodName, jobid, "webserver started");
        } catch (Exception e) {
            duccLogger.error(methodName, jobid, e);
        }
    }

    /**
     * Stop the embedded web server. Any exception is logged, not propagated.
     */
    public void webServerStop() {
        String methodName = "webServerStop";
        try {
            duccWebServer.stop();
            duccLogger.info(methodName, jobid, "webserver stopped");
        } catch (Exception e) {
            duccLogger.error(methodName, jobid, e);
        }
    }

    /**
     * Ensure the web server log directory exists, then start the web server.
     */
    private void init() {
        String methodName = "init";
        duccLogger.trace(methodName, jobid, duccMsg.fetch("enter"));
        File file = new File(IDuccEnv.DUCC_LOGS_WEBSERVER_DIR);
        // mkdirs() also returns false when the directory already exists, so
        // only complain when it is really missing afterwards.
        if (!file.mkdirs() && !file.isDirectory()) {
            duccLogger.warn(methodName, jobid, "unable to create directory: " + file.getAbsolutePath());
        }
        webServerStart();
        duccLogger.trace(methodName, jobid, duccMsg.fetch("exit"));
    }

    /**
     * Process an orchestrator state publication: record the event, log the
     * job/reservation/service counts when any of them changed since the
     * previous publication, and hand the work map on to the data store,
     * plugins and listeners.
     *
     * @param duccEvent the received orchestrator state event
     */
    public void update(OrchestratorStateDuccEvent duccEvent) {
        String methodName = "update";
        duccLogger.trace(methodName, jobid, duccMsg.fetch("enter"));
        duccLogger.debug(methodName, jobid, duccMsg.fetchLabel("received") + "OrchestratorStateDuccEvent");
        DuccDaemonsData.getInstance().put(duccEvent);
        IDuccWorkMap wm = duccEvent.getWorkMap();
        boolean change = false;
        int count;
        count = wm.getJobCount();
        if (count != jobCount.get()) {
            jobCount.set(count);
            change = true;
        }
        count = wm.getReservationCount();
        if (count != reservationCount.get()) {
            reservationCount.set(count);
            change = true;
        }
        count = wm.getServiceCount();
        if (count != serviceCount.get()) {
            serviceCount.set(count);
            change = true;
        }
        if (change) {
            duccLogger.info(methodName, jobid, duccMsg.fetchLabel("jobs") + jobCount.get() + " " + duccMsg.fetchLabel("reservations") + reservationCount.get() + " " + duccMsg.fetchLabel("services") + serviceCount.get());
        }
        DuccData.getInstance().put(wm);
        DuccPlugins.getInstance().update(wm);
        DuccListeners.getInstance().update(duccEvent);
        Map<String, Long> map = Distiller.deriveMachineMemoryInUse(duccEvent);
        report(map);
        duccLogger.trace(methodName, jobid, duccMsg.fetch("exit"));
    }

    /**
     * Trace every key=value entry of the given map.
     *
     * @param map the entries to write to the trace log
     */
    private void report(Map<String, Long> map) {
        String location = "report";
        for (Entry<String, Long> entry : map.entrySet()) {
            duccLogger.trace(location, jobid, entry.getKey() + "=" + entry.getValue());
        }
    }

    /**
     * Sort machines if the update interval has elapsed.
     *
     * Note: an initial short interval (updateIntervalSecondsInitial) is used
     * when the Web Server first boots in order to populate quickly.
     * After updateIntervalLimit quick recalculations, revert to the normal
     * interval (updateIntervalSecondsNormal).
     */
    private void sortMachines() {
        String methodName = "sortMachines";
        long last = updateLast.get();
        long updateIntervalMilliSeconds = updateIntervalSecondsNormal * 1000;
        if (updateIntervalCount.get() < updateIntervalLimit) {
            updateIntervalMilliSeconds = updateIntervalSecondsInitial * 1000;
        }
        long deadline = last + updateIntervalMilliSeconds;
        long now = System.currentTimeMillis();
        if (now > deadline) {
            // Only the caller that wins the compare-and-set performs the sort;
            // a concurrent caller that read the same "last" value skips it.
            boolean success = updateLast.compareAndSet(last, now);
            if (success) {
                DuccMachinesData.getInstance().updateSortedMachines();
                // Log the value produced by this increment rather than re-reading the counter.
                long count = updateIntervalCount.incrementAndGet();
                duccLogger.trace(methodName, jobid, "count: " + count);
            }
            else {
                duccLogger.trace(methodName, jobid, "missed: " + "last=" + last + " " + "now=" + now);
            }
        }
        else {
            duccLogger.trace(methodName, jobid, "togo: " + (deadline - now) / 1000);
        }
    }

    /**
     * Process a node metrics publication: wrap the event in a
     * {@link DatedNodeMetricsUpdateDuccEvent}, store it in the machines data
     * and re-sort the machines when the update interval has elapsed.
     *
     * @param duccEvent the received node metrics event
     */
    public void update(NodeMetricsUpdateDuccEvent duccEvent) {
        String methodName = "update";
        duccLogger.trace(methodName, jobid, duccMsg.fetch("enter"));
        duccLogger.trace(methodName, jobid, duccMsg.fetchLabel("received") + "NodeMetricsUpdateDuccEvent");
        DuccMachinesData dmd = DuccMachinesData.getInstance();
        DatedNodeMetricsUpdateDuccEvent datedEvent = new DatedNodeMetricsUpdateDuccEvent(duccEvent);
        dmd.put(datedEvent);
        sortMachines();
        duccLogger.trace(methodName, jobid, duccMsg.fetch("exit"));
    }

    /**
     * Process a resource manager state publication by recording it in the
     * daemons data.
     *
     * @param duccEvent the received resource manager state event
     */
    public void update(RmStateDuccEvent duccEvent) {
        String methodName = "update";
        duccLogger.trace(methodName, jobid, duccMsg.fetch("enter"));
        duccLogger.debug(methodName, jobid, duccMsg.fetchLabel("received") + "RmStateDuccEvent");
        DuccDaemonsData.getInstance().put(duccEvent);
        duccLogger.trace(methodName, jobid, duccMsg.fetch("exit"));
    }

    /**
     * Process a services manager state publication: record it in the daemons
     * data and trigger an update of the services registry.
     *
     * @param duccEvent the received services manager state event
     */
    public void update(SmStateDuccEvent duccEvent) {
        String methodName = "update";
        duccLogger.trace(methodName, jobid, duccMsg.fetch("enter"));
        duccLogger.debug(methodName, jobid, duccMsg.fetchLabel("received") + "SmStateDuccEvent");
        DuccDaemonsData.getInstance().put(duccEvent);
        ServicesRegistry.getInstance().update();
        duccLogger.trace(methodName, jobid, duccMsg.fetch("exit"));
    }

    /**
     * Process a process manager state publication by recording it in the
     * daemons data.
     *
     * @param duccEvent the received process manager state event
     */
    public void update(PmStateDuccEvent duccEvent) {
        String methodName = "update";
        duccLogger.trace(methodName, jobid, duccMsg.fetch("enter"));
        duccLogger.debug(methodName, jobid, duccMsg.fetchLabel("received") + "PmStateDuccEvent");
        DuccDaemonsData.getInstance().put(duccEvent);
        duccLogger.trace(methodName, jobid, duccMsg.fetch("exit"));
    }

    /**
     * Process the received self-publication (broker is alive!) by recording
     * it in the daemons data.
     *
     * @param duccEvent the received web server state event
     */
    public void update(WebServerStateDuccEvent duccEvent) {
        String methodName = "update";
        duccLogger.trace(methodName, jobid, duccMsg.fetch("enter"));
        duccLogger.debug(methodName, jobid, duccMsg.fetchLabel("received") + "WebServerStateDuccEvent");
        DuccDaemonsData.getInstance().put(duccEvent);
        duccLogger.trace(methodName, jobid, duccMsg.fetch("exit"));
    }
}