/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.server;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.StackTrace;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.OptionValue.OptionType;
import org.apache.drill.exec.server.rest.DrillRestServer;
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.store.sys.CachingStoreProvider;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.store.sys.PStoreRegistry;
import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
import org.apache.drill.exec.work.WorkManager;
import org.apache.zookeeper.Environment;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.resource.Resource;
import org.glassfish.jersey.servlet.ServletContainer;
import com.codahale.metrics.servlets.MetricsServlet;
import com.codahale.metrics.servlets.ThreadDumpServlet;
import com.google.common.io.Closeables;
/**
* Starts, tracks and stops all the required services for a Drillbit daemon to work.
*/
public class Drillbit implements AutoCloseable {
// Class-wide SLF4J logger; also used by the nested ShutdownThread.
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
// Runs once when the class is initialized: hands a header message and the logger to
// ZooKeeper's Environment.logEnv() (presumably to dump the runtime environment to the log).
static {
Environment.logEnv("Drillbit environment:.", logger);
}
// Set to true as the last step of close(); close() is synchronized and returns immediately
// once this flag is set, so a second close() is a no-op.
private boolean isClosed = false;
/**
 * Starts a Drillbit using the configuration created from the location named in the
 * startup options. No remote service set is supplied.
 *
 * @param options startup options; only the config location is read here
 * @return the started Drillbit
 * @throws DrillbitStartupException if the Drillbit cannot be constructed or started
 */
public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
  final DrillConfig config = DrillConfig.create(options.getConfigLocation());
  return start(config, null);
}
/**
 * Starts a Drillbit from the given configuration without a remote service set.
 *
 * @param config the Drill configuration to start with
 * @return the started Drillbit
 * @throws DrillbitStartupException if the Drillbit cannot be constructed or started
 */
public static Drillbit start(final DrillConfig config) throws DrillbitStartupException {
  final RemoteServiceSet noServiceSet = null;
  return start(config, noServiceSet);
}
/**
 * Constructs a Drillbit and runs its startup sequence.
 *
 * <p>If {@link #run()} fails, the partially started Drillbit is closed before the failure is
 * reported. A runtime failure raised by that cleanup is attached to the startup failure as a
 * suppressed exception so that it cannot replace the real cause.</p>
 *
 * @param config the Drill configuration to start with
 * @param remoteServiceSet services to use instead of the defaults; may be null
 * @return the started Drillbit
 * @throws DrillbitStartupException if construction or startup fails; the underlying
 *   exception is available as the cause
 */
public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
    throws DrillbitStartupException {
  logger.debug("Setting up Drillbit.");
  Drillbit bit;
  try {
    bit = new Drillbit(config, remoteServiceSet);
  } catch (Exception ex) {
    throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
  }

  logger.debug("Starting Drillbit.");
  try {
    bit.run();
  } catch (Exception e) {
    // Clean up, but never let a cleanup failure hide the exception that aborted startup.
    try {
      bit.close();
    } catch (RuntimeException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
  }
  return bit;
}
// Name of the Java system property read by javaPropertiesToSystemOptions(); its value is
// split on commas into key=value entries, each of which is applied as a SYSTEM option.
private final static String SYSTEM_OPTIONS_NAME = "org.apache.drill.exec.server.Drillbit.system_options";
/**
 * Reports a malformed entry of the system options property. Always throws.
 *
 * @param systemProp the offending entry, quoted verbatim in the message
 * @param errorMessage description of what is wrong with the entry
 * @throws IllegalStateException always
 */
private static void throwInvalidSystemOption(final String systemProp, final String errorMessage) {
  final StringBuilder message = new StringBuilder("Property \"")
      .append(SYSTEM_OPTIONS_NAME)
      .append("\" part \"")
      .append(systemProp)
      .append("\" ")
      .append(errorMessage)
      .append(".");
  throw new IllegalStateException(message.toString());
}
/**
 * Removes one pair of matching surrounding quotes (single or double) from a value.
 *
 * <p>An empty or unquoted value is returned unchanged. A value that opens with a quote must
 * end with the same quote character, and a value that does not open with a quote must not
 * end with one.</p>
 *
 * @param s the (already trimmed) value to strip
 * @param systemProp the complete entry the value came from; used only in error messages
 * @return the value without its surrounding quotes
 * @throws IllegalStateException if the quotes are unbalanced
 */
private static String stripQuotes(final String s, final String systemProp) {
  if (s.isEmpty()) {
    return s;
  }

  final char cFirst = s.charAt(0);
  final char cLast = s.charAt(s.length() - 1);
  if ((cFirst == '"') || (cFirst == '\'')) {
    // A lone quote character is both first and last, so a length check is needed as well.
    if ((s.length() < 2) || (cLast != cFirst)) {
      throwInvalidSystemOption(systemProp, "quoted value does not have closing quote");
    }

    // substring()'s end index is exclusive: length - 1 drops exactly the closing quote.
    return s.substring(1, s.length() - 1);
  }

  if ((cLast == '"') || (cLast == '\'')) {
    throwInvalidSystemOption(systemProp, "value has unbalanced closing quote");
  }

  // return as-is
  return s;
}
/**
 * Applies the overrides given in the {@code SYSTEM_OPTIONS_NAME} Java system property to the
 * option manager.
 *
 * <p>The property value is split on commas into {@code key=value} entries. Each entry is
 * split at its first {@code '='}, so a value may itself contain {@code '='}. Keys and values
 * are trimmed, and the value may be wrapped in one pair of matching quotes. The key must name
 * an option the option manager already knows and whose type is SYSTEM; the new value is
 * created with the same kind as the existing one.</p>
 *
 * <p>Does nothing when the property is unset or empty.</p>
 *
 * @throws IllegalStateException if an entry is malformed, names an unknown option, or names
 *   an option that is not a SYSTEM option
 */
private void javaPropertiesToSystemOptions() {
  // get the system options property
  final String allSystemProps = System.getProperty(SYSTEM_OPTIONS_NAME);
  if ((allSystemProps == null) || allSystemProps.isEmpty()) {
    return;
  }

  final OptionManager optionManager = getContext().getOptionManager();

  // parse out the properties, validate, and then set them
  final String[] systemProps = allSystemProps.split(",");
  for (final String systemProp : systemProps) {
    // Limit 2: split only at the first '=', and keep a trailing empty value ("key=") so that
    // it is reported by the more specific "no value" check below.
    final String[] keyValue = systemProp.split("=", 2);
    if (keyValue.length != 2) {
      throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
    }

    final String optionName = keyValue[0].trim();
    if (optionName.isEmpty()) {
      throwInvalidSystemOption(systemProp, "does not contain a key before the assignment");
    }

    final String optionString = stripQuotes(keyValue[1].trim(), systemProp);
    if (optionString.isEmpty()) {
      throwInvalidSystemOption(systemProp, "does not contain a value after the assignment");
    }

    final OptionValue defaultValue = optionManager.getOption(optionName);
    if (defaultValue == null) {
      throwInvalidSystemOption(systemProp, "does not specify a valid option name");
    }
    if (defaultValue.type != OptionType.SYSTEM) {
      throwInvalidSystemOption(systemProp, "does not specify a SYSTEM option");
    }

    final OptionValue optionValue = OptionValue.createOption(
        defaultValue.kind, OptionType.SYSTEM, optionName, optionString);
    optionManager.setOption(optionValue);
  }
}
/**
 * Command-line entry point: parses the arguments into startup options and starts a Drillbit.
 *
 * @param cli the command-line arguments
 * @throws DrillbitStartupException if the Drillbit cannot be constructed or started
 */
public static void main(final String[] cli) throws DrillbitStartupException {
  start(StartupOptions.parse(cli));
}
// Cluster coordinator: taken from the RemoteServiceSet when one is supplied to the
// constructor, otherwise a new ZKClusterCoordinator.
private final ClusterCoordinator coord;
// Service engine built in the constructor from the WorkManager's handlers; started in run().
private final ServiceEngine engine;
// Persistent store provider; started in run() and closed in close().
private final PStoreProvider storeProvider;
// Work manager; also the source of the DrillbitContext returned by getContext().
private final WorkManager manager;
// Bootstrap context created from the DrillConfig; supplies the config and metrics used here.
private final BootStrapContext context;
// Embedded Jetty server, or null when ExecConstants.HTTP_ENABLE is false.
private final Server embeddedJetty;
// Handle returned by coord.register() in run(); null until then. close() uses it to unregister.
private RegistrationHandle registrationHandle;
/**
 * Creates the services that make up a Drillbit without starting any of them; call
 * {@link #run()} to start.
 *
 * <p>When a service set is supplied, its coordinator is used together with a caching local
 * store provider, and port hunting is allowed for the service engine. Otherwise a ZooKeeper
 * coordinator is created and the store provider is obtained from the PStoreRegistry. The
 * embedded Jetty server is created only when HTTP is enabled in the configuration.</p>
 *
 * @param config the Drill configuration
 * @param serviceSet services to use instead of the defaults; may be null
 * @throws Exception if any of the services cannot be created
 */
public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
  final boolean hasServiceSet = (serviceSet != null);
  final boolean httpEnabled = config.getBoolean(ExecConstants.HTTP_ENABLE);

  context = new BootStrapContext(config);
  manager = new WorkManager(context);
  engine = new ServiceEngine(
      manager.getControlMessageHandler(),
      manager.getUserWorker(),
      context,
      manager.getWorkBus(),
      manager.getDataHandler(),
      hasServiceSet);

  embeddedJetty = httpEnabled ? new Server(config.getInt(ExecConstants.HTTP_PORT)) : null;

  if (!hasServiceSet) {
    coord = new ZKClusterCoordinator(config);
    storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider();
  } else {
    coord = serviceSet.getCoordinator();
    storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config));
  }
}
/**
 * Configures and starts the embedded Jetty server; does nothing when HTTP is disabled
 * (that is, when no server was created).
 *
 * <p>Registers, under the root context: the Jersey REST application on {@code /*}, the
 * metrics servlet on {@code /status/metrics}, the thread dump servlet on
 * {@code /status/threads}, and a static resource servlet on {@code /static/*} that serves
 * the {@code /rest/static} classpath resource.</p>
 *
 * @throws Exception if the server fails to start
 */
private void startJetty() throws Exception {
  if (embeddedJetty == null) {
    return;
  }

  // Error pages show stack traces and put the message in the page title.
  final ErrorHandler jettyErrors = new ErrorHandler();
  jettyErrors.setShowStacks(true);
  jettyErrors.setShowMessageInTitle(true);

  final ServletContextHandler rootContext =
      new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
  rootContext.setErrorHandler(jettyErrors);
  rootContext.setContextPath("/");
  embeddedJetty.setHandler(rootContext);

  // REST application
  final ServletContainer restContainer = new ServletContainer(new DrillRestServer(manager));
  final ServletHolder restHolder = new ServletHolder(restContainer);
  // restHolder.setInitParameter(ServerProperties.PROVIDER_PACKAGES, "org.apache.drill.exec.server");
  restHolder.setInitOrder(1);
  rootContext.addServlet(restHolder, "/*");

  // status endpoints
  final ServletHolder metricsHolder = new ServletHolder(new MetricsServlet(context.getMetrics()));
  rootContext.addServlet(metricsHolder, "/status/metrics");
  final ServletHolder threadDumpHolder = new ServletHolder(new ThreadDumpServlet());
  rootContext.addServlet(threadDumpHolder, "/status/threads");

  // static content from the classpath, directory listings disabled
  final ServletHolder staticContent = new ServletHolder("static", DefaultServlet.class);
  final String staticBase = Resource.newClassPathResource("/rest/static").toString();
  staticContent.setInitParameter("resourceBase", staticBase);
  staticContent.setInitParameter("dirAllowed", "false");
  staticContent.setInitParameter("pathInfoOnly", "true");
  rootContext.addServlet(staticContent, "/static/*");

  embeddedJetty.start();
}
/**
 * Starts the services created by the constructor, in dependency order, then registers this
 * Drillbit with the cluster coordinator, starts the embedded web server (if any) and installs
 * a JVM shutdown hook that closes this Drillbit.
 *
 * @throws Exception if any startup step fails; steps already completed are not undone here
 */
public void run() throws Exception {
// NOTE(review): 10000 is presumably a startup timeout in milliseconds -- confirm against
// ClusterCoordinator.start().
coord.start(10000);
storeProvider.start();
// Starting the engine yields the endpoint that describes this Drillbit.
final DrillbitEndpoint md = engine.start();
manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider);
// The DrillbitContext is obtained only after the work manager has been started.
final DrillbitContext drillbitContext = manager.getContext();
drillbitContext.getStorage().init();
drillbitContext.getOptionManager().init();
// Apply overrides from the Java system property once the option manager is initialized.
javaPropertiesToSystemOptions();
// Keep the handle so that close() can unregister from the coordinator.
registrationHandle = coord.register(md);
startJetty();
// The StackTrace created here records where the Drillbit was started; ShutdownThread
// includes it in the error it raises if closing fails.
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this, new StackTrace()));
}
/**
 * Shuts the Drillbit down: waits for the work manager to exit, unregisters from the cluster
 * coordinator, pauses for twice the configured {@code ExecConstants.ZK_REFRESH} value, stops
 * the embedded web server and then closes the remaining services.
 *
 * <p>Calling this again after a completed shutdown is a no-op. If the calling thread is
 * interrupted during the pause, the rest of the shutdown still runs and the thread's
 * interrupt status is restored before this method returns.</p>
 */
@Override
public synchronized void close() {
  // avoid complaints about double closing
  if (isClosed) {
    return;
  }

  // wait for anything that is running to complete
  manager.waitToExit();

  if (coord != null && registrationHandle != null) {
    coord.unregister(registrationHandle);
  }

  // NOTE(review): the pause presumably gives the rest of the cluster time to notice the
  // deregistration before the services go away -- confirm.
  boolean interrupted = false;
  try {
    Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
  } catch (InterruptedException e) {
    logger.warn("Interrupted while sleeping during coordination deregistration.");
    // Remember the interrupt, but re-assert it only after cleanup so that the remaining
    // shutdown steps are not cut short by a pending interrupt.
    interrupted = true;
  }

  try {
    if (embeddedJetty != null) {
      try {
        embeddedJetty.stop();
      } catch (Exception e) {
        logger.warn("Failure while shutting down embedded jetty server.", e);
      }
    }

    Closeables.closeQuietly(engine);
    AutoCloseables.close(storeProvider, logger);
    Closeables.closeQuietly(coord);
    AutoCloseables.close(manager, logger);
    Closeables.closeQuietly(context);

    logger.info("Shutdown completed.");
    isClosed = true;
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * JVM shutdown hook for a Drillbit. Closes the Drillbit when run; if closing fails, the
 * failure is rethrown together with the stack trace of the place the Drillbit was started.
 */
private static class ShutdownThread extends Thread {
  // Source of the numeric suffix that gives every hook thread a distinct name.
  private static final AtomicInteger idCounter = new AtomicInteger(0);

  private final Drillbit target;
  private final StackTrace startedFrom;

  /**
   * Creates the hook and names the thread {@code Drillbit-ShutdownHook#<n>}.
   *
   * @param drillbit the drillbit to close down
   * @param stackTrace the stack trace from where the Drillbit was started;
   *   use new StackTrace() to generate this
   */
  public ShutdownThread(final Drillbit drillbit, final StackTrace stackTrace) {
    target = drillbit;
    startedFrom = stackTrace;

    /*
     * TODO should we try to determine a test class name?
     * See https://blogs.oracle.com/tor/entry/how_to_determine_the_junit
     */
    final int hookId = idCounter.getAndIncrement();
    setName("Drillbit-ShutdownHook#" + hookId);
  }

  /**
   * Closes the Drillbit, wrapping any failure in a RuntimeException whose message includes
   * the recorded start location.
   */
  @Override
  public void run() {
    logger.info("Received shutdown request.");
    try {
      target.close();
    } catch (Exception e) {
      final String message = "Caught exception closing Drillbit started from\n" + startedFrom;
      throw new RuntimeException(message, e);
    }
  }
}
/**
 * Returns the DrillbitContext held by the work manager.
 *
 * @return whatever {@code manager.getContext()} returns; run() only reads it after the work
 *   manager has been started
 */
public DrillbitContext getContext() {
return manager.getContext();
}
}