blob: 2f046a1f0cb3241891d749a1049017db88e3b1d7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.master;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tajo.catalog.CatalogServer;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.LocalCatalogWrapper;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
import org.apache.tajo.catalog.store.AbstractDBStore;
import org.apache.tajo.catalog.store.DerbyStore;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.function.FunctionLoader;
import org.apache.tajo.engine.function.hiveudf.HiveFunctionLoader;
import org.apache.tajo.exception.*;
import org.apache.tajo.io.AsyncTaskService;
import org.apache.tajo.master.rm.TajoResourceManager;
import org.apache.tajo.metrics.ClusterResourceMetricSet;
import org.apache.tajo.metrics.Master;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rule.EvaluationContext;
import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.session.SessionManager;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.*;
import org.apache.tajo.util.history.HistoryReader;
import org.apache.tajo.util.history.HistoryWriter;
import org.apache.tajo.util.metrics.TajoSystemMetrics;
import org.apache.tajo.webapp.QueryExecutorServlet;
import org.apache.tajo.webapp.StaticHttpServer;
import org.apache.tajo.ws.rs.TajoRestService;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.util.*;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
public class TajoMaster extends CompositeService {
/** Class Logger */
private static final Log LOG = LogFactory.getLog(TajoMaster.class);
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
final public static FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
final public static FsPermission SYSTEM_RESOURCE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
final public static FsPermission WAREHOUSE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
final public static FsPermission STAGING_ROOTDIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
final public static FsPermission SYSTEM_CONF_FILE_PERMISSION = FsPermission.createImmutable((short) 0755);
private MasterContext context;
private TajoConf systemConf;
private FileSystem defaultFS;
private Clock clock;
private CatalogServer catalogServer;
private CatalogService catalog;
private GlobalEngine globalEngine;
private AsyncTaskService asyncTaskService;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
private QueryCoordinatorService tajoMasterService;
private SessionManager sessionManager;
private TajoResourceManager resourceManager;
//Web Server
private StaticHttpServer webServer;
private TajoRestService restServer;
private QueryManager queryManager;
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
private TajoSystemMetrics systemMetrics;
private ServiceTracker haService;
private JvmPauseMonitor pauseMonitor;
private HistoryWriter historyWriter;
private HistoryReader historyReader;
private static final long CLUSTER_STARTUP_TIME = System.currentTimeMillis();
public TajoMaster() throws Exception {
super(TajoMaster.class.getName());
}
public String getMasterName() {
return NetUtils.normalizeInetSocketAddress(tajoMasterService.getBindAddress());
}
public String getVersion() {
return VersionInfo.getDisplayVersion();
}
public TajoMasterClientService getTajoMasterClientService() {
return tajoMasterClientService;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY);
context = new MasterContext(systemConf);
clock = new SystemClock();
RackResolver.init(systemConf);
initResourceManager();
this.dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
// check the system directory and create if they are not created.
checkAndInitializeSystemDirectories();
diagnoseTajoMaster();
catalogServer = new CatalogServer(TablespaceManager.getMetadataProviders(), loadFunctions());
addIfService(catalogServer);
catalog = new LocalCatalogWrapper(catalogServer, systemConf);
sessionManager = new SessionManager(dispatcher);
addIfService(sessionManager);
globalEngine = new GlobalEngine(context);
addIfService(globalEngine);
queryManager = new QueryManager(context);
addIfService(queryManager);
tajoMasterClientService = new TajoMasterClientService(context);
addIfService(tajoMasterClientService);
asyncTaskService = new AsyncTaskService(context);
addIfService(asyncTaskService);
tajoMasterService = new QueryCoordinatorService(context);
addIfService(tajoMasterService);
restServer = new TajoRestService(context);
addIfService(restServer);
PythonScriptEngine.initPythonScriptEngineFiles();
// Try to start up all services in TajoMaster.
// If anyone is failed, the master prints out the errors and immediately should shutdowns
try {
super.serviceInit(systemConf);
} catch (Throwable t) {
t.printStackTrace();
Runtime.getRuntime().halt(-1);
}
LOG.info("Tajo Master is initialized.");
}
private Collection<FunctionDesc> loadFunctions() throws IOException, AmbiguousFunctionException {
List<FunctionDesc> functionList = new ArrayList<>(FunctionLoader.loadBuiltinFunctions().values());
HashMap<Integer, FunctionDesc> funcSet = new HashMap<>();
for (FunctionDesc desc: functionList) {
funcSet.put(desc.hashCodeWithoutType(), desc);
}
checkUDFduplicateAndMerge(FunctionLoader.loadUserDefinedFunctions(systemConf).orElse(Collections.emptyList()), funcSet, functionList);
checkUDFduplicateAndMerge(HiveFunctionLoader.loadHiveUDFs(systemConf).orElse(Collections.emptyList()), funcSet, functionList);
return functionList;
}
/**
* Checks duplicates between pre-loaded functions and UDFs. And they are meged to funcList.
*
* @param udfs UDF list
* @param funcSet set for pre-loaded functions to match signature
* @param funcList list to be merged
* @throws AmbiguousFunctionException
*/
private static void checkUDFduplicateAndMerge(List<FunctionDesc> udfs, HashMap<Integer, FunctionDesc> funcSet, List<FunctionDesc> funcList)
throws AmbiguousFunctionException {
for (FunctionDesc desc: udfs) {
// check
if (funcSet.containsKey(desc.hashCodeWithoutType())) {
throw new AmbiguousFunctionException(String.format("UDF %s", desc.toString()));
}
// merge
funcSet.put(desc.hashCodeWithoutType(), desc);
funcList.add(desc);
}
}
private void initSystemMetrics() {
systemMetrics = new TajoSystemMetrics(systemConf, Master.class, getMasterName());
systemMetrics.start();
systemMetrics.register(Master.Cluster.UPTIME, () -> context.getClusterUptime());
systemMetrics.register(Master.Cluster.class, new ClusterResourceMetricSet(context));
}
private void initResourceManager() throws Exception {
resourceManager = new TajoResourceManager(context);
addIfService(resourceManager);
}
private void initWebServer() throws Exception {
if (!systemConf.getBoolVar(ConfVars.$TEST_MODE)) {
InetSocketAddress address = systemConf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
webServer = StaticHttpServer.getInstance(this ,"admin", address.getHostName(), address.getPort(),
true, null, context.getConf(), null);
webServer.addServlet("queryServlet", "/query_exec", QueryExecutorServlet.class);
webServer.start();
}
}
private void checkAndInitializeSystemDirectories() throws IOException {
// Get Tajo root dir
Path tajoRootPath = TajoConf.getTajoRootDir(systemConf);
LOG.info("Tajo Root Directory: " + tajoRootPath);
// Check and Create Tajo root dir
this.defaultFS = tajoRootPath.getFileSystem(systemConf);
systemConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
LOG.info("FileSystem (" + this.defaultFS.getUri() + ") is initialized.");
if (!defaultFS.exists(tajoRootPath)) {
defaultFS.mkdirs(tajoRootPath, new FsPermission(TAJO_ROOT_DIR_PERMISSION));
LOG.info("Tajo Root Directory '" + tajoRootPath + "' is created.");
}
// Check and Create system and system resource dir
Path systemPath = TajoConf.getSystemDir(systemConf);
if (!defaultFS.exists(systemPath)) {
defaultFS.mkdirs(systemPath, new FsPermission(SYSTEM_DIR_PERMISSION));
LOG.info("System dir '" + systemPath + "' is created");
}
Path systemResourcePath = TajoConf.getSystemResourceDir(systemConf);
if (!defaultFS.exists(systemResourcePath)) {
defaultFS.mkdirs(systemResourcePath, new FsPermission(SYSTEM_RESOURCE_DIR_PERMISSION));
LOG.info("System resource dir '" + systemResourcePath + "' is created");
}
// Get Warehouse dir
Path wareHousePath = TajoConf.getWarehouseDir(systemConf);
LOG.info("Tajo Warehouse dir: " + wareHousePath);
// Check and Create Warehouse dir
if (!defaultFS.exists(wareHousePath)) {
defaultFS.mkdirs(wareHousePath, new FsPermission(WAREHOUSE_DIR_PERMISSION));
LOG.info("Warehouse dir '" + wareHousePath + "' is created");
}
Path stagingPath = TajoConf.getDefaultRootStagingDir(systemConf);
LOG.info("Staging dir: " + wareHousePath);
if (!defaultFS.exists(stagingPath)) {
defaultFS.mkdirs(stagingPath, new FsPermission(STAGING_ROOTDIR_PERMISSION));
LOG.info("Staging dir '" + stagingPath + "' is created");
}
}
private void diagnoseTajoMaster() throws EvaluationFailedException {
SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance();
SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession();
EvaluationContext context = new EvaluationContext();
context.addParameter(TajoConf.class.getName(), systemConf);
ruleSession.withCategoryNames("base", "master").fireRules(context);
}
private void startJvmPauseMonitor(){
pauseMonitor = new JvmPauseMonitor(systemConf);
pauseMonitor.start();
}
public MasterContext getContext() {
return this.context;
}
@Override
public void serviceStart() throws Exception {
LOG.info("TajoMaster is starting up");
startJvmPauseMonitor();
// check base tablespace and databases
checkBaseTBSpaceAndDatabase();
super.serviceStart();
initSystemMetrics();
// Setting the system global configs
systemConf.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.getHostPortString(catalogServer.getBindAddress()));
try {
writeSystemConf();
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
initWebServer();
haService = ServiceTrackerFactory.get(systemConf);
haService.register();
historyWriter = new HistoryWriter(getMasterName(), true);
historyWriter.init(getConfig());
addIfService(historyWriter);
historyWriter.start();
historyReader = new HistoryReader(getMasterName(), context.getConf());
}
private void writeSystemConf() throws IOException {
// Storing the system configs
Path systemConfPath = TajoConf.getSystemConfPath(systemConf);
if (!defaultFS.exists(systemConfPath.getParent())) {
defaultFS.mkdirs(systemConfPath.getParent());
}
if (defaultFS.exists(systemConfPath)) {
defaultFS.delete(systemConfPath, false);
}
// In TajoMaster HA, some master might see LeaseExpiredException because of lease mismatch. Thus,
// we need to create below xml file at HdfsServiceTracker::writeSystemConf.
if (!systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try (FSDataOutputStream out = FileSystem.create(defaultFS, systemConfPath,
new FsPermission(SYSTEM_CONF_FILE_PERMISSION))) {
systemConf.writeXml(out);
}
defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
}
}
private void checkBaseTBSpaceAndDatabase()
throws IOException, DuplicateDatabaseException, DuplicateTablespaceException {
if (catalog.existTablespace(DEFAULT_TABLESPACE_NAME)) { // if default tablespace already exists
CatalogProtos.TablespaceProto tablespace = null;
try {
tablespace = catalog.getTablespace(DEFAULT_TABLESPACE_NAME);
} catch (UndefinedTablespaceException e) {
throw new TajoInternalError(e);
}
// if warehouse directory and the location of default tablespace are different from each other
if (!tablespace.getUri().equals(context.getConf().getVar(ConfVars.WAREHOUSE_DIR))) {
AlterTablespaceCommand.Builder alterCommand =
AlterTablespaceCommand.newBuilder()
.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION)
.setLocation(context.getConf().getVar(ConfVars.WAREHOUSE_DIR));
AlterTablespaceProto alterTablespace = AlterTablespaceProto.newBuilder()
.setSpaceName(DEFAULT_TABLESPACE_NAME)
.addCommand(alterCommand).build();
// update the location of default tablespace
try {
catalog.alterTablespace(alterTablespace);
} catch (TajoException e) {
throw new TajoInternalError(e);
}
LOG.warn(
"The location of default tablespace has been changed. " +
"You may not accept existing managed tables stored in the previous default tablespace");
}
} else if (!catalog.existTablespace(DEFAULT_TABLESPACE_NAME)) { // if the default tablespace does not exists
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, context.getConf().getVar(ConfVars.WAREHOUSE_DIR));
} else {
LOG.info(String.format("Default tablespace (%s) is already prepared.", DEFAULT_TABLESPACE_NAME));
}
if (!catalog.existDatabase(DEFAULT_DATABASE_NAME)) {
globalEngine.getDDLExecutor().createDatabase(null, DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME, false);
} else {
LOG.info(String.format("Default database (%s) is already prepared.", DEFAULT_DATABASE_NAME));
}
}
@Override
public void serviceStop() throws Exception {
if (haService != null) haService.delete();
if (restServer != null) restServer.stop();
if (webServer != null) webServer.stop();
if (systemMetrics != null) systemMetrics.stop();
if (pauseMonitor != null) pauseMonitor.stop();
super.serviceStop();
LOG.info("Tajo Master main thread exiting");
}
/**
* This is only for unit tests.
*/
@VisibleForTesting
public void refresh() {
catalogServer.refresh(TablespaceManager.getMetadataProviders());
}
public EventHandler getEventHandler() {
return dispatcher.getEventHandler();
}
public boolean isMasterRunning() {
return getServiceState() == STATE.STARTED;
}
public CatalogService getCatalog() {
return this.catalog;
}
public CatalogServer getCatalogServer() {
return this.catalogServer;
}
public class MasterContext {
private final TajoConf conf;
public MasterContext(TajoConf conf) {
this.conf = conf;
}
public TajoConf getConf() {
return conf;
}
public Clock getClock() {
return clock;
}
public long getClusterUptime() {
return getClock().getTime() - CLUSTER_STARTUP_TIME;
}
public QueryManager getQueryJobManager() {
return queryManager;
}
public TajoResourceManager getResourceManager() {
return resourceManager;
}
public EventHandler getEventHandler() {
return dispatcher.getEventHandler();
}
public CatalogService getCatalog() {
return catalog;
}
public SessionManager getSessionManager() {
return sessionManager;
}
public GlobalEngine getGlobalEngine() {
return globalEngine;
}
public AsyncTaskService asyncTaskExecutor() {
return asyncTaskService;
}
public QueryCoordinatorService getTajoMasterService() {
return tajoMasterService;
}
public TajoSystemMetrics getMetrics() {
return systemMetrics;
}
public ServiceTracker getHAService() {
return haService;
}
public HistoryWriter getHistoryWriter() {
return historyWriter;
}
public HistoryReader getHistoryReader() {
return historyReader;
}
public TajoRestService getRestServer() {
return restServer;
}
}
private String getThreadTaskName(long id, String name) {
if (name == null) {
return Long.toString(id);
}
return id + " (" + name + ")";
}
public void dumpThread(Writer writer) {
PrintWriter stream = new PrintWriter(writer);
int STACK_DEPTH = 20;
boolean contention = threadBean.isThreadContentionMonitoringEnabled();
long[] threadIds = threadBean.getAllThreadIds();
stream.println("Process Thread Dump: Tajo Worker");
stream.println(threadIds.length + " active threads");
for (long tid : threadIds) {
ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
if (info == null) {
stream.println(" Inactive");
continue;
}
stream.println("Thread " + getThreadTaskName(info.getThreadId(), info.getThreadName()) + ":");
Thread.State state = info.getThreadState();
stream.println(" State: " + state + ", Blocked count: " + info.getBlockedCount() +
", Waited count: " + info.getWaitedCount());
if (contention) {
stream.println(" Blocked time: " + info.getBlockedTime() + ", Waited time: " + info.getWaitedTime());
}
if (state == Thread.State.WAITING) {
stream.println(" Waiting on " + info.getLockName());
} else if (state == Thread.State.BLOCKED) {
stream.println(" Blocked on " + info.getLockName() +
", Blocked by " + getThreadTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
}
stream.println(" Stack:");
for (StackTraceElement frame : info.getStackTrace()) {
stream.println(" " + frame.toString());
}
stream.println("");
}
}
private class ShutdownHook implements Runnable {
@Override
public void run() {
if(!isInState(STATE.STOPPED)) {
LOG.info("============================================");
LOG.info("TajoMaster received SIGINT Signal");
LOG.info("============================================");
stop();
// If embedded derby is used as catalog, shutdown it.
if (catalogServer != null &&
catalogServer.getStoreClassName().equals("org.apache.tajo.catalog.store.DerbyStore")
&& AbstractDBStore.needShutdown(catalogServer.getStoreUri())) {
DerbyStore.shutdown();
}
RpcClientManager.shutdown();
}
}
}
public static void main(String[] args) throws Exception {
Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
try {
TajoMaster master = new TajoMaster();
TajoConf conf = new TajoConf();
master.init(conf);
master.start();
} catch (Throwable t) {
LOG.fatal("Error starting TajoMaster", t);
System.exit(-1);
}
}
}