blob: 9fcfca9a83eaabf8af956a40983afad59c26200b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.master;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.function.Function;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.function.annotation.Description;
import org.apache.tajo.engine.function.annotation.ParamOptionTypes;
import org.apache.tajo.engine.function.annotation.ParamTypes;
import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet;
import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.util.ClassUtil;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.metrics.TajoSystemMetrics;
import org.apache.tajo.webapp.QueryExecutorServlet;
import org.apache.tajo.webapp.StaticHttpServer;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class TajoMaster extends CompositeService {
/** Group name passed to TajoSystemMetrics when the master's metrics are created. */
private static final String METRICS_GROUP_NAME = "tajomaster";
/** Class Logger */
private static final Log LOG = LogFactory.getLog(TajoMaster.class);
/** Priority used by main() when registering the master's shutdown hook. */
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
/** rwxr-xr-x (octal 0755) */
@SuppressWarnings("OctalInteger")
final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rwxr-xr-x (octal 0755) */
@SuppressWarnings("OctalInteger")
final public static FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rwxr-xr-x (octal 0755) */
final public static FsPermission SYSTEM_RESOURCE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rwxr-xr-x (octal 0755) */
@SuppressWarnings("OctalInteger")
final public static FsPermission WAREHOUSE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rwxr-xr-x (octal 0755) */
@SuppressWarnings("OctalInteger")
final public static FsPermission STAGING_ROOTDIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rwxr-xr-x (octal 0755); applied to the system configuration file written by writeSystemConf(). */
@SuppressWarnings("OctalInteger")
final public static FsPermission SYSTEM_CONF_FILE_PERMISSION = FsPermission.createImmutable((short) 0755);
// Context object created in init() and handed to the services constructed there.
private MasterContext context;
// Configuration passed to init(); also written out by writeSystemConf().
private TajoConf systemConf;
// File system resolved from the Tajo root path in checkAndInitializeSystemDirectories().
private FileSystem defaultFS;
private Clock clock;
private Path tajoRootPath;
private Path wareHousePath;
private CatalogServer catalogServer;
// Wrapper around catalogServer created in init().
private CatalogService catalog;
private AbstractStorageManager storeManager;
private GlobalEngine globalEngine;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
private TajoMasterService tajoMasterService;
private WorkerResourceManager resourceManager;
// Web server; stays null when initWebServer() skips creation (test mode).
private StaticHttpServer webServer;
private QueryJobManager queryJobManager;
// JVM thread bean used by dumpThread().
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
// Created in initSystemMetrics(), which start() calls last; null before that.
private TajoSystemMetrics systemMetrics;
/**
 * Creates an uninitialized master whose service name is the fully qualified class name.
 * No component is created here; that happens in {@link #init(Configuration)}.
 */
public TajoMaster() throws Exception {
super(TajoMaster.class.getName());
}
/**
 * Returns the master's name, derived from the bind address of the master service.
 * The master service is created in init(), so this must not be called before then.
 */
public String getMasterName() {
  return NetUtils.normalizeInetSocketAddress(this.tajoMasterService.getBindAddress());
}
/** Returns the Tajo version string defined in {@link TajoConstants#TAJO_VERSION}. */
public String getVersion() {
return TajoConstants.TAJO_VERSION;
}
/** Returns the client-facing service created in init(); null before initialization. */
public TajoMasterClientService getTajoMasterClientService() {
  return this.tajoMasterClientService;
}
/**
 * Creates the master's components and registers them as child services, then delegates to
 * {@code super.init}.
 *
 * NOTE(review): any exception raised while creating the components is only logged;
 * {@code super.init} still runs afterwards, so the master can end up initialized with some
 * components missing. Confirm this is intended before relying on it.
 *
 * @param _conf the configuration; it is cast to {@link TajoConf}, so any other type fails
 *              with a ClassCastException
 */
@Override
public void init(Configuration _conf) {
this.systemConf = (TajoConf) _conf;
// The context is created first because the services below receive it in their constructors.
context = new MasterContext(systemConf);
clock = new SystemClock();
try {
RackResolver.init(systemConf);
initResourceManager();
initWebServer();
this.dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
// check the system directory and create if they are not created.
checkAndInitializeSystemDirectories();
this.storeManager = StorageManagerFactory.getStorageManager(systemConf);
// The catalog server is seeded with the built-in function descriptors.
catalogServer = new CatalogServer(initBuiltinFunctions());
addIfService(catalogServer);
catalog = new LocalCatalogWrapper(catalogServer, systemConf);
globalEngine = new GlobalEngine(context);
addIfService(globalEngine);
queryJobManager = new QueryJobManager(context);
addIfService(queryJobManager);
tajoMasterClientService = new TajoMasterClientService(context);
addIfService(tajoMasterClientService);
tajoMasterService = new TajoMasterService(context);
addIfService(tajoMasterService);
} catch (Exception e) {
// Logged and not rethrown: initialization continues with whatever was set up so far.
LOG.error(e.getMessage(), e);
}
super.init(systemConf);
LOG.info("Tajo Master is initialized.");
}
/**
 * Creates and starts the system metrics for this master, then registers the worker
 * resource and catalog gauge sets under the "resource" and "catalog" names.
 */
private void initSystemMetrics() {
  String masterName = getMasterName();
  this.systemMetrics = new TajoSystemMetrics(systemConf, METRICS_GROUP_NAME, masterName);
  this.systemMetrics.start();

  this.systemMetrics.register("resource", new WorkerResourceMetricsGaugeSet(context));
  this.systemMetrics.register("catalog", new CatalogMetricsGaugeSet(context));
}
/**
 * Instantiates the worker resource manager and registers it as a child service.
 * The implementation class is read from the RESOURCE_MANAGER_CLASS setting, with
 * TajoWorkerResourceManager as the default; it must expose a public constructor that
 * takes a MasterContext.
 *
 * @throws Exception if the class has no such constructor or it cannot be invoked
 */
private void initResourceManager() throws Exception {
  Class<WorkerResourceManager> rmClass = (Class<WorkerResourceManager>) systemConf.getClass(
      ConfVars.RESOURCE_MANAGER_CLASS.varname, TajoWorkerResourceManager.class);
  resourceManager = rmClass.getConstructor(MasterContext.class).newInstance(context);
  addIfService(resourceManager);
}
/**
 * Starts the admin web server on the configured master info address and mounts the
 * query executor servlet at "/query_exec". Nothing is started when the TAJO_TEST
 * setting is "TRUE" (case-insensitive).
 *
 * @throws Exception if the web server cannot be created or started
 */
private void initWebServer() throws Exception {
  boolean testMode = systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE");
  if (testMode) {
    return;
  }

  InetSocketAddress infoAddress = systemConf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
  webServer = StaticHttpServer.getInstance(this, "admin", infoAddress.getHostName(), infoAddress.getPort(),
      true, null, context.getConf(), null);
  webServer.addServlet("queryServlet", "/query_exec", QueryExecutorServlet.class);
  webServer.start();
}
/**
 * Resolves the Tajo root, system, system resource, warehouse and staging directories from
 * the configuration and creates each one that does not exist yet, using the matching
 * permission constant of this class.
 *
 * As a side effect, the file system of the root directory is stored in {@code defaultFS}
 * and its URI is written into the configuration as the default file system.
 *
 * @throws IOException if a directory cannot be checked or created
 */
private void checkAndInitializeSystemDirectories() throws IOException {
  // Get Tajo root dir
  this.tajoRootPath = TajoConf.getTajoRootDir(systemConf);
  LOG.info("Tajo Root Directory: " + tajoRootPath);

  // Check and Create Tajo root dir
  this.defaultFS = tajoRootPath.getFileSystem(systemConf);
  systemConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
  LOG.info("FileSystem (" + this.defaultFS.getUri() + ") is initialized.");
  if (!defaultFS.exists(tajoRootPath)) {
    defaultFS.mkdirs(tajoRootPath, new FsPermission(TAJO_ROOT_DIR_PERMISSION));
    LOG.info("Tajo Root Directory '" + tajoRootPath + "' is created.");
  }

  // Check and Create system and system resource dir
  Path systemPath = TajoConf.getSystemDir(systemConf);
  if (!defaultFS.exists(systemPath)) {
    defaultFS.mkdirs(systemPath, new FsPermission(SYSTEM_DIR_PERMISSION));
    LOG.info("System dir '" + systemPath + "' is created");
  }
  Path systemResourcePath = TajoConf.getSystemResourceDir(systemConf);
  if (!defaultFS.exists(systemResourcePath)) {
    defaultFS.mkdirs(systemResourcePath, new FsPermission(SYSTEM_RESOURCE_DIR_PERMISSION));
    LOG.info("System resource dir '" + systemResourcePath + "' is created");
  }

  // Get Warehouse dir
  this.wareHousePath = TajoConf.getWarehouseDir(systemConf);
  LOG.info("Tajo Warehouse dir: " + wareHousePath);

  // Check and Create Warehouse dir
  if (!defaultFS.exists(wareHousePath)) {
    defaultFS.mkdirs(wareHousePath, new FsPermission(WAREHOUSE_DIR_PERMISSION));
    LOG.info("Warehouse dir '" + wareHousePath + "' is created");
  }

  // Check and Create staging dir
  Path stagingPath = TajoConf.getStagingDir(systemConf);
  // Report the staging path itself; this line previously printed the warehouse path.
  LOG.info("Staging dir: " + stagingPath);
  if (!defaultFS.exists(stagingPath)) {
    defaultFS.mkdirs(stagingPath, new FsPermission(STAGING_ROOTDIR_PERMISSION));
    LOG.info("Staging dir '" + stagingPath + "' is created");
  }
}
/**
 * Builds the catalog descriptors for the built-in functions.
 *
 * Every concrete class found under "org.apache.tajo.engine.function" for the Function base
 * type is instantiated through its no-argument constructor, and its {@link Description}
 * annotation supplies the name, synonyms, documentation, return type and parameter
 * signatures. One descriptor is produced per (name or synonym, parameter signature) pair.
 *
 * Classes that cannot be instantiated, or that carry no {@link Description} annotation,
 * are skipped with a warning instead of aborting the whole scan.
 *
 * @return the descriptors of all built-in functions that could be loaded
 * @throws ServiceException declared for callers; not thrown directly by this method
 */
@SuppressWarnings("unchecked")
public static List<FunctionDesc> initBuiltinFunctions() throws ServiceException {
  List<FunctionDesc> sqlFuncs = new ArrayList<FunctionDesc>();

  Set<Class> functionClasses = ClassUtil.findClasses(org.apache.tajo.catalog.function.Function.class,
      "org.apache.tajo.engine.function");

  for (Class eachClass : functionClasses) {
    if (eachClass.isInterface() || Modifier.isAbstract(eachClass.getModifiers())) {
      continue;
    }

    Function function = null;
    try {
      function = (Function) eachClass.newInstance();
    } catch (Exception e) {
      LOG.warn(eachClass + " cannot instantiate Function class because of " + e.getMessage());
      continue;
    }

    // Look the annotation up once; a class without it cannot be described and is skipped
    // rather than failing with a NullPointerException.
    Description annotation = function.getClass().getAnnotation(Description.class);
    if (annotation == null) {
      LOG.warn(eachClass + " is skipped because it has no @Description annotation");
      continue;
    }

    String functionName = annotation.functionName();
    String[] synonyms = annotation.synonyms();
    String description = annotation.description();
    String detail = annotation.detail();
    String example = annotation.example();
    Type returnType = annotation.returnType();
    ParamTypes[] paramArray = annotation.paramTypes();

    // The primary name comes first, followed by any synonyms.
    String[] allFunctionNames = null;
    if (synonyms != null && synonyms.length > 0) {
      allFunctionNames = new String[1 + synonyms.length];
      allFunctionNames[0] = functionName;
      System.arraycopy(synonyms, 0, allFunctionNames, 1, synonyms.length);
    } else {
      allFunctionNames = new String[]{functionName};
    }

    for (String eachFunctionName : allFunctionNames) {
      for (ParamTypes params : paramArray) {
        // NOTE(review): this lookup asks the runtime class of the returned array for a
        // ParamTypes annotation, which appears to always be absent, so the option list
        // looks like it is always empty. Left unchanged because fixing it would register
        // additional function signatures - confirm the intended behavior.
        ParamOptionTypes[] paramOptionArray;
        if (params.paramOptionTypes() == null ||
            params.paramOptionTypes().getClass().getAnnotation(ParamTypes.class) == null) {
          paramOptionArray = new ParamOptionTypes[0];
        } else {
          paramOptionArray = params.paramOptionTypes().getClass().getAnnotation(ParamTypes.class).paramOptionTypes();
        }

        Type[] paramTypes = params.paramTypes();
        if (paramOptionArray.length > 0) {
          paramTypes = params.paramTypes().clone();
        }

        // Iteration 0 registers the base signature; each further iteration registers the
        // signature extended by one more group of optional parameters.
        for (int i = 0; i < paramOptionArray.length + 1; i++) {
          FunctionDesc functionDesc = new FunctionDesc(eachFunctionName,
              function.getClass(), function.getFunctionType(),
              CatalogUtil.newSimpleDataType(returnType),
              paramTypes.length == 0 ? CatalogUtil.newSimpleDataTypeArray() : CatalogUtil.newSimpleDataTypeArray(paramTypes));

          functionDesc.setDescription(description);
          functionDesc.setExample(example);
          functionDesc.setDetail(detail);
          sqlFuncs.add(functionDesc);

          if (i != paramOptionArray.length) {
            // Append the next option group to the current signature. The previous code
            // sized both copies with the length of the freshly allocated array, which
            // overruns the source and the destination.
            int optionCount = paramOptionArray[i].paramOptionTypes().length;
            Type[] extended = new Type[paramTypes.length + optionCount];
            System.arraycopy(paramTypes, 0, extended, 0, paramTypes.length);
            System.arraycopy(paramOptionArray[i].paramOptionTypes(), 0, extended, paramTypes.length, optionCount);
            paramTypes = extended;
          }
        }
      }
    }
  }

  return sqlFuncs;
}
/** Returns the master context created in init(); null before initialization. */
public MasterContext getContext() {
  return context;
}
/**
 * Starts the master: delegates to {@code super.start()}, publishes the catalog server's
 * address in the system configuration, writes that configuration to the file system and
 * finally sets up the system metrics.
 *
 * A failure to write the configuration file is logged and does not stop the startup.
 */
@Override
public void start() {
LOG.info("TajoMaster startup");
super.start();
// Setting the system global configs
// The catalog address is taken from the catalog server's bind address, so it is set only
// after super.start() has run.
systemConf.setSocketAddr(ConfVars.CATALOG_ADDRESS.varname,
NetUtils.getConnectAddress(catalogServer.getBindAddress()));
try {
writeSystemConf();
} catch (IOException e) {
// Logged and not rethrown: startup continues without the stored configuration file.
LOG.error(e.getMessage(), e);
}
// Metrics are named after the master service's bind address (see getMasterName()).
initSystemMetrics();
}
/**
 * Stores the current system configuration as XML at the system configuration path.
 * The parent directory is created when missing, an existing file is replaced, and the
 * replication of the new file is set from SYSTEM_CONF_REPLICA_COUNT.
 *
 * @throws IOException if the file system operations or the write fail
 */
private void writeSystemConf() throws IOException {
  Path confFile = TajoConf.getSystemConfPath(systemConf);
  Path confDir = confFile.getParent();

  if (!defaultFS.exists(confDir)) {
    defaultFS.mkdirs(confDir);
  }

  // Remove a file left over from an earlier run before writing the new one.
  if (defaultFS.exists(confFile)) {
    defaultFS.delete(confFile, false);
  }

  FsPermission confPermission = new FsPermission(SYSTEM_CONF_FILE_PERMISSION);
  FSDataOutputStream confStream = FileSystem.create(defaultFS, confFile, confPermission);
  try {
    systemConf.writeXml(confStream);
  } finally {
    confStream.close();
  }

  short replication = (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT);
  defaultFS.setReplication(confFile, replication);
}
/**
 * Stops the master: the web server (when one was started), the catalog server, the system
 * metrics and the shared RPC channel factory are shut down before delegating to
 * {@code super.stop()}.
 *
 * A failure to stop the web server is logged and does not prevent the remaining shutdown
 * steps from running.
 */
@Override
public void stop() {
  if (webServer != null) {
    try {
      webServer.stop();
    } catch (Exception e) {
      // Pass the exception as the throwable argument so its stack trace is logged,
      // matching the other error logs in this class.
      LOG.error(e.getMessage(), e);
    }
  }

  IOUtils.cleanup(LOG, catalogServer);

  // Metrics only exist once start() has completed.
  if (systemMetrics != null) {
    systemMetrics.stop();
  }

  RpcChannelFactory.shutdown();

  super.stop();
  LOG.info("Tajo Master main thread exiting");
}
/** Returns the event handler of the master's dispatcher, which is created in init(). */
public EventHandler getEventHandler() {
  return this.dispatcher.getEventHandler();
}
/** Returns true while this service is in the STARTED state. */
public boolean isMasterRunning() {
  return STATE.STARTED == getServiceState();
}
/** Returns the catalog service wrapper created in init(); null before initialization. */
public CatalogService getCatalog() {
  return catalog;
}
/** Returns the catalog server created in init(); null before initialization. */
public CatalogServer getCatalogServer() {
  return catalogServer;
}
/** Returns the storage manager obtained in init(); null before initialization. */
public AbstractStorageManager getStorageManager() {
  return storeManager;
}
/**
 * Accessor object through which the master's services reach its shared components.
 *
 * Apart from the configuration, every getter reads the corresponding field of the
 * enclosing TajoMaster at call time, so a component is visible here only once the master
 * has assigned it; before that the getter returns null.
 */
public class MasterContext {
  private final TajoConf conf;

  /**
   * @param conf the configuration returned by {@link #getConf()}
   */
  public MasterContext(TajoConf conf) {
    this.conf = conf;
  }

  /** Returns the configuration given at construction. */
  public TajoConf getConf() {
    return this.conf;
  }

  /** Returns the master's clock. */
  public Clock getClock() {
    return TajoMaster.this.clock;
  }

  /** Returns the master's query job manager. */
  public QueryJobManager getQueryJobManager() {
    return TajoMaster.this.queryJobManager;
  }

  /** Returns the master's worker resource manager. */
  public WorkerResourceManager getResourceManager() {
    return TajoMaster.this.resourceManager;
  }

  /** Returns the event handler of the master's dispatcher. */
  public EventHandler getEventHandler() {
    return TajoMaster.this.dispatcher.getEventHandler();
  }

  /** Returns the master's catalog service. */
  public CatalogService getCatalog() {
    return TajoMaster.this.catalog;
  }

  /** Returns the master's global engine. */
  public GlobalEngine getGlobalEngine() {
    return TajoMaster.this.globalEngine;
  }

  /** Returns the master's storage manager. */
  public AbstractStorageManager getStorageManager() {
    return TajoMaster.this.storeManager;
  }

  /** Returns the master service. */
  public TajoMasterService getTajoMasterService() {
    return TajoMaster.this.tajoMasterService;
  }

  /** Returns the master's system metrics. */
  public TajoSystemMetrics getSystemMetrics() {
    return TajoMaster.this.systemMetrics;
  }
}
/**
 * Formats a thread for the thread dump.
 *
 * @param id   the thread id
 * @param name the thread name, may be null
 * @return the id alone when there is no name, otherwise the id followed by the name in
 *         parentheses
 */
String getThreadTaskName(long id, String name) {
  return name == null ? Long.toString(id) : id + " (" + name + ")";
}
/**
 * Writes a dump of all live JVM threads to the given writer.
 *
 * For each thread the dump contains its id and name, state, blocked and waited counts,
 * the lock it waits on or is blocked by (when applicable) and up to 20 stack frames.
 * Blocked and waited times are included only when thread contention monitoring is
 * enabled. The writer is flushed at the end but not closed.
 *
 * @param writer the destination of the dump
 */
public void dumpThread(Writer writer) {
  PrintWriter stream = new PrintWriter(writer);
  final int stackDepth = 20;
  boolean contention = threadBean.isThreadContentionMonitoringEnabled();
  long[] threadIds = threadBean.getAllThreadIds();
  // This is the master process; the header previously said "Tajo Worker".
  stream.println("Process Thread Dump: Tajo Master");
  stream.println(threadIds.length + " active threads");
  for (long tid : threadIds) {
    ThreadInfo info = threadBean.getThreadInfo(tid, stackDepth);
    // No info is returned for a thread that ended after the ids were collected.
    if (info == null) {
      stream.println(" Inactive");
      continue;
    }
    stream.println("Thread " + getThreadTaskName(info.getThreadId(), info.getThreadName()) + ":");
    Thread.State state = info.getThreadState();
    stream.println(" State: " + state + ", Blocked count: " + info.getBlockedCount() +
        ", Waited count: " + info.getWaitedCount());
    if (contention) {
      stream.println(" Blocked time: " + info.getBlockedTime() + ", Waited time: " + info.getWaitedTime());
    }
    if (state == Thread.State.WAITING) {
      stream.println(" Waiting on " + info.getLockName());
    } else if (state == Thread.State.BLOCKED) {
      stream.println(" Blocked on " + info.getLockName() +
          ", Blocked by " + getThreadTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
    }
    stream.println(" Stack:");
    for (StackTraceElement frame : info.getStackTrace()) {
      stream.println(" " + frame.toString());
    }
    stream.println("");
  }
  // Push the dump through to the caller's writer; the writer itself stays open.
  stream.flush();
}
/**
 * Lists the mount points reported by the system's {@code mount} command.
 *
 * Each output line is expected to contain {@code " on /<path>"}; the mount point is the
 * text from that slash up to the next space, or to the end of the line when no space
 * follows. Lines without that marker are ignored.
 *
 * @return the mount points in the order the command printed them
 * @throws Exception if the command cannot be started or its output cannot be read
 */
public static List<File> getMountPath() throws Exception {
  final String marker = " on /";
  BufferedReader mountOutput = null;
  try {
    Process mountProcess = Runtime.getRuntime().exec("mount");
    mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
    List<File> mountPaths = new ArrayList<File>();
    String line;
    while ((line = mountOutput.readLine()) != null) {
      int markerIndex = line.indexOf(marker);
      if (markerIndex < 0) {
        // Not a "<device> on <path> ..." line; nothing to extract.
        continue;
      }
      // The path starts at the slash that ends the marker.
      int pathStart = markerIndex + marker.length() - 1;
      int pathEnd = line.indexOf(' ', pathStart);
      if (pathEnd < 0) {
        // The mount point is the last token on the line.
        pathEnd = line.length();
      }
      mountPaths.add(new File(line.substring(pathStart, pathEnd)));
    }
    return mountPaths;
  } catch (Exception e) {
    // Report through the class logger, then let the caller handle the failure.
    LOG.error(e.getMessage(), e);
    throw e;
  } finally {
    if (mountOutput != null) {
      mountOutput.close();
    }
  }
}
/**
 * Entry point of the master process. Logs the startup banner, creates the master,
 * registers its shutdown hook, then initializes and starts it with a configuration built
 * on top of a YarnConfiguration. Any failure is logged as fatal and ends the process with
 * exit code -1.
 *
 * @param args the command-line arguments, used only for the startup banner
 */
public static void main(String[] args) throws Exception {
  StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);

  try {
    final TajoMaster tajoMaster = new TajoMaster();

    // Register the hook before init/start so a later exit still stops the master.
    ShutdownHookManager hookManager = ShutdownHookManager.get();
    hookManager.addShutdownHook(new CompositeServiceShutdownHook(tajoMaster), SHUTDOWN_HOOK_PRIORITY);

    tajoMaster.init(new TajoConf(new YarnConfiguration()));
    tajoMaster.start();
  } catch (Throwable failure) {
    LOG.fatal("Error starting TajoMaster", failure);
    System.exit(-1);
  }
}
}