blob: a8f3324216efef581870c8a9e522a7f04732ad79 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.master.exec;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.physical.EvalExprExec;
import org.apache.tajo.engine.planner.physical.InsertRowsExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.DuplicateIndexException;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.ResultType;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.master.QueryManager;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.exec.prehook.CreateTableHook;
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
import org.apache.tajo.master.exec.prehook.InsertIntoHook;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.EvalContext;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.expr.GeneralFunctionEval;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
import org.apache.tajo.plan.function.python.TajoScriptEngine;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.*;
import org.apache.tajo.tuple.memory.MemoryBlock;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.ProtoUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static org.apache.tajo.exception.ReturnStateUtil.OK;
import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase;
public class QueryExecutor {
private static final Log LOG = LogFactory.getLog(QueryExecutor.class);
private final TajoMaster.MasterContext context;
private final CatalogService catalog;
private final DistributedQueryHookManager hookManager;
private final DDLExecutor ddlExecutor;
public QueryExecutor(TajoMaster.MasterContext context, DDLExecutor ddlExecutor) {
this.context = context;
this.catalog = context.getCatalog();
this.ddlExecutor = ddlExecutor;
this.hookManager = new DistributedQueryHookManager();
this.hookManager.addHook(new CreateTableHook());
this.hookManager.addHook(new InsertIntoHook());
}
public SubmitQueryResponse execute(QueryContext queryContext, Session session, String sql, String jsonExpr,
LogicalPlan plan) throws Exception {
SubmitQueryResponse.Builder response = SubmitQueryResponse.newBuilder();
response.setUserName(queryContext.get(SessionVars.USERNAME));
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
if (PlannerUtil.checkIfSetSession(rootNode)) {
execSetSession(session, plan, response);
} else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
if (PlannerUtil.isDistExecDDL(rootNode)) {
if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) {
checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild());
}
executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
} else {
ddlExecutor.execute(queryContext, plan);
response.setState(OK);
response.setResultType(ResultType.NO_RESULT);
}
} else if (plan.isExplain()) { // explain query
execExplain(session, sql, plan, queryContext, plan.isExplainGlobal(), response);
} else if (PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) {
execQueryOnVirtualTable(queryContext, session, sql, plan, response);
// Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
} else if (PlannerUtil.checkIfSimpleQuery(plan)) {
execSimpleQuery(queryContext, session, sql, plan, response);
// NonFromQuery indicates a form of 'select a, x+y;'
} else if (PlannerUtil.checkIfNonFromQuery(plan)) {
execNonFromQuery(queryContext, session, sql, plan, response);
} else { // it requires distributed execution. So, the query is forwarded to a query master.
// Checking if CTAS is already finished due to 'IF NOT EXISTS' option
if (checkIfCtasAlreadyDone(rootNode)) {
response.setState(OK);
response.setResultType(ResultType.NO_RESULT);
} else {
executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
}
}
response.setSessionVars(ProtoUtil.convertFromMap(session.getAllVariables()));
return response.build();
}
/**
* Check if CTAS is already done
* @param rootNode
* @return
*/
private boolean checkIfCtasAlreadyDone(LogicalNode rootNode) {
if (rootNode.getChild(0).getType() == NodeType.CREATE_TABLE) {
CreateTableNode createTable = (CreateTableNode) rootNode.getChild(0);
if (createTable.isIfNotExists() && catalog.existsTable(createTable.getTableName())) {
return true;
}
}
return false;
}
public void execSetSession(Session session, LogicalPlan plan,
SubmitQueryResponse.Builder response) {
SetSessionNode setSessionNode = ((LogicalRootNode) plan.getRootBlock().getRoot()).getChild();
final String varName = setSessionNode.getName();
// SET CATALOG 'XXX'
if (varName.equals(SessionVars.CURRENT_DATABASE.name())) {
String databaseName = setSessionNode.getValue();
if (catalog.existDatabase(databaseName)) {
session.selectDatabase(setSessionNode.getValue());
} else {
response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
response.setState(errUndefinedDatabase(databaseName));
}
// others
} else {
if (setSessionNode.hasValue()) {
session.setVariable(varName, setSessionNode.getValue());
} else {
session.removeVariable(varName);
}
}
response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
response.setState(OK);
}
public void execExplain(Session session, String query, LogicalPlan plan, QueryContext queryContext, boolean isGlobal,
SubmitQueryResponse.Builder response) throws Exception {
String explainStr;
boolean isTest = queryContext.getBool(SessionVars.TEST_PLAN_SHAPE_FIX_ENABLED);
if (isTest) {
ExplainPlanPreprocessorForTest preprocessorForTest = new ExplainPlanPreprocessorForTest();
preprocessorForTest.prepareTest(plan);
}
if (isGlobal) {
GlobalPlanner planner = new GlobalPlanner(context.getConf(), context.getCatalog());
MasterPlan masterPlan = compileMasterPlan(plan, queryContext, planner);
if (isTest) {
ExplainGlobalPlanPreprocessorForTest globalPlanPreprocessorForTest = new ExplainGlobalPlanPreprocessorForTest();
globalPlanPreprocessorForTest.prepareTest(masterPlan);
}
explainStr = masterPlan.toString();
} else {
explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
}
Schema schema = new Schema();
schema.addColumn("explain", TajoDataTypes.Type.TEXT);
SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
MemoryRowBlock rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema));
String[] lines = explainStr.split("\n");
try {
for (String line : lines) {
rowBlock.getWriter().startRow();
rowBlock.getWriter().putText(line);
rowBlock.getWriter().endRow();
}
MemoryBlock memoryBlock = rowBlock.getMemory();
ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes());
int uncompressedLength = uncompressed.remaining();
serializedResBuilder.setDecompressedLength(uncompressedLength);
serializedResBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
serializedResBuilder.setSchema(schema.getProto());
serializedResBuilder.setRows(rowBlock.rows());
} finally {
rowBlock.release();
}
QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
(LogicalRootNode) plan.getRootBlock().getRoot());
response.setState(OK);
response.setQueryId(queryInfo.getQueryId().getProto());
response.setResultType(ResultType.ENCLOSED);
response.setResultSet(serializedResBuilder.build());
response.setMaxRowNum(lines.length);
}
public void execQueryOnVirtualTable(QueryContext queryContext, Session session, String query, LogicalPlan plan,
SubmitQueryResponse.Builder response) throws Exception {
int maxRow = Integer.MAX_VALUE;
if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
maxRow = (int) limitNode.getFetchFirstNum();
}
QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
(LogicalRootNode) plan.getRootBlock().getRoot());
NonForwardQueryResultScanner queryResultScanner = new NonForwardQueryResultSystemScanner(
context,
plan,
queryInfo.getQueryId(),
session.getSessionId(),
maxRow);
queryResultScanner.init();
session.addNonForwardQueryResultScanner(queryResultScanner);
response.setState(OK);
response.setQueryId(queryInfo.getQueryId().getProto());
response.setResultType(ResultType.ENCLOSED);
response.setMaxRowNum(maxRow);
response.setTableDesc(queryResultScanner.getTableDesc().getProto());
}
public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan,
SubmitQueryResponse.Builder response) throws Exception {
ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
final TableDesc table = scanNode.getTableDesc();
if (table.hasPartition()) {
scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
}
final TableDesc resultDesc = new TableDesc("", scanNode.getOutSchema(),
new TableMeta(BuiltinStorages.DRAW, table.getMeta().getOptions()), null);
// push down limit
int maxRow = Integer.MAX_VALUE;
if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
maxRow = (int) limitNode.getFetchFirstNum();
scanNode.setLimit(maxRow);
}
final QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
(LogicalRootNode) plan.getRootBlock().getRoot());
final NonForwardQueryResultScanner queryResultScanner = new NonForwardQueryResultFileScanner(
context.getConf(), session.getSessionId(), queryInfo.getQueryId(), scanNode, maxRow);
queryResultScanner.init();
session.addNonForwardQueryResultScanner(queryResultScanner);
response.setState(OK);
response.setQueryId(queryInfo.getQueryId().getProto());
response.setResultType(ResultType.ENCLOSED);
response.setMaxRowNum(maxRow);
response.setTableDesc(resultDesc.getProto());
}
public void execNonFromQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder)
throws Exception {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
EvalContext evalContext = new EvalContext();
Target[] targets = plan.getRootBlock().getRawTargets();
if (targets == null) {
throw new TajoInternalError("no targets");
}
try {
// start script executor
startScriptExecutors(queryContext, evalContext, targets);
final VTuple outTuple = new VTuple(targets.length);
for (int i = 0; i < targets.length; i++) {
EvalNode eval = targets[i].getEvalTree();
eval.bind(evalContext, null);
outTuple.put(i, eval.eval(null));
}
boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
if (isInsert) {
InsertNode insertNode = rootNode.getChild();
insertRowValues(queryContext, insertNode, responseBuilder);
} else {
Schema schema = PlannerUtil.targetToSchema(targets);
SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
MemoryRowBlock rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema));
try {
rowBlock.getWriter().addTuple(outTuple);
MemoryBlock memoryBlock = rowBlock.getMemory();
ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes());
int uncompressedLength = uncompressed.remaining();
serializedResBuilder.setDecompressedLength(uncompressedLength);
serializedResBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
serializedResBuilder.setSchema(schema.getProto());
serializedResBuilder.setRows(rowBlock.rows());
} finally {
rowBlock.release();
}
QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
(LogicalRootNode) plan.getRootBlock().getRoot());
responseBuilder.setState(OK);
responseBuilder.setResultType(ResultType.ENCLOSED);
responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
responseBuilder.setResultSet(serializedResBuilder);
responseBuilder.setMaxRowNum(1);
}
} finally {
// stop script executor
stopScriptExecutors(evalContext);
}
}
public static void startScriptExecutors(QueryContext queryContext, EvalContext evalContext, Target[] targets)
throws IOException {
for (int i = 0; i < targets.length; i++) {
EvalNode eval = targets[i].getEvalTree();
if (eval instanceof GeneralFunctionEval) {
GeneralFunctionEval functionEval = (GeneralFunctionEval) eval;
if (functionEval.getFuncDesc().getInvocation().hasPython()) {
TajoScriptEngine scriptExecutor = new PythonScriptEngine(functionEval.getFuncDesc());
evalContext.addScriptEngine(eval, scriptExecutor);
scriptExecutor.start(queryContext.getConf());
}
}
}
}
public static void stopScriptExecutors(EvalContext evalContext) {
for (TajoScriptEngine executor : evalContext.getAllScriptEngines()) {
executor.shutdown();
}
}
/**
* Insert rows through staging phase
*/
private void insertRowsThroughStaging(TaskAttemptContext taskAttemptContext,
InsertNode insertNode,
Path finalOutputPath,
Path stagingDir,
Path stagingResultDir)
throws IOException {
EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);
try {
exec.init();
exec.next();
} finally {
exec.close();
}
FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
// it moves the original table into the temporary location.
// Then it moves the new result table into the original table location.
// Upon failed, it recovers the original table if possible.
boolean movedToOldTable = false;
boolean committed = false;
Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
try {
if (fs.exists(finalOutputPath)) {
fs.rename(finalOutputPath, oldTableDir);
movedToOldTable = fs.exists(oldTableDir);
} else { // if the parent does not exist, make its parent directory.
fs.mkdirs(finalOutputPath.getParent());
}
fs.rename(stagingResultDir, finalOutputPath);
committed = fs.exists(finalOutputPath);
} catch (IOException ioe) {
// recover the old table
if (movedToOldTable && !committed) {
fs.rename(oldTableDir, finalOutputPath);
}
}
} else {
FileStatus[] files = fs.listStatus(stagingResultDir);
for (FileStatus eachFile : files) {
Path targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName());
if (fs.exists(targetFilePath)) {
targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
}
fs.rename(eachFile.getPath(), targetFilePath);
}
}
}
/**
* Insert row values
*/
private void insertRowValues(QueryContext queryContext,
InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
try {
String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() :
insertNode.getTableName();
String queryId = nodeUniqName + "_" + System.currentTimeMillis();
URI finalOutputUri = insertNode.getUri();
Tablespace space = TablespaceManager.get(finalOutputUri);
TableMeta tableMeta = new TableMeta(insertNode.getStorageType(), insertNode.getOptions());
tableMeta.putOption(StorageConstants.INSERT_DIRECTLY, Boolean.TRUE.toString());
FormatProperty formatProperty = space.getFormatProperty(tableMeta);
TaskAttemptContext taskAttemptContext;
if (formatProperty.directInsertSupported()) { // if this format and storage supports direct insertion
taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, null);
taskAttemptContext.setOutputPath(new Path(finalOutputUri));
EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);
try {
exec.init();
exec.next();
} finally {
exec.close();
}
} else {
URI stagingSpaceUri = space.prepareStagingSpace(context.getConf(), queryId, queryContext, tableMeta);
Path stagingDir = new Path(stagingSpaceUri);
Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
insertRowsThroughStaging(taskAttemptContext, insertNode, new Path(finalOutputUri), stagingDir, stagingResultDir);
}
// set insert stats (how many rows and bytes)
TableStats stats = new TableStats();
stats.setNumBytes(taskAttemptContext.getResultStats().getNumBytes());
stats.setNumRows(taskAttemptContext.getResultStats().getNumRows());
if (insertNode.hasTargetTable()) {
CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
builder.setTableName(insertNode.getTableName());
builder.setStats(stats.getProto());
catalog.updateTableStats(builder.build());
TableDesc desc = new TableDesc(
insertNode.getTableName(),
insertNode.getTargetSchema(),
tableMeta,
finalOutputUri);
responseBuilder.setTableDesc(desc.getProto());
} else { // If INSERT INTO LOCATION
// Empty TableDesc
List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
.setTableName(nodeUniqName)
.setMeta(CatalogProtos.TableProto.newBuilder().setDataFormat(BuiltinStorages.TEXT).build())
.setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
.setStats(stats.getProto())
.build();
responseBuilder.setTableDesc(tableDescProto);
}
// If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
responseBuilder.setMaxRowNum(-1);
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
responseBuilder.setResultType(ResultType.NO_RESULT);
responseBuilder.setState(OK);
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
public void executeDistributedQuery(QueryContext queryContext, Session session,
LogicalPlan plan,
String sql,
String jsonExpr,
SubmitQueryResponse.Builder responseBuilder) throws Exception {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
prepareForCreateTableOrInsert(catalog, plan);
hookManager.doHooks(queryContext, plan);
QueryManager queryManager = this.context.getQueryJobManager();
QueryInfo queryInfo;
queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode);
responseBuilder.setState(OK);
responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
responseBuilder.setResultType(ResultType.FETCH);
if (queryInfo.getQueryMasterHost() != null) {
responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
}
responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," +
" is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
}
private void prepareForCreateTableOrInsert(CatalogService catalog, LogicalPlan plan)
throws TajoException, IOException {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot());
if (tableDesc != null) {
Tablespace space = TablespaceManager.get(tableDesc.getUri());
FormatProperty formatProperty = space.getFormatProperty(tableDesc.getMeta());
if (!formatProperty.isInsertable()) {
throw new UnsupportedException (
String.format("INSERT operation on %s tablespace", tableDesc.getUri().toString()));
}
space.prepareTable(rootNode.getChild());
}
}
private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
throws DuplicateIndexException {
String databaseName, simpleIndexName, qualifiedIndexName;
if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
String[] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
databaseName = splits[0];
simpleIndexName = splits[1];
qualifiedIndexName = createIndexNode.getIndexName();
} else {
databaseName = queryContext.getCurrentDatabase();
simpleIndexName = createIndexNode.getIndexName();
qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
}
if (catalog.existIndexByName(databaseName, simpleIndexName)) {
throw new DuplicateIndexException(qualifiedIndexName);
}
}
public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
throws Exception {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
if (tableDesc != null) {
Tablespace space = TablespaceManager.get(tableDesc.getUri());
space.rewritePlan(context, plan);
}
MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);
planner.build(context, masterPlan);
return masterPlan;
}
}