blob: f81271521d7e54443da60a3462484f74459e12f6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.master.querymaster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tajo.*;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.LogicalOptimizer;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.GlobalEngine;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.util.metrics.TajoMetrics;
import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.tajo.TajoProtos.QueryState;
public class QueryMasterTask extends CompositeService {
// Class-wide logger.
private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
// query submission directory is private!
// Owner-only permission (0700) applied to a query's staging directory by the static initStagingDir().
final public static FsPermission STAGING_DIR_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx--------
// Identity, session and context of the query; all handed in through the constructor.
private QueryId queryId;
private Session session;
private QueryContext queryContext;
// Created in init(); inner-class view of this task's state.
private QueryMasterTaskContext queryTaskContext;
private QueryMaster.QueryMasterContext queryMasterContext;
// Created by startQuery(); stays null until then (getState() relies on this).
private Query query;
private MasterPlan masterPlan;
// JSON text that startQuery() deserializes into an algebra Expr.
private String jsonExpr;
// Stored by the constructor; not read anywhere else in this class.
private String logicalPlanJson;
// Created and registered as a child service in init().
private TajoAsyncDispatcher dispatcher;
// System.currentTimeMillis() captured when this task object was constructed.
private final long querySubmitTime;
// Table descriptors of the plan's scan nodes, keyed by the scan node's canonical name; filled by startQuery().
private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
// The Configuration passed to init(), cast to TajoConf.
private TajoConf systemConf;
// Millisecond timestamp of the last touchSessionTime() call; -1 until the first call.
private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
private AbstractResourceAllocator resourceAllocator;
// Flipped to true by the first stop() call so that later calls return immediately.
private AtomicBoolean stopped = new AtomicBoolean(false);
// Created near the end of init(); remains null if init() failed earlier.
private TajoMetrics queryMetrics;
// Throwable captured by init() or startQuery(); non-null means isInitError() is true.
private Throwable initError;
// Fatal task error reports; handleTaskFailed() keeps at most 10 and synchronizes on this list.
private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
/**
 * Creates the task for a single query and records the submission time.
 * Nothing is initialized beyond storing the arguments; see {@code init(Configuration)}.
 *
 * @param queryMasterContext context of the owning query master
 * @param queryId            identifier of the query
 * @param session            session the query belongs to
 * @param queryContext       per-query context (staging dir, output path, ...)
 * @param jsonExpr           JSON text that {@code startQuery()} parses into an algebra expression
 * @param logicalPlanJson    JSON text of the logical plan; only stored by this class
 */
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
                       QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
                       String logicalPlanJson) {
  super(QueryMasterTask.class.getName());

  // The submission time is the moment this object is constructed.
  this.querySubmitTime = System.currentTimeMillis();

  // Query identity and contexts.
  this.queryId = queryId;
  this.session = session;
  this.queryContext = queryContext;
  this.queryMasterContext = queryMasterContext;

  // Serialized forms of the query.
  this.jsonExpr = jsonExpr;
  this.logicalPlanJson = logicalPlanJson;
}
/**
 * Wires up the task: task context, resource allocator, event dispatcher with its
 * handlers, staging directory and metrics, and finally the parent service.
 *
 * <p>Failures inside the try block are not propagated. Any {@link Throwable} is logged
 * and remembered so that {@code isInitError()} reports it. The cast of {@code conf}
 * to {@code TajoConf} happens before the try block and is therefore not captured.
 *
 * @param conf configuration; must be a {@code TajoConf}
 */
@Override
public void init(Configuration conf) {
  systemConf = (TajoConf) conf;

  try {
    queryTaskContext = new QueryMasterTaskContext();

    // Only the Tajo worker resource manager is supported.
    final String rmClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
    final boolean tajoWorkerRm = rmClassName.contains(TajoWorkerResourceManager.class.getName());
    if (!tajoWorkerRm) {
      throw new UnimplementedException(rmClassName + " is not supported yet");
    }
    resourceAllocator = new TajoResourceAllocator(queryTaskContext);
    addService(resourceAllocator);

    // One dispatcher per query, named after the query id.
    dispatcher = new TajoAsyncDispatcher(queryId.toString());
    addService(dispatcher);

    // Route every event family to its handler.
    dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
    dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
    dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
    dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
    dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());

    initStagingDir();

    queryMetrics = new TajoMetrics(queryId.toString());

    super.init(systemConf);
  } catch (Throwable failure) {
    LOG.error(failure.getMessage(), failure);
    initError = failure;
  }
}
/**
 * @return {@code true} once {@code stop()} has been entered at least once
 */
public boolean isStopped() {
  final boolean alreadyStopped = this.stopped.get();
  return alreadyStopped;
}
/**
 * Plans and launches the query through {@code startQuery()} and then delegates to the
 * parent service's {@code start()}.
 */
@Override
public void start() {
  // The query is planned first; startQuery() records its own failures in initError.
  this.startQuery();
  super.start();
}
/**
 * Stops this task at most once: notifies the TajoMaster that the query master for this
 * query is stopping, stops the parent service, and reports the query metrics.
 *
 * <p>The notification is best effort. Failures are logged and never prevent the rest
 * of the shutdown. The method waits up to 3 seconds for the master's answer, but only
 * when the request was actually sent.
 */
@Override
public void stop() {
  // Only the first caller performs the shutdown; later calls are no-ops.
  if (stopped.getAndSet(true)) {
    return;
  }

  LOG.info("Stopping QueryMasterTask:" + queryId);

  CallFuture future = new CallFuture();
  boolean requestSent = false;

  RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
  NettyClientBase tmClient = null;
  try {
    tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
        TajoMasterProtocol.class, true);
    TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
    masterClientService.stopQueryMaster(null, queryId.getProto(), future);
    requestSent = true;
  } catch (Exception e) {
    LOG.error(e.getMessage(), e);
  } finally {
    // getConnection() may have failed, in which case there is nothing to release.
    if (tmClient != null) {
      connPool.releaseConnection(tmClient);
    }
  }

  // Waiting on a future whose request was never sent would only burn the full timeout.
  if (requestSent) {
    try {
      future.get(3, TimeUnit.SECONDS);
    } catch (Throwable t) {
      LOG.warn("Failed to confirm stopQueryMaster for " + queryId, t);
    }
  }

  super.stop();

  //TODO change report to tajo master
  // queryMetrics is still null when init() failed before creating it.
  if (queryMetrics != null) {
    queryMetrics.report(new MetricsConsoleReporter());
  }

  LOG.info("Stopped QueryMasterTask:" + queryId);
}
/**
 * Forwards a task request to the sub-query that owns the event's execution block.
 *
 * @param event the task request to deliver
 */
public void handleTaskRequestEvent(TaskRequestEvent event) {
  final SubQuery owner = query.getSubQuery(event.getExecutionBlockId());
  owner.handleTaskRequestEvent(event);
}
/**
 * Records a fatal task error (only the first 10 reports are kept) and publishes a
 * {@code TaskFatalErrorEvent} for it on this task's event handler.
 *
 * @param report the fatal error report received for a task
 */
public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
  synchronized (diagnostics) {
    final boolean hasRoom = diagnostics.size() < 10;
    if (hasRoom) {
      diagnostics.add(report);
    }
  }

  // The event is published regardless of whether the report was retained above.
  final EventHandler handler = getEventHandler();
  handler.handle(new TaskFatalErrorEvent(report));
}
/**
 * Returns the fatal task error reports collected so far.
 *
 * <p>The result is an unmodifiable snapshot taken while holding the lock on the
 * diagnostics list. Callers can iterate it safely even while
 * {@code handleTaskFailed} keeps adding reports. Returning a live view would expose
 * the iteration to concurrent modification, because the view is read after the lock
 * is released.
 *
 * @return an unmodifiable copy of the collected reports; never {@code null}
 */
public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
  synchronized (diagnostics) {
    return Collections.unmodifiableCollection(
        new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(diagnostics));
  }
}
private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
public void handle(SubQueryEvent event) {
ExecutionBlockId id = event.getSubQueryId();
if(LOG.isDebugEnabled()) {
LOG.debug("SubQueryEventDispatcher:" + id + "," + event.getType());
}
query.getSubQuery(id).handle(event);
}
}
private class TaskEventDispatcher
implements EventHandler<TaskEvent> {
public void handle(TaskEvent event) {
QueryUnitId taskId = event.getTaskId();
if(LOG.isDebugEnabled()) {
LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
}
QueryUnit task = query.getSubQuery(taskId.getExecutionBlockId()).
getQueryUnit(taskId);
task.handle(event);
}
}
private class TaskAttemptEventDispatcher
implements EventHandler<TaskAttemptEvent> {
public void handle(TaskAttemptEvent event) {
QueryUnitAttemptId attemptId = event.getTaskAttemptId();
SubQuery subQuery = query.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
QueryUnitAttempt attempt = task.getAttempt(attemptId);
attempt.handle(event);
}
}
private class TaskSchedulerDispatcher
implements EventHandler<TaskSchedulerEvent> {
public void handle(TaskSchedulerEvent event) {
SubQuery subQuery = query.getSubQuery(event.getExecutionBlockId());
subQuery.getTaskScheduler().handle(event);
}
}
/**
 * Asks the container proxy registered for the event's container id to kill the
 * event's task attempt. Events for unknown containers are ignored.
 */
private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
  @Override
  public void handle(LocalTaskEvent event) {
    final TajoContainerProxy containerProxy =
        (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());

    if (containerProxy == null) {
      // No proxy is registered for this container id; nothing to kill.
      return;
    }
    containerProxy.killTaskAttempt(event.getTaskAttemptId());
  }
}
/**
 * Reacts to the "query completed" notification: polls every 10 ms until the query
 * reports one of the terminal states, then asks the query master context to stop the
 * query.
 */
private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
  @Override
  public void handle(QueryMasterQueryCompletedEvent event) {
    // Named differently from the outer field to avoid shadowing it.
    QueryId completedQueryId = event.getQueryId();
    LOG.info("Query completion notified from " + completedQueryId);

    // An interrupt must not abort the wait, but it must not be lost either. It is
    // remembered here and re-asserted once the handler has finished its work.
    boolean interrupted = false;
    try {
      while (!isTerminatedState(query.getState())) {
        try {
          synchronized (this) {
            wait(10);
          }
        } catch (InterruptedException e) {
          LOG.error(e);
          interrupted = true;
        }
      }
      LOG.info("Query final state: " + query.getState());
      queryMasterContext.stopQuery(completedQueryId);
    } finally {
      if (interrupted) {
        // Restore the interrupt status for the thread that delivered the event.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * @param state the state to test
   * @return {@code true} if the state is SUCCEEDED, FAILED, KILLED or ERROR
   */
  private boolean isTerminatedState(QueryState state) {
    return
        state == QueryState.QUERY_SUCCEEDED ||
        state == QueryState.QUERY_FAILED ||
        state == QueryState.QUERY_KILLED ||
        state == QueryState.QUERY_ERROR;
  }
}
/**
 * Plans and launches the query. It parses the JSON expression, creates and optimizes
 * the logical plan, runs the distributed-query hooks and records the descriptors of
 * all scanned tables. It then builds the master plan, creates the {@code Query},
 * registers it for query events and fires the START event.
 *
 * <p>Calling it again after the query was created only logs a warning. Failures are
 * not propagated: any {@link Throwable} is logged and stored, so
 * {@code isInitError()} returns {@code true} afterwards.
 */
public synchronized void startQuery() {
  try {
    if (query != null) {
      LOG.warn("Query already started");
      return;
    }

    CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
    LogicalPlanner planner = new LogicalPlanner(catalog);
    LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);

    Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
    LogicalPlan plan = planner.createPlan(session, expr);
    optimizer.optimize(plan);

    GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
    hookManager.addHook(new GlobalEngine.InsertHook());
    hookManager.doHooks(queryContext, plan);

    // Plain scans first, then partitioned scans, for every query block.
    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
      collectTableDescs(block, NodeType.SCAN);
      collectTableDescs(block, NodeType.PARTITIONS_SCAN);
    }

    // Assign the field; a local of the same name would leave the field permanently null.
    masterPlan = new MasterPlan(queryId, queryContext, plan);
    queryMasterContext.getGlobalPlanner().build(masterPlan);

    query = new Query(queryTaskContext, queryId, querySubmitTime,
        "", queryTaskContext.getEventHandler(), masterPlan);

    dispatcher.register(QueryEventType.class, query);
    queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START));
  } catch (Throwable t) {
    LOG.error(t.getMessage(), t);
    initError = t;
  }
}

/**
 * Stores the table descriptor of every node of the given type found under the block's
 * root, keyed by the node's canonical name.
 */
private void collectTableDescs(LogicalPlan.QueryBlock block, NodeType scanType) {
  LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), scanType);
  if (scanNodes == null) {
    return;
  }
  for (LogicalNode eachScanNode : scanNodes) {
    ScanNode scanNode = (ScanNode) eachScanNode;
    tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
  }
}
/**
 * Creates the staging directory of this query (plus its result sub-directory) on the
 * file system of the warehouse directory and registers it in the query context. It
 * then verifies that a requested output directory does not already exist, unless the
 * query is allowed to overwrite it.
 *
 * <p>If any step fails with an {@link IOException}, the staging directory created so
 * far is removed on a best-effort basis and the original exception is rethrown.
 *
 * @throws IOException if the staging directory cannot be set up or the output
 *                     directory already exists
 */
private void initStagingDir() throws IOException {
  Path stagingDir = null;
  FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);

  try {
    stagingDir = initStagingDir(systemConf, defaultFS, queryId.toString());

    // Create the result sub-directory inside the staging directory.
    defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
    LOG.info("The staging dir '" + stagingDir + "' is created.");
    queryContext.setStagingDir(stagingDir);

    /////////////////////////////////////////////////
    // Check and Create Output Directory If Necessary
    /////////////////////////////////////////////////
    if (queryContext.hasOutputPath()) {
      Path outputDir = queryContext.getOutputPath();
      if (!queryContext.isOutputOverwrite() && defaultFS.exists(outputDir)) {
        throw new IOException("The output directory '" + outputDir + "' already exists.");
      }
    }
  } catch (IOException ioe) {
    // Cleanup is best effort: a failure here must not hide the original error.
    try {
      if (stagingDir != null && defaultFS.exists(stagingDir)) {
        defaultFS.delete(stagingDir, true);
        LOG.info("The staging directory '" + stagingDir + "' is deleted");
      }
    } catch (IOException cleanupFailure) {
      LOG.warn("Failed to delete the staging directory '" + stagingDir + "'", cleanupFailure);
    }
    throw ioe;
  }
}
/**
 * Creates the staging directory for a query and returns its path.
 *
 * <p>The directory is {@code <staging root>/<queryId>}. It must not exist beforehand.
 * After creation the method checks that the directory is owned by the current user or
 * the login user, and resets its permission to {@link #STAGING_DIR_PERMISSION} if the
 * file system reports a different one.
 *
 * @param conf    configuration from which the staging root is resolved
 * @param fs      file system on which the directory is created
 * @param queryId name of the query's sub-directory under the staging root
 * @return the path of the newly created staging directory
 * @throws IOException if the directory already exists, cannot be created, or is owned
 *                     by an unexpected user
 */
public static Path initStagingDir(TajoConf conf, FileSystem fs, String queryId) throws IOException {
  String realUser = UserGroupInformation.getLoginUser().getShortUserName();
  String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();

  ////////////////////////////////////////////
  // Create Staging Directory
  ////////////////////////////////////////////
  Path stagingDir = new Path(TajoConf.getStagingDir(conf), queryId);

  if (fs.exists(stagingDir)) {
    throw new IOException("The staging directory '" + stagingDir + "' already exists");
  }
  // Fail here with a clear message instead of letting getFileStatus() fail below.
  if (!fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION))) {
    throw new IOException("Failed to create the staging directory '" + stagingDir + "'");
  }

  FileStatus fsStatus = fs.getFileStatus(stagingDir);
  String owner = fsStatus.getOwner();

  // An empty owner is accepted; otherwise it must be the submitter or the login user.
  if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
    throw new IOException("The ownership on the user's query " +
        "directory " + stagingDir + " is not as expected. " +
        "It is owned by " + owner + ". The directory must " +
        "be owned by the submitter " + currentUser + " or " +
        "by " + realUser);
  }

  // The permission actually reported may differ from the requested one; enforce it.
  if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
    LOG.info("Permissions on staging directory " + stagingDir + " are " +
        "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
        "to correct value " + STAGING_DIR_PERMISSION);
    fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
  }
  return stagingDir;
}
/**
 * @return the query created by {@code startQuery()}, or {@code null} if none exists yet
 */
public Query getQuery() {
  return this.query;
}
/** Stops this task; simply delegates to {@code stop()}. */
public void expiredSessionTimeout() {
  this.stop();
}
/**
 * @return the task context created in {@code init(Configuration)}
 */
public QueryMasterTaskContext getQueryTaskContext() {
  return this.queryTaskContext;
}
/**
 * @return the event handler exposed by this task's context
 */
public EventHandler getEventHandler() {
  final QueryMasterTaskContext context = queryTaskContext;
  return context.getEventHandler();
}
/** Records the current wall-clock time as the moment of the last client heartbeat. */
public void touchSessionTime() {
  final long now = System.currentTimeMillis();
  lastClientHeartbeat.set(now);
}
/**
 * @return the time in milliseconds recorded by the last {@code touchSessionTime()}
 *         call, or -1 if it was never called
 */
public long getLastClientHeartbeat() {
  final long lastHeartbeat = lastClientHeartbeat.get();
  return lastHeartbeat;
}
/**
 * @return the identifier of the query this task runs
 */
public QueryId getQueryId() {
  return this.queryId;
}
/**
 * @return {@code true} if {@code init} or {@code startQuery} captured a failure
 */
public boolean isInitError() {
  final boolean failureRecorded = (this.initError != null);
  return failureRecorded;
}
/**
 * Reports the state of the query.
 *
 * @return the query's own state once a query exists; otherwise {@code QUERY_ERROR} if
 *         a failure was captured, or {@code QUERY_NOT_ASSIGNED} if not
 */
public QueryState getState() {
  if (query != null) {
    return query.getState();
  }
  // No query object yet: tell a failed task apart from one that has not started.
  return isInitError() ? QueryState.QUERY_ERROR : QueryState.QUERY_NOT_ASSIGNED;
}
/**
 * @return the stringified captured failure, or {@code null} when no failure was captured
 */
public String getErrorMessage() {
  return isInitError() ? StringUtils.stringifyException(initError) : null;
}
/**
 * @return the submission time in milliseconds, captured when this task was constructed
 */
public long getQuerySubmitTime() {
  return querySubmitTime;
}
/**
 * View of the enclosing {@code QueryMasterTask} that is handed to collaborators.
 * As a non-static inner class it reads the enclosing task's fields directly.
 */
public class QueryMasterTaskContext {
  // Cached on first use by getEventHandler().
  EventHandler eventHandler;

  /** @return the context of the owning query master */
  public QueryMaster.QueryMasterContext getQueryMasterContext() {
    return QueryMasterTask.this.queryMasterContext;
  }

  /** @return the session of the query */
  public Session getSession() {
    return QueryMasterTask.this.session;
  }

  /** @return the per-query context */
  public QueryContext getQueryContext() {
    return QueryMasterTask.this.queryContext;
  }

  /** @return the configuration passed to the enclosing task's {@code init} */
  public TajoConf getConf() {
    return QueryMasterTask.this.systemConf;
  }

  /** @return the clock provided by the query master context */
  public Clock getClock() {
    final QueryMaster.QueryMasterContext masterContext = queryMasterContext;
    return masterContext.getClock();
  }

  /** @return the query of the enclosing task, or {@code null} if none exists yet */
  public Query getQuery() {
    return QueryMasterTask.this.query;
  }

  /** @return the identifier of the query */
  public QueryId getQueryId() {
    return QueryMasterTask.this.queryId;
  }

  /** @return the storage manager provided by the query master context */
  public AbstractStorageManager getStorageManager() {
    final QueryMaster.QueryMasterContext masterContext = queryMasterContext;
    return masterContext.getStorageManager();
  }

  /** @return the staging directory registered in the query context */
  public Path getStagingDir() {
    final QueryContext context = queryContext;
    return context.getStagingDir();
  }

  /**
   * @return the dispatcher's event handler; fetched from the dispatcher on the first
   *         call and cached afterwards
   */
  public synchronized EventHandler getEventHandler() {
    if (eventHandler != null) {
      return eventHandler;
    }
    eventHandler = dispatcher.getEventHandler();
    return eventHandler;
  }

  /** @return the event dispatcher of the enclosing task */
  public TajoAsyncDispatcher getDispatcher() {
    return QueryMasterTask.this.dispatcher;
  }

  /**
   * @param id execution block id of the wanted sub-query
   * @return whatever the query returns for that id
   */
  public SubQuery getSubQuery(ExecutionBlockId id) {
    final Query current = query;
    return current.getSubQuery(id);
  }

  /** @return the table descriptor map of the enclosing task (the live map, not a copy) */
  public Map<String, TableDesc> getTableDescMap() {
    return QueryMasterTask.this.tableDescMap;
  }

  /** @return the query's progress, or {@code 0.0f} while no query exists */
  public float getProgress() {
    return (query == null) ? 0.0f : query.getProgress();
  }

  /** @return the resource allocator of the enclosing task */
  public AbstractResourceAllocator getResourceAllocator() {
    return QueryMasterTask.this.resourceAllocator;
  }

  /** @return the metrics object of the enclosing task */
  public TajoMetrics getQueryMetrics() {
    return QueryMasterTask.this.queryMetrics;
  }
}
}