/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive

import java.net.URI

import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.util.CarbonReflectionUtils
/**
 * Create a table 'using carbondata' and insert the query result into it.
 *
 * @param table the Catalog Table
 * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
 * @param query the query whose result will be inserted into the new relation
 *
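 * For example, a statement of the following shape ends up in this command
 * (the table and column names are illustrative only):
 * {{{
 *   CREATE TABLE sales USING carbondata AS SELECT id, amount FROM staging_sales
 * }}}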
 */
case class CreateCarbonSourceTableAsSelectCommand(
    table: CatalogTable,
    mode: SaveMode,
    query: LogicalPlan)
  extends AtomicRunnableCommand {

  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)

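  // Nothing is touched in the metastore at this stage; all of the work,
  // including the data write, happens in processData below.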
  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    Seq.empty
  }

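  // Resolves the target table, honours the requested SaveMode if the table already
  // exists, and otherwise writes the query result to a fresh table location.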
  override def processData(sparkSession: SparkSession): Seq[Row] = {
    assert(table.tableType != CatalogTableType.VIEW)
    assert(table.provider.isDefined)

    val sessionState = sparkSession.sessionState
    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableIdentWithDB = table.identifier.copy(database = Some(db))
    val tableName = tableIdentWithDB.unquotedString
    setAuditTable(db, table.identifier.table)

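    // If the table already exists, the save mode decides whether to fail, skip,
    // or append to the existing data.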
    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
      assert(mode != SaveMode.Overwrite,
        s"Expect the table $tableName has been dropped when the save mode is Overwrite")

      if (mode == SaveMode.ErrorIfExists) {
        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
      }
      if (mode == SaveMode.Ignore) {
        // Since the table already exists and the save mode is Ignore, we will just return.
        return Seq.empty
      }

      saveDataIntoTable(
        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
    } else {
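      // For CTAS the schema always comes from the query, so no user-specified
      // schema is expected on the catalog table at this point.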
      assert(table.schema.isEmpty)

      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
        Some(sessionState.catalog.defaultTablePath(table.identifier))
      } else {
        table.storage.locationUri
      }
      val result = saveDataIntoTable(
        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)

      result match {
        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
          // Need to recover partitions into the metastore so our saved data is visible.
          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
        case _ =>
      }
    }

    Seq.empty[Row]
  }

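  // Builds a DataSource for the target table and writes the query result into it,
  // returning the relation produced by the write.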
  private def saveDataIntoTable(
      session: SparkSession,
      table: CatalogTable,
      tableLocation: Option[URI],
      data: LogicalPlan,
      mode: SaveMode,
      tableExists: Boolean): BaseRelation = {
    // Create the relation based on the input logical plan: `data`.
    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
    val dataSource = DataSource(
      session,
      className = table.provider.get,
      partitionColumns = table.partitionColumnNames,
      bucketSpec = table.bucketSpec,
      options = table.storage.properties ++ pathOption,
      catalogTable = if (tableExists) {
        Some(table)
      } else {
        None
      })

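    // The write is dispatched through CarbonReflectionUtils so that the call into
    // DataSource stays compatible across the Spark versions CarbonData supports.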
    try {
      val physicalPlan = session.sessionState.executePlan(data).executedPlan
      CarbonReflectionUtils.invokewriteAndReadMethod(dataSource,
        Dataset.ofRows(session, query),
        data,
        session,
        mode,
        query,
        physicalPlan)
    } catch {
      case ex: AnalysisException =>
        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
        throw ex
    }
  }

  override protected def opName: String = "CREATE TABLE AS SELECT"
}