blob: a3912916a963827fde1958cf3f6475bbef2fb97a [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.server;
import ch.qos.logback.classic.LoggerContext;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logging.kafka.KafkaAppender;
import org.apache.airavata.common.utils.*;
import org.apache.airavata.common.utils.ApplicationSettings.ShutdownStrategy;
import org.apache.airavata.common.utils.IServer.ServerStatus;
import org.apache.airavata.common.utils.StringUtil.CommandLineParameters;
import org.apache.commons.cli.ParseException;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class ServerMain {
private static List<IServer> servers;
private static final String SERVERS_KEY="servers";
private final static Logger logger = LoggerFactory.getLogger(ServerMain.class);
private static boolean serversLoaded=false;
private static final String stopFileNamePrefix = "server_stop";
private static int serverPID=-1;
private static final String serverStartedFileNamePrefix = "server_start";
private static boolean systemShutDown=false;
private static String STOP_COMMAND_STR = "stop";
private static final String ALL_IN_ONE = "all";
private static final String API_ORCH = "api-orch";
private static final String EXECUTION = "execution";
// server names
private static final String API_SERVER = "apiserver";
private static final String CREDENTIAL_STORE = "credentialstore";
private static final String REGISTRY_SERVER = "regserver";
private static final String SHARING_SERVER = "sharing_server";
private static final String GFAC_SERVER = "gfac";
private static final String ORCHESTRATOR = "orchestrator";
private static final String PROFILE_SERVICE = "profile_service";
private static final String DB_EVENT_MANAGER = "db_event_manager";
private static ServerCnxnFactory cnxnFactory;
// private static boolean shutdownHookCalledBefore=false;
static{
servers = new ArrayList<IServer>();
}
private static void loadServers(String serverNames) {
try {
if (serverNames != null) {
List<String> serversList = handleServerDependencies(serverNames);
for (String serverString : serversList) {
serverString = serverString.trim();
String serverClassName = ServerSettings.getSetting(serverString);
Class<?> classInstance;
try {
classInstance = ServerMain.class
.getClassLoader().loadClass(
serverClassName);
servers.add((IServer) classInstance.newInstance());
} catch (ClassNotFoundException e) {
logger.error("Error while locating server implementation \"" + serverString + "\"!!!", e);
} catch (InstantiationException e) {
logger.error("Error while initiating server instance \"" + serverString + "\"!!!", e);
} catch (IllegalAccessException e) {
logger.error("Error while initiating server instance \"" + serverString + "\"!!!", e);
} catch (ClassCastException e) {
logger.error("Invalid server \"" + serverString + "\"!!!", e);
}
}
} else {
logger.warn("No server name specify to start, use -h command line option to view help menu ...");
}
} catch (ApplicationSettingsException e) {
logger.error("Error while retrieving server list!!!",e);
}
serversLoaded=true;
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
setSystemShutDown(true);
stopAllServers();
}
});
}
private static List<String> handleServerDependencies(String serverNames) {
List<String> serverList = new ArrayList<>(Arrays.asList(serverNames.split(",")));
if (serverList.indexOf(ALL_IN_ONE) > -1) {
serverList.clear();
serverList.add(DB_EVENT_MANAGER); // DB Event Manager should start before everything
serverList.add(REGISTRY_SERVER); // registry server should start before everything else
serverList.add(CREDENTIAL_STORE); // credential store should start before api server
serverList.add(SHARING_SERVER);
serverList.add(API_SERVER);
serverList.add(ORCHESTRATOR);
serverList.add(GFAC_SERVER);
serverList.add(PROFILE_SERVICE);
} else if (serverList.indexOf(API_ORCH) > -1) {
serverList.clear();
serverList.add(DB_EVENT_MANAGER); // DB Event Manager should start before everything
serverList.add(REGISTRY_SERVER); // registry server should start before everything else
serverList.add(CREDENTIAL_STORE); // credential store should start before api server
serverList.add(SHARING_SERVER);
serverList.add(API_SERVER);
serverList.add(ORCHESTRATOR);
serverList.add(PROFILE_SERVICE);
} else if (serverList.indexOf(EXECUTION) > -1) {
serverList.clear();
serverList.add(GFAC_SERVER);
} else {
// registry server should start before everything
int regPos = serverList.indexOf(REGISTRY_SERVER);
if (regPos > 0) {
String temp = serverList.get(0);
serverList.set(0, serverList.get(regPos));
serverList.set(regPos, temp);
}
// credential store should start before api server
int credPos = serverList.indexOf(CREDENTIAL_STORE);
int apiPos = serverList.indexOf(API_SERVER);
if (credPos >= 0 && apiPos >= 0 && (credPos > apiPos)) {
String temp = serverList.get(apiPos);
serverList.set(apiPos, serverList.get(credPos));
serverList.set(credPos, temp);
}
}
return serverList;
}
// private static void addSecondaryShutdownHook(){
// Runtime.getRuntime().addShutdownHook(new Thread(){
// @Override
// public void run() {
// System.out.print("Graceful shutdown attempt is still active. Do you want to exit instead? (y/n)");
// String command=System.console().readLine().trim().toLowerCase();
// if (command.equals("yes") || command.equals("y")){
// System.exit(1);
// }
// }
// });
// }
public static void main(String args[]) throws ParseException, IOException, AiravataException {
ServerSettings.mergeSettingsCommandLineArgs(args);
ServerSettings.setServerRoles(ApplicationSettings.getSetting(SERVERS_KEY, "all").split(","));
if (ServerSettings.isEnabledKafkaLogging()) {
final ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory();
if (iLoggerFactory instanceof LoggerContext) {
final KafkaAppender kafkaAppender = new KafkaAppender(ServerSettings.getKafkaBrokerList(),
ServerSettings.getKafkaTopicPrefix());
kafkaAppender.setContext((LoggerContext) iLoggerFactory);
kafkaAppender.setName("kafka-appender");
kafkaAppender.clearAllFilters();
kafkaAppender.start();
// Until AIRAVATA-2073 filter org.apache.kafka logs
((LoggerContext) iLoggerFactory).getLogger("org.apache.airavata").addAppender(kafkaAppender);
((LoggerContext) iLoggerFactory).getLogger("org.apache.zookeeper").addAppender(kafkaAppender);
((LoggerContext) iLoggerFactory).getLogger("org.apache.derby").addAppender(kafkaAppender);
((LoggerContext) iLoggerFactory).getLogger("org.apache.commons").addAppender(kafkaAppender);
((LoggerContext) iLoggerFactory).getLogger("org.apache.thrift").addAppender(kafkaAppender);
((LoggerContext) iLoggerFactory).getLogger("com").addAppender(kafkaAppender);
((LoggerContext) iLoggerFactory).getLogger("net").addAppender(kafkaAppender);
} else {
logger.warn("Kafka logging is enabled but cannot find logback LoggerContext, found", iLoggerFactory.getClass().toString());
throw new AiravataException("Kafka logging is enabled but cannot find logback LoggerContext");
}
} else {
logger.info("Kafka logging is disabled in airavata server configurations");
}
CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args);
if (commandLineParameters.getArguments().contains(STOP_COMMAND_STR)){
performServerStopRequest(commandLineParameters);
}else{
AiravataZKUtils.startEmbeddedZK(cnxnFactory);
performServerStart(args);
}
}
private static void performServerStart(String[] args) {
setServerStarted();
logger.info("Airavata server instance starting...");
for (String string : args) {
logger.info("Server Arguments: " + string);
}
String serverNames;
try {
serverNames = ApplicationSettings.getSetting(SERVERS_KEY);
startAllServers(serverNames);
} catch (ApplicationSettingsException e1) {
logger.error("Error finding servers property");
}
while(!hasStopRequested()){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
stopAllServers();
}
}
if (hasStopRequested()){
ServerSettings.setStopAllThreads(true);
stopAllServers();
ShutdownStrategy shutdownStrategy;
try {
shutdownStrategy = ServerSettings.getShutdownStrategy();
} catch (Exception e) {
String strategies="";
for(ShutdownStrategy s:ShutdownStrategy.values()){
strategies+="/"+s.toString();
}
logger.warn(e.getMessage());
logger.warn("Valid shutdown options are : "+strategies.substring(1));
shutdownStrategy=ShutdownStrategy.SELF_TERMINATE;
}
if (shutdownStrategy==ShutdownStrategy.SELF_TERMINATE) {
System.exit(0);
}
}
}
private static void performServerStopRequest(
CommandLineParameters commandLineParameters) throws IOException {
// deleteOldStartRecords();
String serverIndexOption = "serverIndex";
if (commandLineParameters.getParameters().containsKey(serverIndexOption)){
serverPID=Integer.parseInt(commandLineParameters.getParameters().get(serverIndexOption));
}
if (isServerRunning()) {
logger.info("Requesting airavata server"+(serverPID==-1? "(s)":" instance "+serverPID)+" to stop...");
requestStop();
while(isServerRunning()){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
logger.info("Server"+(serverPID==-1? "(s)":" instance "+serverPID)+" stopped!!!");
}else{
logger.error("Server"+(serverPID==-1? "":" instance "+serverPID)+" is not running!!!");
}
if (ServerSettings.isEmbeddedZK()) {
cnxnFactory.shutdown();
}
}
@SuppressWarnings("resource")
private static void requestStop() throws IOException {
File file = new File(getServerStopFileName());
file.createNewFile();
new RandomAccessFile(file, "rw").getChannel().lock();
file.deleteOnExit();
}
private static boolean hasStopRequested(){
return isSystemShutDown() || new File(getServerStopFileName()).exists() || new File(stopFileNamePrefix).exists();
}
private static String getServerStopFileName() {
return stopFileNamePrefix;
}
private static void deleteOldStopRequests(){
File[] files = new File(".").listFiles();
for (File file : files) {
if (file.getName().contains(stopFileNamePrefix)){
file.delete();
}
}
}
// private static void deleteOldStartRecords(){
// File[] files = new File(".").listFiles();
// for (File file : files) {
// if (file.getName().contains(serverStartedFileNamePrefix)){
// try {
// new FileOutputStream(file);
// file.delete();
// } catch (Exception e) {
// //file is locked which means there's an active process using it
// }
// }
// }
// }
private static boolean isServerRunning(){
if (serverPID==-1){
String[] files = new File(".").list();
for (String file : files) {
if (file.contains(serverStartedFileNamePrefix)){
return true;
}
}
return false;
}else{
return new File(getServerStartedFileName()).exists();
}
}
@SuppressWarnings({ "resource" })
private static void setServerStarted(){
try {
serverPID = getPID();
deleteOldStopRequests();
File serverStartedFile = null;
serverStartedFile = new File(getServerStartedFileName());
serverStartedFile.createNewFile();
serverStartedFile.deleteOnExit();
new RandomAccessFile(serverStartedFile,"rw").getChannel().lock();
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
private static String getServerStartedFileName() {
return new File(new File(System.getenv("AIRAVATA_HOME"),"bin"),serverStartedFileNamePrefix+"_"+Integer.toString(serverPID)).toString();
}
public static void stopAllServers() {
//stopping should be done in reverse order of starting the servers
for(int i=servers.size()-1;i>=0;i--){
try {
servers.get(i).stop();
waitForServerToStop(servers.get(i),null);
} catch (Exception e) {
logger.error("Server Stop Error:",e);
}
}
}
public static void startAllServers(String serversNames) {
if (!serversLoaded){
loadServers(serversNames);
}
for (IServer server : servers) {
try {
server.configure();
server.start();
waitForServerToStart(server,null);
} catch (Exception e) {
logger.error("Server Start Error:",e);
}
}
}
private static final int SERVER_STATUS_CHANGE_WAIT_INTERVAL=500;
private static void waitForServerToStart(IServer server,Integer maxWait) throws Exception {
int count=0;
// if (server.getStatus()==ServerStatus.STARTING) {
// logger.info("Waiting for " + server.getName() + " to start...");
// }
while(server.getStatus()==ServerStatus.STARTING && (maxWait==null || count<maxWait)){
Thread.sleep(SERVER_STATUS_CHANGE_WAIT_INTERVAL);
count+=SERVER_STATUS_CHANGE_WAIT_INTERVAL;
}
if (server.getStatus()!= ServerStatus.STARTED){
logger.error("The "+server.getName()+" did not start!!!");
}
}
private static void waitForServerToStop(IServer server,Integer maxWait) throws Exception {
int count=0;
if (server.getStatus()==ServerStatus.STOPING) {
logger.info("Waiting for " + server.getName() + " to stop...");
}
//we are doing hasStopRequested() check because while we are stuck in the loop to stop there could be a forceStop request
while(server.getStatus()==ServerStatus.STOPING && (maxWait==null || count<maxWait)){
Thread.sleep(SERVER_STATUS_CHANGE_WAIT_INTERVAL);
count+=SERVER_STATUS_CHANGE_WAIT_INTERVAL;
}
if (server.getStatus()!=ServerStatus.STOPPED){
logger.error("Error stopping the "+server.getName()+"!!!");
}
}
private static boolean isSystemShutDown() {
return systemShutDown;
}
private static void setSystemShutDown(boolean systemShutDown) {
ServerMain.systemShutDown = systemShutDown;
}
private static int getPID(){
try {
java.lang.management.RuntimeMXBean runtime = java.lang.management.ManagementFactory
.getRuntimeMXBean();
java.lang.reflect.Field jvm = runtime.getClass()
.getDeclaredField("jvm");
jvm.setAccessible(true);
sun.management.VMManagement mgmt = (sun.management.VMManagement) jvm
.get(runtime);
java.lang.reflect.Method pid_method = mgmt.getClass()
.getDeclaredMethod("getProcessId");
pid_method.setAccessible(true);
int pid = (Integer) pid_method.invoke(mgmt);
return pid;
} catch (Exception e) {
return -1;
}
}
}