/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.querymaster;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tajo.*;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.FormatProperty;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
import org.apache.tajo.worker.event.NodeResourceEvent;
import org.apache.tajo.worker.event.NodeStatusEvent;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.tajo.ResourceProtos.TaskFatalErrorReport;
import static org.apache.tajo.TajoProtos.QueryState;
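/**
 * A per-query service running inside the QueryMaster. It owns the lifecycle of a single
 * query: it builds and optimizes the logical plan, constructs the {@link MasterPlan},
 * dispatches stage/task events through a query-local {@link AsyncDispatcher}, tracks
 * worker connections, and releases the allocated resources when the query stops.
 */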
public class QueryMasterTask extends CompositeService {
private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
private QueryId queryId;
private Session session;
private QueryContext queryContext;
private QueryMasterTaskContext queryTaskContext;
private QueryMaster.QueryMasterContext queryMasterContext;
private Query query;
private String jsonExpr;
private AsyncDispatcher dispatcher;
private final long querySubmitTime;
private final Map<Integer, TableDesc> tableDescMap = new HashMap<>();
private TajoConf systemConf;
private Properties rpcParams;
private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
private volatile boolean isStopped;
private Throwable initError;
private NodeResource allocation;
  private final List<TaskFatalErrorReport> diagnostics = new ArrayList<>();
private final ConcurrentMap<Integer, WorkerConnectionInfo> workerMap = Maps.newConcurrentMap();
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
QueryId queryId, Session session, QueryContext queryContext,
String jsonExpr, NodeResource allocation, AsyncDispatcher dispatcher) {
super(QueryMasterTask.class.getName());
this.queryMasterContext = queryMasterContext;
this.queryId = queryId;
this.session = session;
this.queryContext = queryContext;
this.jsonExpr = jsonExpr;
this.allocation = allocation;
this.querySubmitTime = System.currentTimeMillis();
this.dispatcher = dispatcher;
}
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
QueryId queryId, Session session, QueryContext queryContext,
String jsonExpr,
NodeResource allocation) {
this(queryMasterContext, queryId, session, queryContext, jsonExpr, allocation, new AsyncDispatcher());
}
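  /**
   * Registers the per-query event dispatchers (stage, task, task attempt, query completion,
   * task scheduler, and local task events) on the query-local {@link AsyncDispatcher}.
   */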
@Override
public void serviceInit(Configuration conf) throws Exception {
systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
rpcParams = RpcParameterFactory.get(systemConf);
queryTaskContext = new QueryMasterTaskContext();
addService(dispatcher);
dispatcher.register(StageEventType.class, new StageEventDispatcher());
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
super.serviceInit(systemConf);
}
public boolean isStopped() {
return isStopped;
}
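  /**
   * Starts the query and caches the connection information of every worker known to the
   * QueryMaster, keyed by worker id.
   */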
@Override
public void serviceStart() throws Exception {
startQuery();
List<TajoProtos.WorkerConnectionInfoProto> workersProto = queryMasterContext.getQueryMaster().getAllWorker();
for (TajoProtos.WorkerConnectionInfoProto worker : workersProto) {
workerMap.put(worker.getId(), new WorkerConnectionInfo(worker));
}
super.serviceStart();
}
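  /**
   * Deallocates the resource held by this QueryMaster, flushes the node status reports,
   * and, unless {@link SessionVars#DEBUG_ENABLED} is set, asks all assigned workers to
   * clean up this query's resources.
   */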
@Override
public void serviceStop() throws Exception {
isStopped = true;
LOG.info("Stopping QueryMasterTask:" + queryId);
    // Release the resource allocated to this QueryMaster
EventHandler handler = getQueryTaskContext().getQueryMasterContext().getWorkerContext().
getNodeResourceManager().getDispatcher().getEventHandler();
handler.handle(new NodeResourceDeallocateEvent(allocation, NodeResourceEvent.ResourceType.QUERY_MASTER));
    // Flush the pending node status reports
handler.handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
cleanupQuery(getQueryId());
}
super.serviceStop();
LOG.info("Stopped QueryMasterTask:" + queryId);
}
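  /**
   * Records up to ten fatal error reports as diagnostics and forwards a
   * {@link TaskFatalErrorEvent} to the query's event handler.
   */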
public void handleTaskFailed(TaskFatalErrorReport report) {
synchronized(diagnostics) {
if (diagnostics.size() < 10) {
diagnostics.add(report);
}
}
getEventHandler().handle(new TaskFatalErrorEvent(report));
}
public Collection<TaskFatalErrorReport> getDiagnostics() {
synchronized(diagnostics) {
return Collections.unmodifiableCollection(diagnostics);
}
}
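  // The following inner dispatchers route events from the AsyncDispatcher to the owning
  // Stage, Task, TaskAttempt, or the stage's task scheduler.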
private class StageEventDispatcher implements EventHandler<StageEvent> {
public void handle(StageEvent event) {
ExecutionBlockId id = event.getStageId();
if(LOG.isDebugEnabled()) {
LOG.debug("StageEventDispatcher:" + id + "," + event.getType());
}
query.getStage(id).handle(event);
}
}
private class TaskEventDispatcher
implements EventHandler<TaskEvent> {
public void handle(TaskEvent event) {
TaskId taskId = event.getTaskId();
if(LOG.isDebugEnabled()) {
LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
}
Task task = query.getStage(taskId.getExecutionBlockId()).
getTask(taskId);
task.handle(event);
}
}
private class TaskAttemptEventDispatcher
implements EventHandler<TaskAttemptEvent> {
public void handle(TaskAttemptEvent event) {
TaskAttemptId attemptId = event.getTaskAttemptId();
Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId());
Task task = stage.getTask(attemptId.getTaskId());
TaskAttempt attempt = task.getAttempt(attemptId);
attempt.handle(event);
}
}
private class TaskSchedulerDispatcher
implements EventHandler<TaskSchedulerEvent> {
public void handle(TaskSchedulerEvent event) {
Stage stage = query.getStage(event.getExecutionBlockId());
stage.getTaskScheduler().handle(event);
}
}
  /**
   * Sends a kill RPC request to the worker that runs the given task attempt.
   *
   * @param workerId the unique id of the target worker
   * @param taskAttemptId the id of the task attempt to be killed
   */
protected void killTaskAttempt(int workerId, TaskAttemptId taskAttemptId) {
NettyClientBase tajoWorkerRpc;
ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
InetSocketAddress workerAddress = getQuery().getStage(ebId).getAssignedWorkerMap().get(workerId);
try {
tajoWorkerRpc = RpcClientManager.getInstance().getClient(workerAddress, TajoWorkerProtocol.class, true,
rpcParams);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
      CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<>();
      tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), callFuture);
      if (!callFuture.get().getValue()) {
        getEventHandler().handle(
            new TaskFatalErrorEvent(taskAttemptId, "Can't kill task attempt: " + taskAttemptId));
      }
} catch (Exception e) {
/* Node RPC failure */
LOG.error(e.getMessage(), e);
getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage()));
}
}
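  /**
   * Handles {@link LocalTaskEvent}s by submitting the kill request to the event executor,
   * presumably so that the blocking RPC call does not stall the dispatcher thread.
   */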
private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
@Override
public void handle(final LocalTaskEvent event) {
queryMasterContext.getEventExecutor().submit(new Runnable() {
@Override
public void run() {
killTaskAttempt(event.getWorkerId(), event.getTaskAttemptId());
}
});
}
}
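  /**
   * Handles query completion: logs the final state and notifies the QueryMaster context
   * with a {@link QueryStopEvent}.
   */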
private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
@Override
public void handle(QueryMasterQueryCompletedEvent event) {
QueryId queryId = event.getQueryId();
LOG.info("Query completion notified from " + queryId + " final state: " + query.getSynchronizedState());
queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId));
}
}
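  /**
   * Returns true if the given state is terminal (succeeded, failed, killed, or error).
   */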
private static boolean isTerminatedState(QueryState state) {
return
state == QueryState.QUERY_SUCCEEDED ||
state == QueryState.QUERY_FAILED ||
state == QueryState.QUERY_KILLED ||
state == QueryState.QUERY_ERROR;
}
private LogicalPlan plan;
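  /**
   * Builds the execution plan and kicks off the query. The JSON-serialized expression is
   * deserialized into an {@link Expr}, turned into a logical plan, optimized, rewritten by
   * the output tablespace, and finally expanded into a {@link MasterPlan} by the global
   * planner. A staging directory is prepared when the output format supports it, and the
   * table descriptors of all scan nodes are cached for later lookup. Any failure is
   * recorded as the init error and the output table is rolled back when possible.
   */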
public synchronized void startQuery() {
Tablespace space = null;
try {
if (query != null) {
LOG.warn("Query already started");
return;
}
LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
      jsonExpr = null; // release the serialized expression so it can be garbage collected (avoids a possible OOM)
plan = planner.createPlan(queryContext, expr);
optimizer.optimize(queryContext, plan);
      // When no output table URI is given, TablespaceManager.get() returns the default tablespace.
space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, ""));
space.rewritePlan(queryContext, plan);
initStagingDir();
      // Cache the table descriptor of every scan node (plain, partition, and index scans)
      // so that it can be looked up later by the scan node's PID.
      for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
        for (NodeType scanType : new NodeType[]{NodeType.SCAN, NodeType.PARTITIONS_SCAN, NodeType.INDEX_SCAN}) {
          LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), scanType);
          if (scanNodes != null) {
            for (LogicalNode eachScanNode : scanNodes) {
              ScanNode scanNode = (ScanNode) eachScanNode;
              tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc());
            }
          }
        }
      }
MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
queryMasterContext.getGlobalPlanner().build(queryContext, masterPlan);
query = new Query(queryTaskContext, queryId, querySubmitTime,
"", queryTaskContext.getEventHandler(), masterPlan);
dispatcher.register(QueryEventType.class, query);
queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START));
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
initError = t;
if (plan != null && space != null) {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
try {
space.rollbackTable(rootNode.getChild());
} catch (Throwable e) {
          LOG.warn(queryId + ", failed to clean up the output storage after the query failure: " + e.getMessage(), e);
}
}
}
}
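  /**
   * Creates a staging space through the output tablespace when the target storage format
   * supports staging, and records the resulting directory in the query context.
   */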
private void initStagingDir() throws IOException {
URI stagingDir;
try {
Tablespace tablespace = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, ""));
TableDesc desc = PlannerUtil.getOutputTableDesc(plan);
FormatProperty formatProperty = tablespace.getFormatProperty(desc.getMeta());
if (formatProperty.isStagingSupport()) {
        // Create a staging space for the query output
        stagingDir = tablespace.prepareStagingSpace(systemConf, queryId.toString(), queryContext, desc.getMeta());
        LOG.info("The staging dir '" + stagingDir + "' has been created.");
queryContext.setStagingDir(stagingDir);
}
} catch (IOException ioe) {
LOG.warn("Creating staging space has been failed.", ioe);
throw ioe;
}
}
public Query getQuery() {
return query;
}
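  /**
   * Kills the query on session expiration unless it has already reached a terminal state
   * or is already waiting to be killed.
   */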
protected void expireQuerySession() {
    if (!isTerminatedState(query.getState()) && query.getState() != QueryState.QUERY_KILL_WAIT) {
query.handle(new QueryEvent(queryId, QueryEventType.KILL));
}
}
public QueryMasterTaskContext getQueryTaskContext() {
return queryTaskContext;
}
public EventHandler getEventHandler() {
return queryTaskContext.getEventHandler();
}
public void touchSessionTime() {
this.lastClientHeartbeat.set(System.currentTimeMillis());
}
public long getLastClientHeartbeat() {
return this.lastClientHeartbeat.get();
}
public QueryId getQueryId() {
return queryId;
}
public boolean isInitError() {
return initError != null;
}
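  /**
   * Returns the current query state. Before the {@link Query} object has been created,
   * this reports QUERY_ERROR if initialization failed and QUERY_NOT_ASSIGNED otherwise.
   */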
public QueryState getState() {
if(query == null) {
if (isInitError()) {
return QueryState.QUERY_ERROR;
} else {
return QueryState.QUERY_NOT_ASSIGNED;
}
} else {
return query.getState();
}
}
public Throwable getInitError() {
return initError;
}
public String getErrorMessage() {
if (isInitError()) {
return StringUtils.stringifyException(initError);
} else {
return null;
}
}
public long getQuerySubmitTime() {
return this.querySubmitTime;
}
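  /**
   * Asynchronously sends a stopQuery RPC to every worker that was assigned a stage of the
   * given query so that worker-side resources are released.
   */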
private void cleanupQuery(final QueryId queryId) {
if (getQuery() != null) {
Set<InetSocketAddress> workers = Sets.newHashSet();
for (Stage stage : getQuery().getStages()) {
workers.addAll(stage.getAssignedWorkerMap().values());
}
LOG.info("Cleanup resources of all workers. Query: " + queryId + ", workers: " + workers.size());
for (final InetSocketAddress worker : workers) {
queryMasterContext.getEventExecutor().submit(new Runnable() {
@Override
public void run() {
try {
AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true,
rpcParams);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), NullCallback.get());
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
});
}
}
}
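  /**
   * A per-query context shared with the {@link Query}, its stages, and their tasks. It
   * exposes the configuration, session, event handler and dispatcher, staging directory,
   * and lookups for stages, scanned table descriptors, and worker connection information.
   */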
public class QueryMasterTaskContext {
EventHandler eventHandler;
public QueryMaster.QueryMasterContext getQueryMasterContext() {
return queryMasterContext;
}
public Session getSession() {
return session;
}
public QueryContext getQueryContext() {
return queryContext;
}
public TajoConf getConf() {
return systemConf;
}
public Clock getClock() {
return queryMasterContext.getClock();
}
public Query getQuery() {
return query;
}
public QueryId getQueryId() {
return queryId;
}
public Path getStagingDir() {
return queryContext.getStagingDir();
}
public synchronized EventHandler getEventHandler() {
if(eventHandler == null) {
eventHandler = dispatcher.getEventHandler();
}
return eventHandler;
}
public AsyncDispatcher getDispatcher() {
return dispatcher;
}
public Stage getStage(ExecutionBlockId id) {
return query.getStage(id);
}
public TableDesc getTableDesc(ScanNode scanNode) {
return tableDescMap.get(scanNode.getPID());
}
public float getProgress() {
if(query == null) {
return 0.0f;
}
return query.getProgress();
}
    /**
     * Returns the worker connection map, keyed by worker id.
     */
public ConcurrentMap<Integer, WorkerConnectionInfo> getWorkerMap() {
return workerMap;
}
}
}