blob: e5e72fab7c1352abc22b688d093d70a4013b4cd5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.core.main;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A class to launch any service by name.
*
* It's designed to be subclassed for custom entry points.
*
*
* Workflow
* <ol>
* <li>An instance of the class is created</li>
* <li>If it implements RunService, it is given the binding args off the CLI</li>
* <li>Its service.init() and service.start() methods are called.</li>
* <li>If it implements RunService, runService() is called and its return
* code used as the exit code.</li>
* <li>Otherwise: it waits for the service to stop, assuming in its start() method
* it begins work</li>
* <li>If an exception returned an exit code, that becomes the exit code of the
* command.</li>
* </ol>
* Error and warning messages are logged to stderr. Why? If the classpath
* is wrong & logger configurations not on it, then no error messages by
* the started app will be seen and the caller is left trying to debug
* using exit codes.
*
*/
@SuppressWarnings("UseOfSystemOutOrSystemErr")
public class ServiceLauncher<S extends Service>
implements LauncherExitCodes, IrqHandler.Interrupted {
private static final Log LOG = LogFactory.getLog(ServiceLauncher.class);
protected static final int PRIORITY = 30;
public static final String NAME = "ServiceLauncher";
/**
* name of class for entry point strings: {@value}
*/
public static final String ENTRY_POINT =
"org.apache.hadoop.yarn.service.launcher." + NAME;
public static final String USAGE_MESSAGE =
"Usage: " + NAME + " classname [--conf <conf file>] <service arguments> | ";
/**
* Name of the "--conf" argument.
*/
public static final String ARG_CONF = "--conf";
static final int SHUTDOWN_TIME_ON_INTERRUPT = 30 * 1000;
private volatile S service;
private int serviceExitCode;
private final List<IrqHandler> interruptHandlers = new ArrayList<>(1);
private Configuration configuration;
private String serviceClassName;
private static AtomicBoolean signalAlreadyReceived = new AtomicBoolean(false);
/**
* Create an instance of the launcher
* @param serviceClassName classname of the service
*/
public ServiceLauncher(String serviceClassName) {
this.serviceClassName = serviceClassName;
}
/**
* Get the service. Null until and unless
* {@link #launchService(Configuration, String[], boolean)} has completed
* @return the service
*/
public S getService() {
return service;
}
/**
* Get the configuration constructed from the command line arguments
* @return the configuration used to create the service
*/
public Configuration getConfiguration() {
return configuration;
}
/**
* The exit code from a successful service execution
* @return the exit code.
*/
public int getServiceExitCode() {
return serviceExitCode;
}
@Override
public String toString() {
return "ServiceLauncher for " + serviceClassName;
}
/**
* Launch the service, by creating it, initing it, starting it and then
* maybe running it. {@link RunService#bindArgs(Configuration, String...)} is invoked
* on the service between creation and init.
*
* All exceptions that occur are propagated upwards.
*
* If the method returns a status code, it means that it got as far starting
* the service, and if it implements {@link RunService}, that the
* method {@link RunService#runService()} has completed.
*
* At this point, the service is returned by {@link #getService()}.
*
* @param conf configuration
* @param processedArgs arguments after the configuration parameters
* have been stripped out.
* @param addShutdownHook should a shutdown hook be added to terminate
* this service on shutdown. Tests should set this to false.
* @throws ClassNotFoundException classname not on the classpath
* @throws IllegalAccessException not allowed at the class
* @throws InstantiationException not allowed to instantiate it
* @throws InterruptedException thread interrupted
* @throws IOException any IO exception
*/
public int launchService(Configuration conf,
String[] processedArgs,
boolean addShutdownHook)
throws Throwable,
ClassNotFoundException,
InstantiationException,
IllegalAccessException,
ExitUtil.ExitException {
instantiateService(conf);
//Register the interrupt handlers
registerInterruptHandler();
//and the shutdown hook
if (addShutdownHook) {
ServiceShutdownHook shutdownHook = new ServiceShutdownHook(service);
ShutdownHookManager.get().addShutdownHook(shutdownHook, PRIORITY);
}
RunService runService = null;
if (service instanceof RunService) {
//if its a runService, pass in the conf and arguments before init)
runService = (RunService) service;
configuration = runService.bindArgs(configuration, processedArgs);
assert configuration != null : "null configuration returned by bindArgs()";
}
//some class constructors init; here this is picked up on.
if (!service.isInState(Service.STATE.INITED)) {
service.init(configuration);
}
service.start();
int exitCode = EXIT_SUCCESS;
if (runService != null) {
//assume that runnable services are meant to run from here
exitCode = runService.runService();
LOG.debug("Service exited with exit code " + exitCode);
} else {
//run the service until it stops or an interrupt happens on a different thread.
LOG.debug("waiting for service threads to terminate");
service.waitForServiceToStop(0);
}
//exit
serviceExitCode = exitCode;
return serviceExitCode;
}
/**
* Instantiate the service defined in <code>serviceClassName</code>
* . Sets the <code>configuration</code> field
* to the configuration, and <code>service</code> to the service.
*
* @param conf configuration to use
* @throws ClassNotFoundException no such class
* @throws InstantiationException no empty constructor,
* problems with dependencies
* @throws IllegalAccessException no access rights
*/
public Service instantiateService(Configuration conf) throws
ClassNotFoundException,
InstantiationException,
IllegalAccessException,
ExitUtil.ExitException,
NoSuchMethodException,
InvocationTargetException {
configuration = conf;
//Instantiate the class -this requires the service to have a public
// zero-argument constructor
Class<?> serviceClass =
this.getClass().getClassLoader().loadClass(serviceClassName);
Object instance = serviceClass.getConstructor().newInstance();
if (!(instance instanceof Service)) {
//not a service
throw new ExitUtil.ExitException(EXIT_BAD_CONFIGURATION,
"Not a Service class: " + serviceClassName);
}
service = (S) instance;
return service;
}
/**
* Register this class as the handler for the control-C interrupt.
* Can be overridden for testing.
* @throws IOException on a failure to add the handler
*/
protected void registerInterruptHandler() throws IOException {
try {
interruptHandlers.add(new IrqHandler(IrqHandler.CONTROL_C, this));
interruptHandlers.add(new IrqHandler(IrqHandler.SIGTERM, this));
} catch (IOException e) {
error("Signal handler setup failed : " + e, e);
}
}
/**
* The service has been interrupted.
* Trigger something resembling an elegant shutdown;
* Give the service time to do this before the exit operation is called
* @param interruptData the interrupted data.
*/
@Override
public void interrupted(IrqHandler.InterruptData interruptData) {
String message = "Service interrupted by " + interruptData.toString();
warn(message);
if (!signalAlreadyReceived.compareAndSet(false, true)) {
warn("Repeated interrupt: escalating to a JVM halt");
// signal already received. On a second request to a hard JVM
// halt and so bypass any blocking shutdown hooks.
ExitUtil.halt(EXIT_INTERRUPTED, message);
}
int shutdownTimeMillis = SHUTDOWN_TIME_ON_INTERRUPT;
//start an async shutdown thread with a timeout
ServiceForcedShutdown forcedShutdown =
new ServiceForcedShutdown(shutdownTimeMillis);
Thread thread = new Thread(forcedShutdown);
thread.setDaemon(true);
thread.start();
//wait for that thread to finish
try {
thread.join(shutdownTimeMillis);
} catch (InterruptedException ignored) {
//ignored
}
if (!forcedShutdown.isServiceStopped()) {
warn("Service did not shut down in time");
}
exit(EXIT_INTERRUPTED, message);
}
protected void warn(String text) {
System.err.println(text);
}
protected void error(String message, Throwable thrown) {
String text = "Exception: " + message;
System.err.println(text);
LOG.error(text, thrown);
}
/**
* Exit the code.
* This is method can be overridden for testing, throwing an
* exception instead. Any subclassed method MUST raise an
* {@link ExitUtil.ExitException} instance.
* The service launcher code assumes that after this method is invoked,
* no other code in the same method is called.
* @param exitCode code to exit
*/
protected void exit(int exitCode, String message) {
ExitUtil.terminate(exitCode, message);
}
/**
* Exit off an exception. This can be subclassed for testing
* @param ee exit exception
*/
protected void exit(ExitUtil.ExitException ee) {
ExitUtil.terminate(ee.status, ee);
}
/**
* Get the service name via {@link Service#getName()}.
* If the service is not instantiated, the classname is returned instead.
* @return the service name
*/
public String getServiceName() {
Service s = service;
String name = null;
if (s != null) {
try {
name = s.getName();
} catch (Exception ignored) {
// ignored
}
}
if (name != null) {
return "service " + name;
} else {
return "service classname " + serviceClassName;
}
}
/**
* Parse the command line, building a configuration from it, then
* launch the service and wait for it to finish. finally, exit
* passing the status code to the #exit(int) method.
* @param args arguments to the service. arg[0] is
* assumed to be the service classname and is automatically
*/
public void launchServiceAndExit(List<String> args) {
//Currently the config just the default
Configuration conf = new Configuration();
String[] processedArgs = extractConfigurationArgs(conf, args);
ExitUtil.ExitException ee = launchServiceRobustly(conf, processedArgs);
exit(ee);
}
/**
* Extract the configuration arguments and apply them to the configuration,
* building an array of processed arguments to hand down to the service.
*
* @param conf configuration to update
* @param args main arguments. args[0] is assumed to be the service
* classname and is skipped
* @return the processed list.
*/
public static String[] extractConfigurationArgs(Configuration conf,
List<String> args) {
//convert args to a list
int argCount = args.size();
if (argCount <= 1 ) {
return new String[0];
}
List<String> argsList = new ArrayList<>(argCount);
ListIterator<String> arguments = args.listIterator();
//skip that first entry
arguments.next();
while (arguments.hasNext()) {
String arg = arguments.next();
if (arg.equals(ARG_CONF)) {
//the argument is a --conf file tuple: extract the path and load
//it in as a configuration resource.
//increment the loop iterator
if (!arguments.hasNext()) {
//overshot the end of the file
exitWithMessage(EXIT_COMMAND_ARGUMENT_ERROR,
ARG_CONF + ": missing configuration file after ");
}
File file = new File(arguments.next());
if (!file.exists()) {
exitWithMessage(EXIT_COMMAND_ARGUMENT_ERROR,
ARG_CONF + ": configuration file not found: " + file);
}
try {
conf.addResource(file.toURI().toURL());
} catch (MalformedURLException e) {
exitWithMessage(EXIT_COMMAND_ARGUMENT_ERROR,
ARG_CONF + ": configuration file path invalid: " + file);
}
} else {
argsList.add(arg);
}
}
String[] processedArgs = new String[argsList.size()];
argsList.toArray(processedArgs);
return processedArgs;
}
/**
* Launch a service catching all exceptions and downgrading them to exit codes
* after logging.
* @param conf configuration to use
* @param processedArgs command line after the launcher-specific arguments have
* been stripped out
* @return an exit exception, which will have a status code of 0 if it worked
*/
public ExitUtil.ExitException launchServiceRobustly(Configuration conf,
String[] processedArgs) {
ExitUtil.ExitException exitException;
try {
int exitCode = launchService(conf, processedArgs, true);
if (service != null) {
Throwable failure = service.getFailureCause();
if (failure != null) {
//the service exited with a failure.
//check what state it is in
Service.STATE failureState = service.getFailureState();
if (failureState == Service.STATE.STOPPED) {
//the failure occurred during shutdown, not important enough to bother
//the user as it may just scare them
LOG.debug("Failure during shutdown: " + failure, failure);
} else {
//throw it for the catch handlers to deal with
throw failure;
}
}
}
exitException = new ExitUtil.ExitException(exitCode,
"In " + serviceClassName);
//either the service succeeded, or an error raised during shutdown,
//which we don't worry that much about
} catch (ExitUtil.ExitException ee) {
exitException = ee;
} catch (Throwable thrown) {
int exitCode;
String message = thrown.getMessage();
if (message == null) {
message = thrown.toString();
}
LOG.error(message) ;
if (thrown instanceof ExitCodeProvider) {
exitCode = ((ExitCodeProvider) thrown).getExitCode();
if (LOG.isDebugEnabled()) {
LOG.debug("While running " + getServiceName() + ": " + message, thrown);
}
} else {
//not any of the service launcher exceptions -assume something worse
error(message, thrown);
exitCode = EXIT_EXCEPTION_THROWN;
}
exitException = new ExitUtil.ExitException(exitCode, message);
exitException.initCause(thrown);
}
return exitException;
}
/**
* Build a log message for starting up and shutting down.
* This was grabbed from the ToolRunner code.
* @param classname the class of the server
* @param args arguments
*/
public static String startupShutdownMessage(String classname,
List<String> args) {
final String hostname = NetUtils.getHostname();
return toStartupShutdownString("STARTUP_MSG: ", new String[]{
"Starting " + classname,
" host = " + hostname,
" args = " + args,
" version = " + VersionInfo.getVersion(),
" classpath = " + System.getProperty("java.class.path"),
" build = " + VersionInfo.getUrl() + " -r "
+ VersionInfo.getRevision()
+ "; compiled by '" + VersionInfo.getUser()
+ "' on " + VersionInfo.getDate(),
" java = " + System.getProperty("java.version")
});
}
/**
* Exit with a printed message
* @param status status code
* @param message message
*/
private static void exitWithMessage(int status, String message) {
System.err.println(message);
ExitUtil.terminate(status);
}
private static String toStartupShutdownString(String prefix, String[] msg) {
StringBuilder b = new StringBuilder(prefix);
b.append("\n/************************************************************");
for (String s : msg) {
b.append("\n").append(prefix).append(s);
}
b.append("\n************************************************************/");
return b.toString();
}
/**
* forced shutdown runnable.
*/
protected class ServiceForcedShutdown implements Runnable {
private final int shutdownTimeMillis;
private boolean serviceStopped;
public ServiceForcedShutdown(int shutdownTimeoutMillis) {
this.shutdownTimeMillis = shutdownTimeoutMillis;
}
@Override
public void run() {
if (service != null) {
service.stop();
serviceStopped = service.waitForServiceToStop(shutdownTimeMillis);
} else {
serviceStopped = true;
}
}
private boolean isServiceStopped() {
return serviceStopped;
}
}
/**
* The real main function, which takes the arguments as a list
* arg 0 must be the service classname
* @param argsList the list of arguments
*/
public static void serviceMain(List<String> argsList) {
if (argsList.isEmpty()) {
exitWithMessage(EXIT_USAGE, USAGE_MESSAGE);
} else {
String serviceClassName = argsList.get(0);
if (LOG.isDebugEnabled()) {
LOG.debug(startupShutdownMessage(serviceClassName, argsList));
StringBuilder builder = new StringBuilder();
for (String arg : argsList) {
builder.append('"').append(arg).append("\" ");
}
LOG.debug(builder.toString());
}
Thread.setDefaultUncaughtExceptionHandler(
new YarnUncaughtExceptionHandler());
ServiceLauncher serviceLauncher = new ServiceLauncher<Service>(serviceClassName);
serviceLauncher.launchServiceAndExit(argsList);
}
}
/**
* This is the main entry point for the service launcher.
* @param args command line arguments.
*/
public static void main(String[] args) {
List<String> argsList = Arrays.asList(args);
serviceMain(argsList);
}
}