/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.master.exec;

import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.physical.EvalExprExec;
import org.apache.tajo.engine.planner.physical.InsertRowsExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.DuplicateIndexException;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.ResultType;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.master.QueryManager;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.exec.prehook.CreateTableHook;
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
import org.apache.tajo.master.exec.prehook.InsertIntoHook;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.EvalContext;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.expr.GeneralFunctionEval;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
import org.apache.tajo.plan.function.python.TajoScriptEngine;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.*;
import org.apache.tajo.tuple.memory.MemoryBlock;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.ProtoUtil;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.apache.tajo.exception.ReturnStateUtil.OK;
import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase;

public class QueryExecutor {
  private static final Log LOG = LogFactory.getLog(QueryExecutor.class);

  private final TajoMaster.MasterContext context;
  private final CatalogService catalog;
  private final DistributedQueryHookManager hookManager;
  private final DDLExecutor ddlExecutor;

  public QueryExecutor(TajoMaster.MasterContext context, DDLExecutor ddlExecutor) {
    this.context = context;
    this.catalog = context.getCatalog();

    this.ddlExecutor = ddlExecutor;
    this.hookManager = new DistributedQueryHookManager();
    this.hookManager.addHook(new CreateTableHook());
    this.hookManager.addHook(new InsertIntoHook());
  }

  public SubmitQueryResponse execute(QueryContext queryContext, Session session, String sql, String jsonExpr,
                      LogicalPlan plan) throws Exception {

    SubmitQueryResponse.Builder response = SubmitQueryResponse.newBuilder();
    response.setUserName(queryContext.get(SessionVars.USERNAME));

    LogicalRootNode rootNode = plan.getRootBlock().getRoot();

    if (PlannerUtil.checkIfSetSession(rootNode)) {
      execSetSession(session, plan, response);


    } else if (PlannerUtil.checkIfDDLPlan(rootNode)) {

      if (PlannerUtil.isDistExecDDL(rootNode)) {
        if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) {
          checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild());
        }
        executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
      } else {
        ddlExecutor.execute(queryContext, plan);
        response.setState(OK);
        response.setResultType(ResultType.NO_RESULT);
      }

    } else if (plan.isExplain()) { // explain query
      execExplain(session, sql, plan, queryContext, plan.isExplainGlobal(), response);

    } else if (PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) {
      execQueryOnVirtualTable(queryContext, session, sql, plan, response);

      // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
    } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
      execSimpleQuery(queryContext, session, sql, plan, response);

      // NonFromQuery indicates a form of 'select a, x+y;'
    } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
      execNonFromQuery(queryContext, session, sql, plan, response);

    } else { // it requires distributed execution. So, the query is forwarded to a query master.

      // Checking if CTAS is already finished due to 'IF NOT EXISTS' option
      if (checkIfCtasAlreadyDone(rootNode)) {
        response.setState(OK);
        response.setResultType(ResultType.NO_RESULT);
      } else {
        executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
      }
    }

    response.setSessionVars(ProtoUtil.convertFromMap(session.getAllVariables()));

    return response.build();
  }

  /**
   * Check if CTAS is already done
   * @param rootNode
   * @return
   */
  private boolean checkIfCtasAlreadyDone(LogicalNode rootNode) {
    if (rootNode.getChild(0).getType() == NodeType.CREATE_TABLE) {
      CreateTableNode createTable = (CreateTableNode) rootNode.getChild(0);
      if (createTable.isIfNotExists() && catalog.existsTable(createTable.getTableName())) {
        return true;
      }
    }

    return false;
  }

  public void execSetSession(Session session, LogicalPlan plan,
                             SubmitQueryResponse.Builder response) {
    SetSessionNode setSessionNode = ((LogicalRootNode) plan.getRootBlock().getRoot()).getChild();

    final String varName = setSessionNode.getName();

    // SET CATALOG 'XXX'
    if (varName.equals(SessionVars.CURRENT_DATABASE.name())) {
      String databaseName = setSessionNode.getValue();

      if (catalog.existDatabase(databaseName)) {
        session.selectDatabase(setSessionNode.getValue());
      } else {
        response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
        response.setState(errUndefinedDatabase(databaseName));
      }

      // others
    } else {
      if (setSessionNode.hasValue()) {
        session.setVariable(varName, setSessionNode.getValue());
      } else {
        session.removeVariable(varName);
      }
    }

    response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
    response.setState(OK);
  }

  public void execExplain(Session session, String query, LogicalPlan plan, QueryContext queryContext, boolean isGlobal,
                          SubmitQueryResponse.Builder response) throws Exception {

    String explainStr;
    boolean isTest = queryContext.getBool(SessionVars.TEST_PLAN_SHAPE_FIX_ENABLED);
    if (isTest) {
      ExplainPlanPreprocessorForTest preprocessorForTest = new ExplainPlanPreprocessorForTest();
      preprocessorForTest.prepareTest(plan);
    }

    if (isGlobal) {
      GlobalPlanner planner = new GlobalPlanner(context.getConf(), context.getCatalog());
      MasterPlan masterPlan = compileMasterPlan(plan, queryContext, planner);
      if (isTest) {
        ExplainGlobalPlanPreprocessorForTest globalPlanPreprocessorForTest = new ExplainGlobalPlanPreprocessorForTest();
        globalPlanPreprocessorForTest.prepareTest(masterPlan);
      }
      explainStr = masterPlan.toString();
    } else {
      explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
    }

    Schema schema = new Schema();
    schema.addColumn("explain", TajoDataTypes.Type.TEXT);

    SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
    MemoryRowBlock rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema));
    String[] lines = explainStr.split("\n");
    try {
      for (String line : lines) {
        rowBlock.getWriter().startRow();
        rowBlock.getWriter().putText(line);
        rowBlock.getWriter().endRow();
      }
      MemoryBlock memoryBlock = rowBlock.getMemory();
      ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes());
      int uncompressedLength = uncompressed.remaining();

      serializedResBuilder.setDecompressedLength(uncompressedLength);
      serializedResBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
      serializedResBuilder.setSchema(schema.getProto());
      serializedResBuilder.setRows(rowBlock.rows());
    } finally {
      rowBlock.release();
    }

    QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
        (LogicalRootNode) plan.getRootBlock().getRoot());

    response.setState(OK);
    response.setQueryId(queryInfo.getQueryId().getProto());
    response.setResultType(ResultType.ENCLOSED);
    response.setResultSet(serializedResBuilder.build());
    response.setMaxRowNum(lines.length);
  }

  public void execQueryOnVirtualTable(QueryContext queryContext, Session session, String query, LogicalPlan plan,
                              SubmitQueryResponse.Builder response) throws Exception {
    int maxRow = Integer.MAX_VALUE;
    if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
      LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
      maxRow = (int) limitNode.getFetchFirstNum();
    }
    QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
        (LogicalRootNode) plan.getRootBlock().getRoot());

    NonForwardQueryResultScanner queryResultScanner = new NonForwardQueryResultSystemScanner(
        context,
        plan,
        queryInfo.getQueryId(),
        session.getSessionId(),
        maxRow);

    queryResultScanner.init();
    session.addNonForwardQueryResultScanner(queryResultScanner);

    response.setState(OK);
    response.setQueryId(queryInfo.getQueryId().getProto());
    response.setResultType(ResultType.ENCLOSED);
    response.setMaxRowNum(maxRow);
    response.setTableDesc(queryResultScanner.getTableDesc().getProto());
  }

  public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan,
                              SubmitQueryResponse.Builder response) throws Exception {

    ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
    final TableDesc table = scanNode.getTableDesc();
    if (table.hasPartition()) {
      scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
    }

    final TableDesc resultDesc = new TableDesc("", scanNode.getOutSchema(),
        new TableMeta(BuiltinStorages.DRAW, table.getMeta().getOptions()), null);

    // push down limit
    int maxRow = Integer.MAX_VALUE;
    if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
      LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
      maxRow = (int) limitNode.getFetchFirstNum();
      scanNode.setLimit(maxRow);
    }

    final QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
        (LogicalRootNode) plan.getRootBlock().getRoot());

    final NonForwardQueryResultScanner queryResultScanner = new NonForwardQueryResultFileScanner(
        context.getConf(), session.getSessionId(), queryInfo.getQueryId(), scanNode, maxRow);
    queryResultScanner.init();

    session.addNonForwardQueryResultScanner(queryResultScanner);

    response.setState(OK);
    response.setQueryId(queryInfo.getQueryId().getProto());
    response.setResultType(ResultType.ENCLOSED);
    response.setMaxRowNum(maxRow);
    response.setTableDesc(resultDesc.getProto());
  }

  public void execNonFromQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder)
      throws Exception {
    LogicalRootNode rootNode = plan.getRootBlock().getRoot();

    EvalContext evalContext = new EvalContext();
    Target[] targets = plan.getRootBlock().getRawTargets();
    if (targets == null) {
      throw new TajoInternalError("no targets");
    }
    try {
      // start script executor
      startScriptExecutors(queryContext, evalContext, targets);
      final VTuple outTuple = new VTuple(targets.length);
      for (int i = 0; i < targets.length; i++) {
        EvalNode eval = targets[i].getEvalTree();
        eval.bind(evalContext, null);
        outTuple.put(i, eval.eval(null));
      }
      boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
      if (isInsert) {
        InsertNode insertNode = rootNode.getChild();
        insertRowValues(queryContext, insertNode, responseBuilder);
      } else {
        Schema schema = PlannerUtil.targetToSchema(targets);
        SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
        MemoryRowBlock rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema));

        try {
          rowBlock.getWriter().addTuple(outTuple);

          MemoryBlock memoryBlock = rowBlock.getMemory();
          ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes());
          int uncompressedLength = uncompressed.remaining();

          serializedResBuilder.setDecompressedLength(uncompressedLength);
          serializedResBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
          serializedResBuilder.setSchema(schema.getProto());
          serializedResBuilder.setRows(rowBlock.rows());
        } finally {
          rowBlock.release();
        }

        QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
            (LogicalRootNode) plan.getRootBlock().getRoot());

        responseBuilder.setState(OK);
        responseBuilder.setResultType(ResultType.ENCLOSED);
        responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
        responseBuilder.setResultSet(serializedResBuilder);
        responseBuilder.setMaxRowNum(1);
      }
    } finally {
      // stop script executor
      stopScriptExecutors(evalContext);
    }
  }

  public static void startScriptExecutors(QueryContext queryContext, EvalContext evalContext, Target[] targets)
      throws IOException {
    for (int i = 0; i < targets.length; i++) {
      EvalNode eval = targets[i].getEvalTree();
      if (eval instanceof GeneralFunctionEval) {
        GeneralFunctionEval functionEval = (GeneralFunctionEval) eval;
        if (functionEval.getFuncDesc().getInvocation().hasPython()) {
          TajoScriptEngine scriptExecutor = new PythonScriptEngine(functionEval.getFuncDesc());
          evalContext.addScriptEngine(eval, scriptExecutor);
          scriptExecutor.start(queryContext.getConf());
        }
      }
    }
  }

  public static void stopScriptExecutors(EvalContext evalContext) {
    for (TajoScriptEngine executor : evalContext.getAllScriptEngines()) {
      executor.shutdown();
    }
  }

  /**
   * Insert rows through staging phase
   */
  private void insertRowsThroughStaging(TaskAttemptContext taskAttemptContext,
                                        InsertNode insertNode,
                                        Path finalOutputPath,
                                        Path stagingDir,
                                        Path stagingResultDir)
      throws IOException {

    EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
    InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);

    try {
      exec.init();
      exec.next();
    } finally {
      exec.close();
    }

    FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());

    if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
      // it moves the original table into the temporary location.
      // Then it moves the new result table into the original table location.
      // Upon failed, it recovers the original table if possible.
      boolean movedToOldTable = false;
      boolean committed = false;
      Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
      try {
        if (fs.exists(finalOutputPath)) {
          fs.rename(finalOutputPath, oldTableDir);
          movedToOldTable = fs.exists(oldTableDir);
        } else { // if the parent does not exist, make its parent directory.
          fs.mkdirs(finalOutputPath.getParent());
        }
        fs.rename(stagingResultDir, finalOutputPath);
        committed = fs.exists(finalOutputPath);
      } catch (IOException ioe) {
        // recover the old table
        if (movedToOldTable && !committed) {
          fs.rename(oldTableDir, finalOutputPath);
        }
      }
    } else {
      FileStatus[] files = fs.listStatus(stagingResultDir);
      for (FileStatus eachFile : files) {
        Path targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName());
        if (fs.exists(targetFilePath)) {
          targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
        }
        fs.rename(eachFile.getPath(), targetFilePath);
      }
    }
  }

  /**
   * Insert row values
   */
  private void insertRowValues(QueryContext queryContext,
                               InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
    try {
      String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() :
          insertNode.getTableName();
      String queryId = nodeUniqName + "_" + System.currentTimeMillis();

      URI finalOutputUri = insertNode.getUri();
      Tablespace space = TablespaceManager.get(finalOutputUri);
      TableMeta tableMeta = new TableMeta(insertNode.getStorageType(), insertNode.getOptions());
      tableMeta.putOption(StorageConstants.INSERT_DIRECTLY, Boolean.TRUE.toString());

      FormatProperty formatProperty = space.getFormatProperty(tableMeta);

      TaskAttemptContext taskAttemptContext;
      if (formatProperty.directInsertSupported()) { // if this format and storage supports direct insertion
        taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, null);
        taskAttemptContext.setOutputPath(new Path(finalOutputUri));

        EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
        InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);

        try {
          exec.init();
          exec.next();
        } finally {
          exec.close();
        }
      } else {
        URI stagingSpaceUri = space.prepareStagingSpace(context.getConf(), queryId, queryContext, tableMeta);
        Path stagingDir = new Path(stagingSpaceUri);
        Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);

        taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
        taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
        insertRowsThroughStaging(taskAttemptContext, insertNode, new Path(finalOutputUri), stagingDir, stagingResultDir);
      }

      // set insert stats (how many rows and bytes)
      TableStats stats = new TableStats();
      stats.setNumBytes(taskAttemptContext.getResultStats().getNumBytes());
      stats.setNumRows(taskAttemptContext.getResultStats().getNumRows());

      if (insertNode.hasTargetTable()) {
        CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
        builder.setTableName(insertNode.getTableName());
        builder.setStats(stats.getProto());

        catalog.updateTableStats(builder.build());

        TableDesc desc = new TableDesc(
            insertNode.getTableName(),
            insertNode.getTargetSchema(),
            tableMeta,
            finalOutputUri);
        responseBuilder.setTableDesc(desc.getProto());

      } else { // If INSERT INTO LOCATION

        // Empty TableDesc
        List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
        CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
            .setTableName(nodeUniqName)
            .setMeta(CatalogProtos.TableProto.newBuilder().setDataFormat(BuiltinStorages.TEXT).build())
            .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
            .setStats(stats.getProto())
            .build();

        responseBuilder.setTableDesc(tableDescProto);
      }

      // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
      responseBuilder.setMaxRowNum(-1);
      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
      responseBuilder.setResultType(ResultType.NO_RESULT);
      responseBuilder.setState(OK);
    } catch (Throwable t) {
      throw new RuntimeException(t);
    }
  }

  public void executeDistributedQuery(QueryContext queryContext, Session session,
                                      LogicalPlan plan,
                                      String sql,
                                      String jsonExpr,
                                      SubmitQueryResponse.Builder responseBuilder) throws Exception {
    LogicalRootNode rootNode = plan.getRootBlock().getRoot();

    prepareForCreateTableOrInsert(catalog, plan);

    hookManager.doHooks(queryContext, plan);

    QueryManager queryManager = this.context.getQueryJobManager();
    QueryInfo queryInfo;

    queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode);

    responseBuilder.setState(OK);
    responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
    responseBuilder.setResultType(ResultType.FETCH);
    if (queryInfo.getQueryMasterHost() != null) {
      responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
    }
    responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
    LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," +
        " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
  }

  private void prepareForCreateTableOrInsert(CatalogService catalog, LogicalPlan plan)
      throws TajoException, IOException {
    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
    TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot());
    if (tableDesc != null) {

      Tablespace space = TablespaceManager.get(tableDesc.getUri());
      FormatProperty formatProperty = space.getFormatProperty(tableDesc.getMeta());

      if (!formatProperty.isInsertable()) {
        throw new UnsupportedException (
            String.format("INSERT operation on %s tablespace", tableDesc.getUri().toString()));
      }

      space.prepareTable(rootNode.getChild());
    }
  }

  private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
      throws DuplicateIndexException {

    String databaseName, simpleIndexName, qualifiedIndexName;
    if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
      String[] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
      databaseName = splits[0];
      simpleIndexName = splits[1];
      qualifiedIndexName = createIndexNode.getIndexName();
    } else {
      databaseName = queryContext.getCurrentDatabase();
      simpleIndexName = createIndexNode.getIndexName();
      qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
    }

    if (catalog.existIndexByName(databaseName, simpleIndexName)) {
      throw new DuplicateIndexException(qualifiedIndexName);
    }
  }

  public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
      throws Exception {

    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
    TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());

    if (tableDesc != null) {
      Tablespace space = TablespaceManager.get(tableDesc.getUri());
      space.rewritePlan(context, plan);
    }

    MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);
    planner.build(context, masterPlan);

    return masterPlan;
  }
}
