/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command.table

import scala.collection.JavaConverters._

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.MetadataCommand

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.spark.util.CarbonSparkUtil

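/**
 * Metadata command that creates a CarbonData table. Parameter descriptions below are
 * inferred from how the fields are used in processMetadata.
 *
 * @param tableInfo      schema, properties and identity of the table to be created
 * @param ifNotExistsSet true if IF NOT EXISTS was specified, so an existing table is not an error
 * @param tableLocation  explicit table path, if any; otherwise derived from the database location
 * @param isExternal     true if the table is created as an external table
 * @param createDSTable  whether to also register the table in the session catalog via CarbonSource
 * @param isVisible      false only for datamaps, so they can be filtered out of SHOW TABLES
 */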
case class CarbonCreateTableCommand(
tableInfo: TableInfo,
ifNotExistsSet: Boolean = false,
tableLocation: Option[String] = None,
    isExternal: Boolean = false,
createDSTable: Boolean = true,
isVisible: Boolean = true)
extends MetadataCommand {
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = tableInfo.getFactTable.getTableName
    var databaseOpt: Option[String] = None
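    // Propagate the session's Hadoop configuration to the current thread so that the Carbon
    // file operations below pick it up.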
ThreadLocalSessionInfo
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
if (tableInfo.getDatabaseName != null) {
databaseOpt = Some(tableInfo.getDatabaseName)
}
val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
setAuditTable(dbName, tableName)
setAuditInfo(tableInfo.getFactTable.getTableProperties.asScala.toMap
++ Map("external" -> isExternal.toString))
    // set dbName and tableUniqueName in the table info
tableInfo.setDatabaseName(dbName)
tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
val isTransactionalTable = tableInfo.isTransactionalTable
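    // If a table with the same name already exists, fail unless IF NOT EXISTS was specified.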
if (sparkSession.sessionState.catalog.listTables(dbName)
.exists(_.table.equalsIgnoreCase(tableName))) {
if (!ifNotExistsSet) {
throw new TableAlreadyExistsException(dbName, tableName)
}
} else {
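      // Resolve the table path: if the default location already exists for a managed
      // transactional table, append the table id so the new table gets a unique path.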
val path = tableLocation.getOrElse(
CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession))
val tablePath = if (FileFactory.getCarbonFile(path).exists() && !isExternal &&
isTransactionalTable && tableLocation.isEmpty) {
path + "_" + tableInfo.getFactTable.getTableId
} else {
path
}
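      // Streaming ingestion is not supported when the table is stored on S3.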
val streaming = tableInfo.getFactTable.getTableProperties.get("streaming")
if (path.startsWith("s3") && streaming != null && streaming != null &&
streaming.equalsIgnoreCase("true")) {
throw new UnsupportedOperationException("streaming is not supported with s3 store")
}
tableInfo.setTablePath(tablePath)
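      // Build the absolute identifier for the new table from the resolved path.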
val tableIdentifier = AbsoluteTableIdentifier
.from(tablePath, dbName, tableName, tableInfo.getFactTable.getTableId)
      // Validate the SORT_SCOPE table property at create time.
val sortScope = tableInfo.getFactTable.getTableProperties.asScala
.getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
if (!CarbonUtil.isValidSortOption(sortScope)) {
throw new InvalidConfigurationException(
s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT'," +
s" 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
}
if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
throwMetadataException(dbName, tableName, "Table should have at least one column.")
}
      // Validate the column compressor property at create time.
val columnCompressor = tableInfo.getFactTable.getTableProperties.get(
CarbonCommonConstants.COMPRESSOR)
try {
if (null != columnCompressor) {
CompressorFactory.getInstance().getCompressor(columnCompressor)
}
} catch {
        case ex: UnsupportedOperationException =>
throw new InvalidConfigurationException(ex.getMessage)
}
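      // Notify registered listeners that table creation is about to start.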
val operationContext = new OperationContext
val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Some(tableInfo))
OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
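      // Register the table in the session catalog as a datasource table backed by CarbonSource.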
if (createDSTable) {
try {
val tablePath = tableIdentifier.getTablePath
val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
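          // Clear the execution id so that the nested sql() call below starts its own
          // execution rather than reusing the one already set on this thread.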
sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
val partitionInfo = tableInfo.getFactTable.getPartitionInfo
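          // Build the PARTITIONED BY clause for native Hive partitioned tables.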
val partitionString =
if (partitionInfo != null &&
partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
// Restrict dictionary encoding on partition columns.
// TODO Need to decide whether it is required
val dictionaryOnPartitionColumn =
partitionInfo.getColumnSchemaList.asScala.exists{p =>
p.hasEncoding(Encoding.DICTIONARY) && !p.hasEncoding(Encoding.DIRECT_DICTIONARY)
}
if (dictionaryOnPartitionColumn) {
throwMetadataException(
dbName,
tableName,
s"Dictionary include cannot be applied on partition columns")
}
s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
_.getColumnName).mkString(",")})"
} else {
""
}
          // The isVisible property is added to the Hive table properties to differentiate the
          // main table from datamaps (such as preaggregate datamaps); it is false only for
          // datamaps. This improves SHOW TABLES performance when filtering datamaps out of the
          // main tables.
          // Synchronized to prevent concurrent creation of tables with the same name.
CarbonCreateTableCommand.synchronized {
sparkSession.sql(
s"""CREATE TABLE $dbName.$tableName
|(${ rawSchema })
|USING org.apache.spark.sql.CarbonSource
|OPTIONS (
| tableName "$tableName",
| dbName "$dbName",
| tablePath "$tablePath",
| path "$tablePath",
| isExternal "$isExternal",
| isTransactional "$isTransactionalTable",
| isVisible "$isVisible"
| $carbonSchemaString)
| $partitionString
""".stripMargin)
}
} catch {
case e: AnalysisException => throw e
case e: Exception =>
              // Roll back by dropping the table that was just created.
try {
CarbonEnv.getInstance(sparkSession).carbonMetastore
.dropTable(tableIdentifier)(sparkSession)
} catch {
case _: Exception => // No operation
}
val msg = s"Create table'$tableName' in database '$dbName' failed"
throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
}
}
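      // Notify registered listeners that the table was created successfully.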
val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
}
Seq.empty
}
override protected def opName: String = "CREATE TABLE"
}