| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.master; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.protobuf.ByteString; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.tajo.QueryId; |
| import org.apache.tajo.QueryIdFactory; |
| import org.apache.tajo.TajoConstants; |
| import org.apache.tajo.algebra.AlterTablespaceSetType; |
| import org.apache.tajo.algebra.Expr; |
| import org.apache.tajo.algebra.JsonHelper; |
| import org.apache.tajo.annotation.Nullable; |
| import org.apache.tajo.catalog.*; |
| import org.apache.tajo.catalog.exception.*; |
| import org.apache.tajo.catalog.partition.PartitionMethodDesc; |
| import org.apache.tajo.catalog.proto.CatalogProtos; |
| import org.apache.tajo.catalog.statistics.TableStats; |
| import org.apache.tajo.common.TajoDataTypes; |
| import org.apache.tajo.conf.TajoConf; |
| import org.apache.tajo.datum.DatumFactory; |
| import org.apache.tajo.engine.eval.EvalNode; |
| import org.apache.tajo.engine.exception.IllegalQueryStatusException; |
| import org.apache.tajo.engine.exception.VerifyException; |
| import org.apache.tajo.engine.parser.SQLAnalyzer; |
| import org.apache.tajo.engine.planner.*; |
| import org.apache.tajo.engine.planner.logical.*; |
| import org.apache.tajo.engine.planner.physical.EvalExprExec; |
| import org.apache.tajo.engine.planner.physical.StoreTableExec; |
| import org.apache.tajo.engine.query.QueryContext; |
| import org.apache.tajo.ipc.ClientProtos; |
| import org.apache.tajo.master.TajoMaster.MasterContext; |
| import org.apache.tajo.master.querymaster.Query; |
| import org.apache.tajo.master.querymaster.QueryInfo; |
| import org.apache.tajo.master.querymaster.QueryJobManager; |
| import org.apache.tajo.master.querymaster.QueryMasterTask; |
| import org.apache.tajo.master.session.Session; |
| import org.apache.tajo.storage.*; |
| import org.apache.tajo.worker.TaskAttemptContext; |
| |
| import java.io.IOException; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; |
| import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto; |
| import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; |
| import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet; |
| |
/**
 * GlobalEngine is the TajoMaster-side entry point for query execution. It parses and
 * verifies queries, executes DDL and trivially-evaluable queries directly, and forwards
 * distributed queries to a QueryMaster.
 */
public class GlobalEngine extends AbstractService {
  /** Class Logger */
  private final static Log LOG = LogFactory.getLog(GlobalEngine.class);

  private final MasterContext context;     // master-side services: conf, catalog, metrics, query job manager
  private final AbstractStorageManager sm; // storage manager: warehouse dir and size calculation

  // The fields below (except catalog, which is also set in the constructor)
  // are initialized in start(), not in the constructor.
  private SQLAnalyzer analyzer;                      // parses SQL text into an algebra Expr
  private CatalogService catalog;                    // catalog facade
  private PreLogicalPlanVerifier preVerifier;        // verifies expressions before planning
  private LogicalPlanner planner;                    // builds a LogicalPlan from an Expr
  private LogicalOptimizer optimizer;                // optimizes the logical plan in place
  private LogicalPlanVerifier annotatedPlanVerifier; // verifies the optimized, annotated plan
  private DistributedQueryHookManager hookManager;   // hooks run before forwarding distributed queries
| |
/**
 * Creates the engine bound to the given master context. Planner/analyzer/verifier
 * components are created later, in {@link #start()}.
 */
public GlobalEngine(final MasterContext context) {
  super(GlobalEngine.class.getName());
  this.context = context;
  this.catalog = context.getCatalog();
  this.sm = context.getStorageManager();
}
| |
/**
 * Initializes the analyzer, verifiers, planner, optimizer, and distributed-query hooks,
 * then starts the underlying service.
 */
public void start() {
  try {
    analyzer = new SQLAnalyzer();
    preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
    planner = new LogicalPlanner(context.getCatalog());
    optimizer = new LogicalOptimizer(context.getConf());
    annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());

    hookManager = new DistributedQueryHookManager();
    hookManager.addHook(new CreateTableHook());
    hookManager.addHook(new InsertHook());
  } catch (Throwable t) {
    // NOTE(review): failures here are only logged and super.start() still runs,
    // leaving the engine half-initialized (fields above may be null). Confirm
    // whether start-up should instead fail fast.
    LOG.error(t.getMessage(), t);
  }
  super.start();
}
| |
/** Stops the underlying service; the engine holds no resources of its own to release. */
public void stop() {
  super.stop();
}
| |
| public SubmitQueryResponse executeQuery(Session session, String query, boolean isJson) { |
| LOG.info("Query: " + query); |
| QueryContext queryContext = new QueryContext(); |
| queryContext.putAll(session.getAllVariables()); |
| Expr planningContext; |
| |
| try { |
| if (isJson) { |
| planningContext = buildExpressionFromJson(query); |
| } else { |
| // setting environment variables |
| String [] cmds = query.split(" "); |
| if(cmds != null) { |
| if(cmds[0].equalsIgnoreCase("set")) { |
| String[] params = cmds[1].split("="); |
| context.getConf().set(params[0], params[1]); |
| SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder(); |
| responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME)); |
| responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); |
| responseBuilder.setResultCode(ClientProtos.ResultCode.OK); |
| return responseBuilder.build(); |
| } |
| } |
| |
| planningContext = buildExpressionFromSql(queryContext, query); |
| } |
| |
| String jsonExpr = planningContext.toJson(); |
| LogicalPlan plan = createLogicalPlan(session, planningContext); |
| SubmitQueryResponse response = executeQueryInternal(queryContext, session, plan, query, jsonExpr); |
| return response; |
| } catch (Throwable t) { |
| context.getSystemMetrics().counter("Query", "errorQuery").inc(); |
| LOG.error("\nStack Trace:\n" + StringUtils.stringifyException(t)); |
| SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder(); |
| responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME)); |
| responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); |
| responseBuilder.setIsForwarded(true); |
| responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR); |
| String errorMessage = t.getMessage(); |
| if (t.getMessage() == null) { |
| errorMessage = t.getClass().getName(); |
| } |
| responseBuilder.setErrorMessage(errorMessage); |
| responseBuilder.setErrorTrace(StringUtils.stringifyException(t)); |
| return responseBuilder.build(); |
| } |
| } |
| |
/**
 * Deserializes a JSON-encoded algebra expression.
 *
 * @param json the JSON form of an {@link Expr}
 * @return the deserialized expression
 */
public Expr buildExpressionFromJson(String json) {
  return JsonHelper.fromJson(json, Expr.class);
}
| |
/**
 * Parses SQL text into an algebra expression, incrementing the total-query metric.
 *
 * @param queryContext the per-query context (currently unused here)
 * @param sql          the SQL text to parse
 * @return the parsed expression
 */
public Expr buildExpressionFromSql(QueryContext queryContext, String sql)
    throws InterruptedException, IOException, IllegalQueryStatusException {
  context.getSystemMetrics().counter("Query", "totalQuery").inc();
  return analyzer.parse(sql);
}
| |
/**
 * Dispatches a planned query to the appropriate execution path:
 * <ul>
 *   <li>DDL plans are executed synchronously against the catalog;</li>
 *   <li>EXPLAIN queries return the plan text as an inline result set;</li>
 *   <li>simple scans ("select * from tb [LIMIT n]") return the table descriptor
 *       so the client can read the data directly;</li>
 *   <li>non-FROM queries ("select 1+2") are evaluated in-process;</li>
 *   <li>everything else is forwarded to a QueryMaster for distributed execution.</li>
 * </ul>
 */
private SubmitQueryResponse executeQueryInternal(QueryContext queryContext,
                                                 Session session,
                                                 LogicalPlan plan,
                                                 String sql,
                                                 String jsonExpr) throws Exception {

  LogicalRootNode rootNode = plan.getRootBlock().getRoot();

  SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
  responseBuilder.setIsForwarded(false);
  responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));

  if (PlannerUtil.checkIfDDLPlan(rootNode)) {
    // DDL runs synchronously; no query id is allocated.
    context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
    updateQuery(session, rootNode.getChild());
    responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);

  } else if (plan.isExplain()) { // explain query
    // Render the plan as text and ship it back as a single-column result set,
    // one encoded tuple per line.
    String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
    Schema schema = new Schema();
    schema.addColumn("explain", TajoDataTypes.Type.TEXT);
    RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);

    SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();

    VTuple tuple = new VTuple(1);
    String[] lines = explainStr.split("\n");
    int bytesNum = 0;
    for (String line : lines) {
      tuple.put(0, DatumFactory.createText(line));
      byte [] encodedData = encoder.toBytes(tuple);
      bytesNum += encodedData.length;
      serializedResBuilder.addSerializedTuples(ByteString.copyFrom(encodedData));
    }
    serializedResBuilder.setSchema(schema.getProto());
    serializedResBuilder.setBytesNum(bytesNum);

    responseBuilder.setResultSet(serializedResBuilder.build());
    responseBuilder.setMaxRowNum(lines.length);
    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
    responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());

    // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
  } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
    // No execution needed: hand the client the table descriptor so it can
    // scan the data itself. Row limit comes from LIMIT if present.
    ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
    TableDesc desc = scanNode.getTableDesc();
    if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
      LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
      responseBuilder.setMaxRowNum((int) limitNode.getFetchFirstNum());
    } else {
      // Table has bytes but no row statistics: row count is unknown.
      if (desc.getStats().getNumBytes() > 0 && desc.getStats().getNumRows() == 0) {
        responseBuilder.setMaxRowNum(Integer.MAX_VALUE);
      }
    }
    responseBuilder.setTableDesc(desc.getProto());
    responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);

    // NonFromQuery indicates a form of 'select a, x+y;'
  } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
    // Evaluate each target expression once, with no input tuple.
    Target [] targets = plan.getRootBlock().getRawTargets();
    if (targets == null) {
      throw new PlanningException("No targets");
    }
    final Tuple outTuple = new VTuple(targets.length);
    for (int i = 0; i < targets.length; i++) {
      EvalNode eval = targets[i].getEvalTree();
      outTuple.put(i, eval.eval(null, null));
    }
    boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
    if (isInsert) {
      // 'INSERT INTO ... SELECT <constants>': materialize the tuple to storage.
      InsertNode insertNode = rootNode.getChild();
      insertNonFromQuery(queryContext, insertNode, responseBuilder);
    } else {
      // Plain constant select: ship the single encoded tuple inline.
      Schema schema = PlannerUtil.targetToSchema(targets);
      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
      byte[] serializedBytes = encoder.toBytes(outTuple);
      SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
      serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
      serializedResBuilder.setSchema(schema.getProto());
      serializedResBuilder.setBytesNum(serializedBytes.length);

      responseBuilder.setResultSet(serializedResBuilder);
      responseBuilder.setMaxRowNum(1);
      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
    }
  } else { // it requires distributed execution. So, the query is forwarded to a query master.
    context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
    hookManager.doHooks(queryContext, plan);

    QueryJobManager queryJobManager = this.context.getQueryJobManager();
    QueryInfo queryInfo;

    queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);

    if(queryInfo == null) {
      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
      responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
      responseBuilder.setErrorMessage("Fail starting QueryMaster.");
    } else {
      // Tell the client where the QueryMaster lives so it can track progress.
      responseBuilder.setIsForwarded(true);
      responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
      if(queryInfo.getQueryMasterHost() != null) {
        responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
      }
      responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
      LOG.info("Query is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
    }
  }
  SubmitQueryResponse response = responseBuilder.build();
  return response;
}
| |
| private void insertNonFromQuery(QueryContext queryContext, InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) |
| throws Exception { |
| String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName(); |
| String queryId = nodeUniqName + "_" + System.currentTimeMillis(); |
| |
| FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf()); |
| Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), fs, queryId.toString()); |
| |
| Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); |
| fs.mkdirs(stagingResultDir); |
| |
| TableDesc tableDesc = null; |
| Path finalOutputDir = null; |
| if (insertNode.getTableName() != null) { |
| tableDesc = this.catalog.getTableDesc(insertNode.getTableName()); |
| finalOutputDir = tableDesc.getPath(); |
| } else { |
| finalOutputDir = insertNode.getPath(); |
| } |
| |
| TaskAttemptContext taskAttemptContext = |
| new TaskAttemptContext(context.getConf(), queryContext, null, (CatalogProtos.FragmentProto[]) null, stagingDir); |
| taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000")); |
| |
| EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); |
| StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec); |
| try { |
| exec.init(); |
| exec.next(); |
| } finally { |
| exec.close(); |
| } |
| |
| if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO |
| // it moves the original table into the temporary location. |
| // Then it moves the new result table into the original table location. |
| // Upon failed, it recovers the original table if possible. |
| boolean movedToOldTable = false; |
| boolean committed = false; |
| Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); |
| try { |
| if (fs.exists(finalOutputDir)) { |
| fs.rename(finalOutputDir, oldTableDir); |
| movedToOldTable = fs.exists(oldTableDir); |
| } else { // if the parent does not exist, make its parent directory. |
| fs.mkdirs(finalOutputDir.getParent()); |
| } |
| fs.rename(stagingResultDir, finalOutputDir); |
| committed = fs.exists(finalOutputDir); |
| } catch (IOException ioe) { |
| // recover the old table |
| if (movedToOldTable && !committed) { |
| fs.rename(oldTableDir, finalOutputDir); |
| } |
| } |
| } else { |
| FileStatus[] files = fs.listStatus(stagingResultDir); |
| for (FileStatus eachFile: files) { |
| Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName()); |
| if (fs.exists(targetFilePath)) { |
| targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis()); |
| } |
| fs.rename(eachFile.getPath(), targetFilePath); |
| } |
| } |
| |
| if (insertNode.hasTargetTable()) { |
| TableStats stats = tableDesc.getStats(); |
| long volume = Query.getTableVolume(context.getConf(), finalOutputDir); |
| stats.setNumBytes(volume); |
| stats.setNumRows(1); |
| |
| catalog.dropTable(insertNode.getTableName()); |
| catalog.createTable(tableDesc); |
| |
| responseBuilder.setTableDesc(tableDesc.getProto()); |
| } else { |
| TableStats stats = new TableStats(); |
| long volume = Query.getTableVolume(context.getConf(), finalOutputDir); |
| stats.setNumBytes(volume); |
| stats.setNumRows(1); |
| |
| // Empty TableDesc |
| List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>(); |
| CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder() |
| .setTableName(nodeUniqName) |
| .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType(CatalogProtos.StoreType.CSV).build()) |
| .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build()) |
| .setStats(stats.getProto()) |
| .build(); |
| |
| responseBuilder.setTableDesc(tableDescProto); |
| } |
| |
| // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows. |
| responseBuilder.setMaxRowNum(-1); |
| responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); |
| responseBuilder.setResultCode(ClientProtos.ResultCode.OK); |
| } |
| |
| |
| public QueryId updateQuery(Session session, String sql, boolean isJson) throws IOException, SQLException, PlanningException { |
| try { |
| LOG.info("SQL: " + sql); |
| |
| Expr expr; |
| if (isJson) { |
| expr = JsonHelper.fromJson(sql, Expr.class); |
| } else { |
| // parse the query |
| expr = analyzer.parse(sql); |
| } |
| |
| LogicalPlan plan = createLogicalPlan(session, expr); |
| LogicalRootNode rootNode = plan.getRootBlock().getRoot(); |
| |
| if (!PlannerUtil.checkIfDDLPlan(rootNode)) { |
| throw new SQLException("This is not update query:\n" + sql); |
| } else { |
| updateQuery(session, rootNode.getChild()); |
| return QueryIdFactory.NULL_QUERY_ID; |
| } |
| } catch (Exception e) { |
| LOG.error(e.getMessage(), e); |
| throw new IOException(e.getMessage(), e); |
| } |
| } |
| |
| private boolean updateQuery(Session session, LogicalNode root) throws IOException { |
| |
| switch (root.getType()) { |
| case CREATE_DATABASE: |
| CreateDatabaseNode createDatabase = (CreateDatabaseNode) root; |
| createDatabase(session, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists()); |
| return true; |
| case DROP_DATABASE: |
| DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root; |
| dropDatabase(session, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists()); |
| return true; |
| case CREATE_TABLE: |
| CreateTableNode createTable = (CreateTableNode) root; |
| createTable(session, createTable, createTable.isIfNotExists()); |
| return true; |
| case DROP_TABLE: |
| DropTableNode dropTable = (DropTableNode) root; |
| dropTable(session, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge()); |
| return true; |
| case ALTER_TABLESPACE: |
| AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root; |
| alterTablespace(session, alterTablespace); |
| return true; |
| case ALTER_TABLE: |
| AlterTableNode alterTable = (AlterTableNode) root; |
| alterTable(session,alterTable); |
| return true; |
| case TRUNCATE_TABLE: |
| TruncateTableNode truncateTable = (TruncateTableNode) root; |
| truncateTable(session, truncateTable); |
| return true; |
| default: |
| throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson()); |
| } |
| } |
| |
| private LogicalPlan createLogicalPlan(Session session, Expr expression) throws PlanningException { |
| |
| VerificationState state = new VerificationState(); |
| preVerifier.verify(session, state, expression); |
| if (!state.verified()) { |
| StringBuilder sb = new StringBuilder(); |
| for (String error : state.getErrorMessages()) { |
| sb.append(error).append("\n"); |
| } |
| throw new VerifyException(sb.toString()); |
| } |
| |
| LogicalPlan plan = planner.createPlan(session, expression); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("============================================="); |
| LOG.debug("Non Optimized Query: \n" + plan.toString()); |
| LOG.debug("============================================="); |
| } |
| optimizer.optimize(session, plan); |
| LOG.info("============================================="); |
| LOG.info("Optimized Query: \n" + plan.toString()); |
| LOG.info("============================================="); |
| |
| annotatedPlanVerifier.verify(session, state, plan); |
| |
| if (!state.verified()) { |
| StringBuilder sb = new StringBuilder(); |
| for (String error : state.getErrorMessages()) { |
| sb.append(error).append("\n"); |
| } |
| throw new VerifyException(sb.toString()); |
| } |
| |
| return plan; |
| } |
| |
| /** |
| * Alter a given table |
| */ |
| public void alterTablespace(final Session session, final AlterTablespaceNode alterTablespace) { |
| |
| final CatalogService catalog = context.getCatalog(); |
| final String spaceName = alterTablespace.getTablespaceName(); |
| |
| AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder(); |
| builder.setSpaceName(spaceName); |
| if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) { |
| AlterTablespaceProto.AlterTablespaceCommand.Builder commandBuilder = |
| AlterTablespaceProto.AlterTablespaceCommand.newBuilder(); |
| commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION); |
| commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation())); |
| commandBuilder.build(); |
| builder.addCommand(commandBuilder); |
| } else { |
| throw new RuntimeException("This 'ALTER TABLESPACE' is not supported yet."); |
| } |
| |
| catalog.alterTablespace(builder.build()); |
| } |
| |
| /** |
| * Alter a given table |
| */ |
| public void alterTable(final Session session, final AlterTableNode alterTable) throws IOException { |
| |
| final CatalogService catalog = context.getCatalog(); |
| final String tableName = alterTable.getTableName(); |
| |
| String databaseName; |
| String simpleTableName; |
| if (CatalogUtil.isFQTableName(tableName)) { |
| String[] split = CatalogUtil.splitFQTableName(tableName); |
| databaseName = split[0]; |
| simpleTableName = split[1]; |
| } else { |
| databaseName = session.getCurrentDatabase(); |
| simpleTableName = tableName; |
| } |
| final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName); |
| |
| if (!catalog.existsTable(databaseName, simpleTableName)) { |
| throw new NoSuchTableException(qualifiedName); |
| } |
| |
| switch (alterTable.getAlterTableOpType()) { |
| case RENAME_TABLE: |
| if (!catalog.existsTable(databaseName, simpleTableName)) { |
| throw new NoSuchTableException(alterTable.getTableName()); |
| } |
| if (catalog.existsTable(databaseName, alterTable.getNewTableName())) { |
| throw new AlreadyExistsTableException(alterTable.getNewTableName()); |
| } |
| |
| TableDesc desc = catalog.getTableDesc(databaseName, simpleTableName); |
| |
| if (!desc.isExternal()) { // if the table is the managed table |
| Path oldPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), |
| databaseName, simpleTableName); |
| Path newPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), |
| databaseName, alterTable.getNewTableName()); |
| FileSystem fs = oldPath.getFileSystem(context.getConf()); |
| |
| if (!fs.exists(oldPath)) { |
| throw new IOException("No such a table directory: " + oldPath); |
| } |
| if (fs.exists(newPath)) { |
| throw new IOException("Already table directory exists: " + newPath); |
| } |
| |
| fs.rename(oldPath, newPath); |
| } |
| catalog.alterTable(CatalogUtil.renameTable(qualifiedName, alterTable.getNewTableName(), |
| AlterTableType.RENAME_TABLE)); |
| break; |
| case RENAME_COLUMN: |
| if (existColumnName(qualifiedName, alterTable.getNewColumnName())) { |
| throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName()); |
| } |
| catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(), alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN)); |
| break; |
| case ADD_COLUMN: |
| if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) { |
| throw new ColumnNameAlreadyExistException(alterTable.getAddNewColumn().getSimpleName()); |
| } |
| catalog.alterTable(CatalogUtil.addNewColumn(qualifiedName, alterTable.getAddNewColumn(), AlterTableType.ADD_COLUMN)); |
| break; |
| default: |
| //TODO |
| } |
| } |
| |
| /** |
| * Truncate table a given table |
| */ |
/**
 * Truncates one or more tables by deleting every data file under each table's directory.
 *
 * <p>All names are validated first (the table must exist, and its data directory must
 * sit directly under the warehouse directory — i.e. not an external table) before any
 * file is deleted, so an invalid name aborts the whole statement without side effects.
 *
 * @param session           the issuing session, used to resolve unqualified names
 * @param truncateTableNode the TRUNCATE node carrying the table name list
 * @throws NoSuchTableException if any named table does not exist
 * @throws IOException          if any table is external, or on filesystem failure
 */
public void truncateTable(final Session session, final TruncateTableNode truncateTableNode) throws IOException {
  List<String> tableNames = truncateTableNode.getTableNames();
  final CatalogService catalog = context.getCatalog();

  String databaseName;
  String simpleTableName;

  // Phase 1: validate every table before touching any data.
  List<TableDesc> tableDescList = new ArrayList<TableDesc>();
  for (String eachTableName: tableNames) {
    if (CatalogUtil.isFQTableName(eachTableName)) {
      String[] split = CatalogUtil.splitFQTableName(eachTableName);
      databaseName = split[0];
      simpleTableName = split[1];
    } else {
      databaseName = session.getCurrentDatabase();
      simpleTableName = eachTableName;
    }
    final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);

    if (!catalog.existsTable(databaseName, simpleTableName)) {
      throw new NoSuchTableException(qualifiedName);
    }

    // Refuse to truncate a table whose data lives outside the warehouse directory.
    Path warehousePath = new Path(TajoConf.getWarehouseDir(context.getConf()), databaseName);
    TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
    Path tablePath = tableDesc.getPath();
    if (tablePath.getParent() == null ||
        !tablePath.getParent().toUri().getPath().equals(warehousePath.toUri().getPath())) {
      throw new IOException("Can't truncate external table:" + eachTableName + ", data dir=" + tablePath +
          ", warehouse dir=" + warehousePath);
    }
    tableDescList.add(tableDesc);
  }

  // Phase 2: delete all data files of every validated table.
  for (TableDesc eachTable: tableDescList) {
    Path path = eachTable.getPath();
    LOG.info("Truncate table: " + eachTable.getName() + ", delete all data files in " + path);
    FileSystem fs = path.getFileSystem(context.getConf());

    FileStatus[] files = fs.listStatus(path);
    if (files != null) {
      for (FileStatus eachFile: files) {
        fs.delete(eachFile.getPath(), true);
      }
    }
  }
}
| |
| private boolean existColumnName(String tableName, String columnName) { |
| final TableDesc tableDesc = catalog.getTableDesc(tableName); |
| return tableDesc.getSchema().containsByName(columnName) ? true : false; |
| } |
| |
| private TableDesc createTable(Session session, CreateTableNode createTable, boolean ifNotExists) throws IOException { |
| TableMeta meta; |
| |
| if (createTable.hasOptions()) { |
| meta = CatalogUtil.newTableMeta(createTable.getStorageType(), createTable.getOptions()); |
| } else { |
| meta = CatalogUtil.newTableMeta(createTable.getStorageType()); |
| } |
| |
| if(createTable.isExternal()){ |
| Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given."); |
| } else { |
| String databaseName; |
| String tableName; |
| if (CatalogUtil.isFQTableName(createTable.getTableName())) { |
| databaseName = CatalogUtil.extractQualifier(createTable.getTableName()); |
| tableName = CatalogUtil.extractSimpleName(createTable.getTableName()); |
| } else { |
| databaseName = session.getCurrentDatabase(); |
| tableName = createTable.getTableName(); |
| } |
| |
| // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) |
| Path tablePath = StorageUtil.concatPath(sm.getWarehouseDir(), databaseName, tableName); |
| createTable.setPath(tablePath); |
| } |
| |
| return createTableOnPath(session, createTable.getTableName(), createTable.getTableSchema(), |
| meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists); |
| } |
| |
| public TableDesc createTableOnPath(Session session, String tableName, Schema schema, TableMeta meta, |
| Path path, boolean isExternal, PartitionMethodDesc partitionDesc, |
| boolean ifNotExists) |
| throws IOException { |
| String databaseName; |
| String simpleTableName; |
| if (CatalogUtil.isFQTableName(tableName)) { |
| String [] splitted = CatalogUtil.splitFQTableName(tableName); |
| databaseName = splitted[0]; |
| simpleTableName = splitted[1]; |
| } else { |
| databaseName = session.getCurrentDatabase(); |
| simpleTableName = tableName; |
| } |
| String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName); |
| |
| boolean exists = catalog.existsTable(databaseName, simpleTableName); |
| |
| if (exists) { |
| if (ifNotExists) { |
| LOG.info("relation \"" + qualifiedName + "\" is already exists." ); |
| return catalog.getTableDesc(databaseName, simpleTableName); |
| } else { |
| throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName)); |
| } |
| } |
| |
| FileSystem fs = path.getFileSystem(context.getConf()); |
| |
| if (isExternal) { |
| if(!fs.exists(path)) { |
| LOG.error("ERROR: " + path.toUri() + " does not exist"); |
| throw new IOException("ERROR: " + path.toUri() + " does not exist"); |
| } |
| } else { |
| fs.mkdirs(path); |
| } |
| |
| long totalSize = 0; |
| |
| try { |
| totalSize = sm.calculateSize(path); |
| } catch (IOException e) { |
| LOG.warn("Cannot calculate the size of the relation", e); |
| } |
| |
| TableStats stats = new TableStats(); |
| stats.setNumBytes(totalSize); |
| TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName), |
| schema, meta, path, isExternal); |
| desc.setStats(stats); |
| if (partitionDesc != null) { |
| desc.setPartitionMethod(partitionDesc); |
| } |
| |
| if (catalog.createTable(desc)) { |
| LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")"); |
| return desc; |
| } else { |
| LOG.info("Table creation " + tableName + " is failed."); |
| throw new CatalogException("Cannot create table \"" + tableName + "\"."); |
| } |
| } |
| |
| public boolean createDatabase(@Nullable Session session, String databaseName, |
| @Nullable String tablespace, |
| boolean ifNotExists) throws IOException { |
| |
| String tablespaceName; |
| if (tablespace == null) { |
| tablespaceName = DEFAULT_TABLESPACE_NAME; |
| } else { |
| tablespaceName = tablespace; |
| } |
| |
| // CREATE DATABASE IF NOT EXISTS |
| boolean exists = catalog.existDatabase(databaseName); |
| if (exists) { |
| if (ifNotExists) { |
| LOG.info("database \"" + databaseName + "\" is already exists." ); |
| return true; |
| } else { |
| throw new AlreadyExistsDatabaseException(databaseName); |
| } |
| } |
| |
| if (catalog.createDatabase(databaseName, tablespaceName)) { |
| String normalized = databaseName; |
| Path databaseDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), normalized); |
| FileSystem fs = databaseDir.getFileSystem(context.getConf()); |
| fs.mkdirs(databaseDir); |
| } |
| |
| return true; |
| } |
| |
| public boolean dropDatabase(Session session, String databaseName, boolean ifExists) { |
| |
| boolean exists = catalog.existDatabase(databaseName); |
| if(!exists) { |
| if (ifExists) { // DROP DATABASE IF EXISTS |
| LOG.info("database \"" + databaseName + "\" does not exists." ); |
| return true; |
| } else { // Otherwise, it causes an exception. |
| throw new NoSuchDatabaseException(databaseName); |
| } |
| } |
| |
| if (session.getCurrentDatabase().equals(databaseName)) { |
| throw new RuntimeException("ERROR: Cannot drop the current open database"); |
| } |
| |
| boolean result = catalog.dropDatabase(databaseName); |
| LOG.info("database " + databaseName + " is dropped."); |
| return result; |
| } |
| |
| /** |
| * Drop a given named table |
| * |
| * @param tableName to be dropped |
| * @param purge Remove all data if purge is true. |
| */ |
| public boolean dropTable(Session session, String tableName, boolean ifExists, boolean purge) { |
| CatalogService catalog = context.getCatalog(); |
| |
| String databaseName; |
| String simpleTableName; |
| if (CatalogUtil.isFQTableName(tableName)) { |
| String [] splitted = CatalogUtil.splitFQTableName(tableName); |
| databaseName = splitted[0]; |
| simpleTableName = splitted[1]; |
| } else { |
| databaseName = session.getCurrentDatabase(); |
| simpleTableName = tableName; |
| } |
| String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName); |
| |
| boolean exists = catalog.existsTable(qualifiedName); |
| if(!exists) { |
| if (ifExists) { // DROP TABLE IF EXISTS |
| LOG.info("relation \"" + qualifiedName + "\" is already exists." ); |
| return true; |
| } else { // Otherwise, it causes an exception. |
| throw new NoSuchTableException(qualifiedName); |
| } |
| } |
| |
| Path path = catalog.getTableDesc(qualifiedName).getPath(); |
| catalog.dropTable(qualifiedName); |
| |
| if (purge) { |
| try { |
| FileSystem fs = path.getFileSystem(context.getConf()); |
| fs.delete(path, true); |
| } catch (IOException e) { |
| throw new InternalError(e.getMessage()); |
| } |
| } |
| |
| LOG.info(String.format("relation \"%s\" is " + (purge ? " purged." : " dropped."), qualifiedName)); |
| return true; |
| } |
| |
/**
 * A hook applied to a query plan just before it is forwarded to a QueryMaster
 * for distributed execution.
 */
public interface DistributedQueryHook {
  /** Returns true if this hook should run for the given plan. */
  boolean isEligible(QueryContext queryContext, LogicalPlan plan);
  /** Applies the hook; failures are treated as best-effort by the manager. */
  void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
}
| |
| public static class DistributedQueryHookManager { |
| private List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>(); |
| public void addHook(DistributedQueryHook hook) { |
| hooks.add(hook); |
| } |
| |
| public void doHooks(QueryContext queryContext, LogicalPlan plan) { |
| for (DistributedQueryHook hook : hooks) { |
| if (hook.isEligible(queryContext, plan)) { |
| try { |
| hook.hook(queryContext, plan); |
| } catch (Throwable t) { |
| t.printStackTrace(); |
| } |
| } |
| } |
| } |
| } |
| |
/**
 * Hook for distributed CREATE TABLE AS queries: records the output table name,
 * output path under the warehouse directory, and partition method in the
 * query context before the query is forwarded.
 */
public class CreateTableHook implements DistributedQueryHook {

  @Override
  public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
    return rootNode.getChild().getType() == NodeType.CREATE_TABLE;
  }

  @Override
  public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
    CreateTableNode createTableNode = rootNode.getChild();
    // NOTE(review): assumes the table name is already fully qualified at this
    // point (splitFQTableName would fail otherwise) — confirm against the planner.
    String [] splitted = CatalogUtil.splitFQTableName(createTableNode.getTableName());
    String databaseName = splitted[0];
    String tableName = splitted[1];
    queryContext.setOutputTable(tableName);
    // Output goes to ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME}.
    queryContext.setOutputPath(
        StorageUtil.concatPath(TajoConf.getWarehouseDir(context.getConf()), databaseName, tableName));
    if(createTableNode.getPartitionMethod() != null) {
      queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
    }
    queryContext.setCreateTable();
  }
}
| |
| public static class InsertHook implements DistributedQueryHook { |
| |
| @Override |
| public boolean isEligible(QueryContext queryContext, LogicalPlan plan) { |
| return plan.getRootBlock().getRootType() == NodeType.INSERT; |
| } |
| |
| @Override |
| public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception { |
| queryContext.setInsert(); |
| |
| InsertNode insertNode = plan.getRootBlock().getNode(NodeType.INSERT); |
| |
| // Set QueryContext settings, such as output table name and output path. |
| // It also remove data files if overwrite is true. |
| Path outputPath; |
| if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME] |
| queryContext.setOutputTable(insertNode.getTableName()); |
| queryContext.setOutputPath(insertNode.getPath()); |
| } else { // INSERT INTO LOCATION ... |
| // When INSERT INTO LOCATION, must not set output table. |
| outputPath = insertNode.getPath(); |
| queryContext.setFileOutput(); |
| queryContext.setOutputPath(outputPath); |
| } |
| |
| if (insertNode.isOverwrite()) { |
| queryContext.setOutputOverwrite(); |
| } |
| } |
| } |
| } |