/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.strategy

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand}
import org.apache.spark.sql.execution.command.schema._
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil}

import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.util.Util

/**
 * Extractor for CreateDataSourceTableAsSelectCommand: the command takes an extra
 * constructor argument in Spark 2.3, so this wrapper is needed to match the case
 * in the same way across the supported Spark versions.
 */
object MatchCreateDataSourceTable {
def unapply(plan: LogicalPlan): Option[(CatalogTable, SaveMode, LogicalPlan)] = plan match {
case t: CreateDataSourceTableAsSelectCommand => Some((t.table, t.mode, t.query))
case _ => None
}
}
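
/**
 * Carbon strategy for DDL commands.
 *
 * Commands that operate on Carbon tables are rewritten into their Carbon-specific
 * counterparts (for example, LOAD DATA INPATH '/path/data.csv' INTO TABLE t on a
 * Carbon table is planned as a CarbonLoadDataCommand); everything else falls back
 * to the default Spark command.
 */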
class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
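// LOAD DATA into an existing Carbon table is rewritten to CarbonLoadDataCommand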
case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
if CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(identifier)(sparkSession) =>
ExecutedCommandExec(
CarbonLoadDataCommand(
databaseNameOp = identifier.database,
tableName = identifier.table.toLowerCase,
factPathFromUser = path,
dimFilesPath = Seq(),
options = Map(),
isOverwriteTable = isOverwrite,
inputSqlString = null,
dataFrame = None,
updateModel = None,
tableInfoOp = None,
internalOptions = Map.empty,
partition = partition.getOrElse(Map.empty).map { case (col, value) =>
(col, Some(value))})) :: Nil
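// ALTER TABLE ... RENAME TO: use CarbonAlterTableRenameCommand when the source is a Carbon table, otherwise fall back to the Spark command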
case alter@AlterTableRenameCommand(oldTableIdentifier, newTableIdentifier, _) =>
val dbOption = oldTableIdentifier.database.map(_.toLowerCase)
val tableIdentifier = TableIdentifier(oldTableIdentifier.table.toLowerCase(), dbOption)
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(tableIdentifier)(sparkSession)
if (isCarbonTable) {
val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
} else {
ExecutedCommandExec(alter) :: Nil
}
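// DROP TABLE: tables whose path exists in the Carbon metastore are dropped through CarbonDropTableCommand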
case DropTableCommand(identifier, ifExists, isView, _)
if CarbonEnv.getInstance(sparkSession).carbonMetaStore
.isTablePathExists(identifier)(sparkSession) =>
ExecutedCommandExec(
CarbonDropTableCommand(ifExists, identifier.database,
identifier.table.toLowerCase)) :: Nil
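// CREATE TABLE LIKE a Carbon source table is handled by CarbonCreateTableLikeCommand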
case createLikeTable: CreateTableLikeCommand =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(createLikeTable.sourceTable)(sparkSession)
if (isCarbonTable) {
ExecutedCommandExec(CarbonCreateTableLikeCommand(createLikeTable.sourceTable,
createLikeTable.targetTable, createLikeTable.ifNotExists)) :: Nil
} else {
ExecutedCommandExec(createLikeTable) :: Nil
}
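// INSERT INTO a Carbon relation goes through CarbonInsertIntoCommand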
case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
partition, child: LogicalPlan, overwrite, _) =>
ExecutedCommandExec(CarbonInsertIntoCommand(relation, child, overwrite, partition)) :: Nil
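// CREATE DATABASE: create the database directory under the resolved database location (or the Carbon store path) before running the Spark command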
case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
val dbLocation = try {
CarbonEnv.getDatabaseLocation(dbName, sparkSession)
} catch {
case e: NoSuchDatabaseException =>
CarbonProperties.getStorePath
}
ThreadLocalSessionInfo
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
FileUtils.createDatabaseDirectory(dbName, dbLocation, sparkSession.sparkContext)
ExecutedCommandExec(createDb) :: Nil
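// DROP DATABASE: wrap in CarbonDropDatabaseCommand when the database location exists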
case drop@DropDatabaseCommand(dbName,
ifExists, isCascade) if CarbonEnv.databaseLocationExists(dbName, sparkSession, ifExists) =>
ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
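// ALTER TABLE ... COMPACT is only valid for Carbon tables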
case alterTable@CarbonAlterTableCompactionCommand(altertablemodel, _, _) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(TableIdentifier(altertablemodel.tableName,
altertablemodel.dbName))(sparkSession)
if (isCarbonTable) {
ExecutedCommandExec(alterTable) :: Nil
} else {
throw new MalformedCarbonCommandException(
String.format("Table or view '%s' not found in database '%s' or not carbon fileformat",
altertablemodel.tableName,
altertablemodel.dbName.getOrElse("default")))
}
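// ALTER TABLE column rename / data type change is supported only on transactional Carbon tables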
case colRenameDataTypeChange@CarbonAlterTableColRenameDataTypeChangeCommand(
alterTableColRenameAndDataTypeChangeModel, _) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(TableIdentifier(alterTableColRenameAndDataTypeChangeModel.tableName,
alterTableColRenameAndDataTypeChangeModel.databaseName))(sparkSession)
if (isCarbonTable) {
val carbonTable = CarbonEnv
.getCarbonTable(alterTableColRenameAndDataTypeChangeModel.databaseName,
alterTableColRenameAndDataTypeChangeModel.tableName)(sparkSession)
if (carbonTable != null && carbonTable.isFileLevelFormat) {
throw new MalformedCarbonCommandException(
"Unsupported alter operation on Carbon external fileformat table")
} else if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException(
"Unsupported operation on non transactional table")
} else {
ExecutedCommandExec(colRenameDataTypeChange) :: Nil
}
} else {
throw new MalformedCarbonCommandException(
String.format("Table or view '%s' not found in database '%s' or not carbon fileformat",
alterTableColRenameAndDataTypeChangeModel.tableName,
alterTableColRenameAndDataTypeChangeModel.
databaseName.getOrElse("default")))
}
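// ALTER TABLE ... ADD COLUMNS: handled by Carbon for transactional Carbon tables; for other tables, delegate to the native Spark command on Spark 2.2 and above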
case addColumn@CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
alterTableAddColumnsModel.databaseName))(sparkSession)
if (isCarbonTable) {
val carbonTable = CarbonEnv.getCarbonTable(alterTableAddColumnsModel.databaseName,
alterTableAddColumnsModel.tableName)(sparkSession)
if (carbonTable != null && carbonTable.isFileLevelFormat) {
throw new MalformedCarbonCommandException(
"Unsupported alter operation on Carbon external fileformat table")
} else if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException(
"Unsupported operation on non transactional table")
} else {
ExecutedCommandExec(addColumn) :: Nil
}
// TODO: remove this else if check once the 2.1 version is unsupported by carbon
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
.map {
a =>
StructField(a.column,
Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
}
val identifier = TableIdentifier(
alterTableAddColumnsModel.tableName,
alterTableAddColumnsModel.databaseName)
ExecutedCommandExec(CarbonReflectionUtils
.invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) ::
Nil
// TODO: remove this else check once the 2.1 version is unsupported by carbon
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
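// ALTER TABLE ... DROP COLUMNS is supported only on transactional Carbon tables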
case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
alterTableDropColumnModel.databaseName))(sparkSession)
if (isCarbonTable) {
val carbonTable = CarbonEnv.getCarbonTable(alterTableDropColumnModel.databaseName,
alterTableDropColumnModel.tableName)(sparkSession)
if (carbonTable != null && carbonTable.isFileLevelFormat) {
throw new MalformedCarbonCommandException(
"Unsupported alter operation on Carbon external fileformat table")
} else if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException(
"Unsupported operation on non transactional table")
} else {
ExecutedCommandExec(dropColumn) :: Nil
}
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
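// DESCRIBE FORMATTED / EXTENDED on a Carbon table is rewritten to CarbonDescribeFormattedCommand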
case desc@DescribeTableCommand(identifier, partitionSpec, isExtended) =>
val isFormatted: Boolean = if (sparkSession.version.startsWith("2.1")) {
CarbonReflectionUtils
.getDescribeTableFormattedField(desc.asInstanceOf[DescribeTableCommand])
} else {
false
}
if (CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(identifier)(sparkSession) && (isExtended || isFormatted)) {
val resolvedTable =
sparkSession.sessionState.executePlan(UnresolvedRelation(identifier)).analyzed
val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
ExecutedCommandExec(
CarbonDescribeFormattedCommand(
resultPlan,
plan.output,
partitionSpec,
identifier)) :: Nil
} else {
Nil
}
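// ALTER TABLE ... DROP PARTITION on a Carbon table uses the Carbon Hive-partition command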
case adp@AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(tableName)(sparkSession)
if (isCarbonTable) {
ExecutedCommandExec(
CarbonAlterTableDropHivePartitionCommand(
tableName,
specs,
ifExists,
purge,
retainData)) :: Nil
} else {
ExecutedCommandExec(adp) :: Nil
}
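// SET and RESET are wrapped so that Carbon session properties are handled as well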
case set@SetCommand(kv) =>
ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
case reset@ResetCommand =>
ExecutedCommandExec(CarbonResetCommand()) :: Nil
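// CREATE TABLE [AS SELECT] with the CarbonSource / carbondata provider: update the catalog table with the Carbon schema before creating it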
case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, None)
if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
&& (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
|| tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
val updatedCatalog =
CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession)
val cmd =
CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil
case MatchCreateDataSourceTable(tableDesc, mode, query)
if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
&& (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
|| tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
val updatedCatalog = CarbonSource
.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, Option(query))
val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query)
ExecutedCommandExec(cmd) :: Nil
case cmd@org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
&& (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
|| tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
val updatedCatalog = CarbonSource
.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, query)
val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query.get)
ExecutedCommandExec(cmd) :: Nil
case CreateDataSourceTableCommand(table, ignoreIfExists)
if table.provider.get != DDLUtils.HIVE_PROVIDER
&& (table.provider.get.equals("org.apache.spark.sql.CarbonSource")
|| table.provider.get.equalsIgnoreCase("carbondata")) =>
val updatedCatalog = CarbonSource
.updateCatalogTableWithCarbonSchema(table, sparkSession)
val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
ExecutedCommandExec(cmd) :: Nil
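// ALTER TABLE SET TBLPROPERTIES on a Carbon table: validate the 'streaming' property before applying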
case AlterTableSetPropertiesCommand(tableName, properties, isView)
if CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(tableName)(sparkSession) => {
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException(
"Unsupported operation on non transactional table")
}
// TODO remove this limitation later
val property = properties.find(_._1.equalsIgnoreCase("streaming"))
if (property.isDefined) {
if (carbonTable.getTablePath.startsWith("s3") && property.get._2.equalsIgnoreCase("true")) {
throw new UnsupportedOperationException("streaming is not supported with s3 store")
}
if (carbonTable.isStreamingSink) {
throw new MalformedCarbonCommandException(
"Streaming property can not be changed once it is 'true'")
} else {
if (!property.get._2.trim.equalsIgnoreCase("true")) {
throw new MalformedCarbonCommandException(
"Streaming property value is incorrect")
}
if (DataMapUtil.hasMVDataMap(carbonTable)) {
throw new MalformedCarbonCommandException(
"The table which has MV datamap does not support set streaming property")
}
if (carbonTable.isChildTableForMV) {
throw new MalformedCarbonCommandException(
"Datamap table does not support set streaming property")
}
}
}
ExecutedCommandExec(CarbonAlterTableSetCommand(tableName, properties, isView)) :: Nil
}
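// ALTER TABLE UNSET TBLPROPERTIES: the 'streaming' property cannot be removed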
case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView)
if CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(tableName)(sparkSession) => {
// TODO remove this limitation later
if (propKeys.exists(_.equalsIgnoreCase("streaming"))) {
throw new MalformedCarbonCommandException(
"Streaming property can not be removed")
}
ExecutedCommandExec(
CarbonAlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
}
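// Renaming a partition is not supported on Carbon tables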
case rename@AlterTableRenamePartitionCommand(tableName, oldPartition, newPartition) =>
val dbOption = tableName.database.map(_.toLowerCase)
val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption)
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(tableIdentifier)(sparkSession)
if (isCarbonTable) {
throw new UnsupportedOperationException("Renaming partition on table is not supported")
} else {
ExecutedCommandExec(rename) :: Nil
}
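// ALTER TABLE ... ADD PARTITION on a Carbon table uses the Carbon Hive-partition command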
case addPart@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists) =>
val dbOption = tableName.database.map(_.toLowerCase)
val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption)
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(tableIdentifier)(sparkSession)
if (isCarbonTable) {
ExecutedCommandExec(
CarbonAlterTableAddHivePartitionCommand(
tableName,
partitionSpecsAndLocs,
ifNotExists)) :: Nil
} else {
ExecutedCommandExec(addPart) :: Nil
}
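// REFRESH TABLE: register the table with Carbon via RefreshCarbonTableCommand, then run the normal Spark refresh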
case RefreshTable(tableIdentifier) =>
RefreshCarbonTableCommand(tableIdentifier.database,
tableIdentifier.table).run(sparkSession)
ExecutedCommandExec(RefreshTable(tableIdentifier)) :: Nil
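// REFRESH of a resource path: try to parse it as a Carbon REFRESH statement, fall back to the Spark command on failure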
case refresh@RefreshResource(path : String) =>
val plan = try {
new CarbonSpark2SqlParser().parse(s"REFRESH $path")
} catch {
case e: Exception =>
LOGGER.error(e.getMessage)
refresh
}
ExecutedCommandExec(plan.asInstanceOf[RunnableCommand]) :: Nil
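// ALTER TABLE ... SET LOCATION is not supported on Carbon tables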
case alterSetLoc@AlterTableSetLocationCommand(tableName, _, _) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(tableName)(sparkSession)
if (isCarbonTable) {
throw new UnsupportedOperationException("Set partition location is not supported")
} else {
ExecutedCommandExec(alterSetLoc) :: Nil
}
case _ => Nil
}
}
}