blob: f36e1ac0aed45578c24513d805df7a2b5ebf9ad0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.table
import org.apache.spark.sql.{CarbonEnv, CarbonSource, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, MetadataCommand}
/**
* Command to create table in case of 'USING CARBONDATA' DDL
*
* @param catalogTable catalog table created by spark
* @param ignoreIfExists ignore if table exists
* @param sparkSession spark session
*/
case class CarbonCreateDataSourceTableCommand(
catalogTable: CatalogTable,
ignoreIfExists: Boolean,
sparkSession: SparkSession)
extends MetadataCommand {
override def processMetadata(session: SparkSession): Seq[Row] = {
// Run the spark command to create table in metastore before saving carbon schema
// in table path.
// This is required for spark 2.4, because spark 2.4 will fail to create table
// if table path is created before hand
val updatedCatalogTable =
CarbonSource.updateCatalogTableWithCarbonSchema(catalogTable, session, None, false)
val sparkCommand = CreateDataSourceTableCommand(updatedCatalogTable, ignoreIfExists)
sparkCommand.run(session)
// save the table info (carbondata's schema) in table path
val tableName = catalogTable.identifier.table
val dbName = CarbonEnv.getDatabaseName(catalogTable.identifier.database)(session)
val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(session)
val tableInfo = CarbonSource.createTableInfo(
dbName, tableName, tablePath, catalogTable.schema, catalogTable.properties, None, session)
val metastore = CarbonEnv.getInstance(session).carbonMetaStore
metastore.saveToDisk(tableInfo, tablePath)
Seq.empty
}
override protected def opName: String = "CREATE TABLE USING CARBONDATA"
}