| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.strategy |
| |
| import org.apache.commons.lang3.StringUtils |
| import org.apache.spark.sql.{CarbonEnv, InsertIntoCarbonTable, SparkSession} |
| import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| import org.apache.spark.sql.catalyst.analysis.NoSuchTableException |
| import org.apache.spark.sql.execution.command.{ExecutedCommandExec, RunnableCommand} |
| import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand} |
| import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand, CarbonAlterTableDropColumnCommand} |
| import org.apache.spark.sql.execution.SparkPlan |
| import org.apache.spark.sql.types.StructField |
| import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} |
| |
| import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException |
| import org.apache.carbondata.core.util.DataTypeUtil |
| |
/**
 * Helper methods that turn Carbon insert / alter-table commands into executable plan nodes
 * and validate the tables those commands target.
 */
object CarbonPlanHelper {

  /**
   * Builds the [[CarbonInsertIntoCommand]] equivalent of an [[InsertIntoCarbonTable]] node.
   *
   * The "fileheader" option is the comma-separated list of the target table's schema field
   * names. Partition keys are lower-cased; partition values are passed through untouched.
   *
   * @param insertInto the insert node to convert
   * @return a command carrying the database name, table name, overwrite flag, child plan,
   *         table info and partition spec taken from `insertInto`
   */
  def insertInto(insertInto: InsertIntoCarbonTable): CarbonInsertIntoCommand = {
    val relation = insertInto.table.carbonRelation
    // NOTE(review): `.get` assumes the table always carries a schema here; it fails otherwise.
    val fileHeader = insertInto.table.tableSchema.get.fields.map(_.name).mkString(",")
    CarbonInsertIntoCommand(
      databaseNameOp = Some(relation.databaseName),
      tableName = relation.tableName,
      options = scala.collection.immutable.Map("fileheader" -> fileHeader),
      isOverwriteTable = insertInto.overwrite,
      logicalPlan = insertInto.child,
      tableInfo = relation.carbonTable.getTableInfo,
      partition = insertInto.partition.map { case (key, value) => (lowerCase(key), value) })
  }

  /**
   * Plans an ALTER TABLE ADD COLUMNS command.
   *
   * For a carbon table the command itself is executed, after checking that the table passes
   * [[requireTransactionalTable]]. For any other table, when `SparkUtil` reports Spark 2.2 or
   * above, the new columns are converted to Spark [[StructField]]s (keeping non-empty column
   * comments) and handed to the command obtained from
   * `CarbonReflectionUtils.invokeAlterTableAddColumn`.
   *
   * @param addColumnCommand the carbon add-column command
   * @param sparkSession     the active session
   * @return a single-element plan list wrapping the command to run
   * @throws MalformedCarbonCommandException if the table is not a carbon table and the Spark
   *                                         version is below 2.2, or if the carbon table is
   *                                         rejected by [[requireTransactionalTable]]
   */
  def addColumn(
      addColumnCommand: CarbonAlterTableAddColumnCommand,
      sparkSession: SparkSession
  ): Seq[SparkPlan] = {
    val model = addColumnCommand.alterTableAddColumnsModel
    val identifier = TableIdentifier(model.tableName, model.databaseName)
    if (isCarbonTable(identifier, sparkSession)) {
      requireTransactionalTable(model.databaseName, model.tableName, sparkSession)
      ExecutedCommandExec(addColumnCommand) :: Nil
    } else if (SparkUtil.isSparkVersionXAndAbove("2.2")) {
      // TODO: remove this else if check once the 2.1 version is unsupported by carbon
      val newFields = (model.dimCols ++ model.msrCols).map { field =>
        // NOTE(review): `.get` assumes every added column declares a data type.
        val structField = StructField(field.column,
          CarbonSparkDataSourceUtil
            .convertCarbonToSparkDataType(DataTypeUtil.valueOf(field.dataType.get)))
        if (StringUtils.isNotEmpty(field.columnComment)) {
          structField.withComment(field.columnComment)
        } else {
          structField
        }
      }
      ExecutedCommandExec(CarbonReflectionUtils
        .invokeAlterTableAddColumn(identifier, newFields).asInstanceOf[RunnableCommand]) :: Nil
    } else {
      // TODO: remove this else check once the 2.1 version is unsupported by carbon
      throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
    }
  }

  /**
   * Plans an ALTER TABLE column rename / data type change command.
   *
   * @param changeColumnCommand the carbon change-column command
   * @param sparkSession        the active session
   * @return a single-element plan list wrapping `changeColumnCommand`
   * @throws MalformedCarbonCommandException if the target is not a carbon table, or if it is
   *                                         rejected by [[requireTransactionalTable]]
   */
  def changeColumn(
      changeColumnCommand: CarbonAlterTableColRenameDataTypeChangeCommand,
      sparkSession: SparkSession
  ): Seq[SparkPlan] = {
    val model = changeColumnCommand.alterTableColRenameAndDataTypeChangeModel
    if (isCarbonTable(TableIdentifier(model.tableName, model.databaseName), sparkSession)) {
      requireTransactionalTable(model.databaseName, model.tableName, sparkSession)
      ExecutedCommandExec(changeColumnCommand) :: Nil
    } else {
      throw notACarbonTable(model.tableName, model.databaseName)
    }
  }

  /**
   * Plans an ALTER TABLE DROP COLUMNS command.
   *
   * Unlike [[addColumn]] and [[changeColumn]] this does not call [[isCarbonTable]] first; it
   * only applies [[requireTransactionalTable]] before wrapping the command.
   *
   * @param dropColumnCommand the carbon drop-column command
   * @param sparkSession      the active session
   * @return a single-element plan list wrapping `dropColumnCommand`
   * @throws MalformedCarbonCommandException if the table is rejected by
   *                                         [[requireTransactionalTable]]
   */
  def dropColumn(
      dropColumnCommand: CarbonAlterTableDropColumnCommand,
      sparkSession: SparkSession
  ): Seq[SparkPlan] = {
    val model = dropColumnCommand.alterTableDropColumnModel
    requireTransactionalTable(model.databaseName, model.tableName, sparkSession)
    ExecutedCommandExec(dropColumnCommand) :: Nil
  }

  /**
   * Plans an ALTER TABLE COMPACT command.
   *
   * @param compactionCommand the carbon compaction command
   * @param sparkSession      the active session
   * @return a single-element plan list wrapping `compactionCommand`
   * @throws MalformedCarbonCommandException if the target is not a carbon table
   */
  def compact(
      compactionCommand: CarbonAlterTableCompactionCommand,
      sparkSession: SparkSession
  ): Seq[SparkPlan] = {
    val model = compactionCommand.alterTableModel
    if (isCarbonTable(TableIdentifier(model.tableName, model.dbName), sparkSession)) {
      ExecutedCommandExec(compactionCommand) :: Nil
    } else {
      throw notACarbonTable(model.tableName, model.dbName)
    }
  }

  /**
   * Checks whether the carbon metastore of the session reports the table as existing.
   *
   * The table and database names are lower-cased (locale-independently) before the lookup.
   *
   * @param tableIdent   the table to look up
   * @param sparkSession the active session
   * @return the result of `carbonMetaStore.tableExists` for the lower-cased identifier
   */
  def isCarbonTable(tableIdent: TableIdentifier, sparkSession: SparkSession): Boolean = {
    val normalized = lowerCaseIdentifier(tableIdent)
    CarbonEnv
      .getInstance(sparkSession)
      .carbonMetaStore
      .tableExists(normalized)(sparkSession)
  }

  /**
   * Checks whether the session catalog reports the table as existing.
   *
   * The table and database names are lower-cased (locale-independently) before the lookup.
   *
   * @param tableIdent   the table to look up
   * @param sparkSession the active session
   * @return the result of `sessionState.catalog.tableExists` for the lower-cased identifier
   */
  def isTableExists(tableIdent: TableIdentifier, sparkSession: SparkSession): Boolean = {
    sparkSession.sessionState.catalog.tableExists(lowerCaseIdentifier(tableIdent))
  }

  /**
   * only carbon transaction table support: drop column, add column, change column
   *
   * Does nothing when `CarbonEnv.getCarbonTable` returns null.
   *
   * @param databaseName optional database of the table
   * @param tableName    name of the table
   * @param sparkSession the active session
   * @throws MalformedCarbonCommandException if the table is a file-level-format table or is
   *                                         not a transactional table
   */
  def requireTransactionalTable(databaseName: Option[String], tableName: String,
      sparkSession: SparkSession): Unit = {
    // Option(...) maps a null lookup result to None, so the checks run only for a real table.
    Option(CarbonEnv.getCarbonTable(databaseName, tableName)(sparkSession)).foreach { table =>
      if (table.isFileLevelFormat) {
        throw new MalformedCarbonCommandException(
          "Unsupported alter operation on Carbon external fileformat table")
      } else if (!table.getTableInfo.isTransactionalTable) {
        throw new MalformedCarbonCommandException(
          "Unsupported operation on non transactional table")
      }
    }
  }

  /**
   * Ensures the table exists in the session catalog and is a carbon table.
   *
   * @param tableIdentifier the table to validate
   * @param sparkSession    the active session
   * @param message         message of the exception raised when the table exists but is not
   *                        a carbon table
   * @throws NoSuchTableException          if [[isTableExists]] is false for the table
   * @throws UnsupportedOperationException if [[isCarbonTable]] is false for the table
   */
  def validateCarbonTable(
      tableIdentifier: TableIdentifier,
      sparkSession: SparkSession,
      message: String
  ): Unit = {
    if (!isTableExists(tableIdentifier, sparkSession)) {
      // Report the explicit database if given, otherwise whatever CarbonEnv resolves for None.
      val databaseName = tableIdentifier.database.getOrElse(
        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession))
      throw new NoSuchTableException(databaseName, tableIdentifier.table)
    }
    if (!isCarbonTable(tableIdentifier, sparkSession)) {
      throw new UnsupportedOperationException(message)
    }
  }

  /**
   * Lower-cases an identifier independently of the JVM default locale, so that e.g. "I" is
   * always folded to "i" (the no-argument `toLowerCase` yields a dotless "ı" under Turkish).
   */
  private def lowerCase(name: String): String = name.toLowerCase(java.util.Locale.ROOT)

  /** Returns a copy of `tableIdent` with both the table and the database name lower-cased. */
  private def lowerCaseIdentifier(tableIdent: TableIdentifier): TableIdentifier = {
    TableIdentifier(lowerCase(tableIdent.table), tableIdent.database.map(lowerCase))
  }

  /** Builds the exception raised when an alter command targets a non-carbon table. */
  private def notACarbonTable(
      tableName: String,
      databaseName: Option[String]
  ): MalformedCarbonCommandException = {
    new MalformedCarbonCommandException(
      String.format("Table or view '%s' not found in database '%s' or not carbon fileformat",
        tableName,
        databaseName.getOrElse("default")))
  }
}