/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType,
  SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils,
  RunnableCommand}
import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.StructType

/**
 * Create a table `USING carbondata` and insert the query result into it.
 *
 * @param table the catalog table to create
 * @param mode the SaveMode: Ignore, Overwrite, ErrorIfExists or Append
 * @param query the query whose result will be inserted into the new relation
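 *
 * A minimal sketch of how the command might be constructed and executed; the names
 * `catalogTable`, `queryPlan` and `spark` below are hypothetical placeholders:
 * {{{
 *   val command = CreateCarbonSourceTableAsSelectCommand(
 *     table = catalogTable, mode = SaveMode.ErrorIfExists, query = queryPlan)
 *   val rows: Seq[Row] = command.run(spark)   // always returns an empty Seq
 * }}}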
 */
case class CreateCarbonSourceTableAsSelectCommand(
    table: CatalogTable,
    mode: SaveMode,
    query: LogicalPlan)
  extends RunnableCommand {

  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
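    // Preconditions: the table must not be a view, must declare a provider (the USING clause),
    // and must not declare a schema up front; the schema is taken from the query result.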
    assert(table.tableType != CatalogTableType.VIEW)
    assert(table.provider.isDefined)
    assert(table.schema.isEmpty)

    val provider = table.provider.get
    val sessionState = sparkSession.sessionState
    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableIdentWithDB = table.identifier.copy(database = Some(db))
    val tableName = tableIdentWithDB.unquotedString
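
    // Track whether a fresh metastore entry is required and, when appending, the schema of
    // the existing table that the query output must be aligned with.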
    var createMetastoreTable = false
    var existingSchema = Option.empty[StructType]
    if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
      // Check if we need to throw an exception or just return.
      mode match {
        case SaveMode.ErrorIfExists =>
          throw new AnalysisException(s"Table $tableName already exists. " +
            "If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
            "insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite " +
            "the existing data. " +
            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
        case SaveMode.Ignore =>
          // Since the table already exists and the save mode is Ignore, we will just return.
          return Seq.empty[Row]
        case SaveMode.Append =>
          // Check if the specified data source matches the data source of the existing table.
          val existingProvider = DataSource.lookupDataSource(provider)
          // TODO: Check that options from the resolved relation match the relation that we are
          // inserting into (i.e. using the same compression).

          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
          // views unexpectedly.
          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
              // Check if the file formats match.
              l.relation match {
                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
                  throw new AnalysisException(
                    s"The file format of the existing table $tableName is " +
                    s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
                    s"format `$provider`.")
                case _ =>
              }
              if (query.schema.size != l.schema.size) {
                throw new AnalysisException(
                  s"The number of columns in the existing schema [${ l.schema }] " +
                  s"doesn't match that of the data schema [${ query.schema }].")
              }
              existingSchema = Some(l.schema)
            case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
              existingSchema = Some(s.metadata.schema)
            case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
              throw new AnalysisException("Saving data in the Hive serde table " +
                s"${ c.catalogTable.identifier } is not supported yet. " +
                "Please use the insertInto() API as an alternative.")
            case o =>
              throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
          }
        case SaveMode.Overwrite =>
          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
          // Need to create the table again.
          createMetastoreTable = true
      }
    } else {
      // The table does not exist. We need to create it in the metastore.
      createMetastoreTable = true
    }
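
    // Materialize the query as a DataFrame. `existingSchema` is only defined on the Append
    // path above, in which case the output is re-projected to the existing column names.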
    val data = Dataset.ofRows(sparkSession, query)
    val df = existingSchema match {
      // If we are inserting into an existing table, just use the existing schema.
      case Some(s) => data.selectExpr(s.fieldNames: _*)
      case None => data
    }
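
    // Managed tables are written under the session catalog's default table path; external
    // tables keep their declared storage location (table.storage.locationUri).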
    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
      Some(sessionState.catalog.defaultTablePath(table.identifier))
    } else {
      table.storage.locationUri
    }

    // Create the relation based on the data of df.
    val pathOption = tableLocation.map("path" -> _)
    val dataSource = DataSource(
      sparkSession,
      className = provider,
      partitionColumns = table.partitionColumnNames,
      bucketSpec = table.bucketSpec,
      options = table.storage.properties ++ pathOption,
      catalogTable = Some(table))
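
    // Write the query result through the resolved data source. Any AnalysisException is
    // logged with the table name and save mode before being rethrown.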
    val result = try {
      dataSource.write(mode, df)
    } catch {
      case ex: AnalysisException =>
        logError(s"Failed to write to table $tableName in $mode mode", ex)
        throw ex
    }
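
    // `dataSource.write` returns the written relation. For partitioned file-source tables it is
    // typically a HadoopFsRelation, whose partitions are recovered into the metastore below.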
    result match {
      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
        // Need to recover partitions into the metastore so our saved data is visible.
        sparkSession.sessionState.executePlan(
          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
      case _ =>
    }

    // Refresh the cache of the table in the catalog.
    sessionState.catalog.refreshTable(tableIdentWithDB)

    Seq.empty[Row]
  }
}