| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.command.table |
| |
| import scala.collection.JavaConverters._ |
| |
| import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _} |
| import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException |
| import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY |
| import org.apache.spark.sql.execution.command.MetadataCommand |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.datastore.compression.CompressorFactory |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.exception.InvalidConfigurationException |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier |
| import org.apache.carbondata.core.metadata.encoder.Encoding |
| import org.apache.carbondata.core.metadata.schema.partition.PartitionType |
| import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} |
| import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo} |
| import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus} |
| import org.apache.carbondata.spark.util.CarbonSparkUtil |
| |
/**
 * Metadata command that creates a Carbon table: validates the table schema and
 * properties, registers the schema with the Carbon metastore and (optionally)
 * creates the corresponding datasource table in the Spark/Hive catalog.
 *
 * @param tableInfo      the Carbon table schema to create
 * @param ifNotExistsSet true when the statement carried IF NOT EXISTS; an existing
 *                       table is then silently ignored instead of raising an error
 * @param tableLocation  explicit table path, if the user supplied LOCATION
 * @param isExternal     true for CREATE EXTERNAL TABLE
 * @param createDSTable  when true, also register the table in the Spark catalog
 *                       via a CREATE TABLE ... USING CarbonSource statement
 * @param isVisible      false only for datamap (e.g. preaggregate) tables; stored
 *                       as a table option so SHOW TABLES can filter them cheaply
 */
case class CarbonCreateTableCommand(
    tableInfo: TableInfo,
    ifNotExistsSet: Boolean = false,
    tableLocation: Option[String] = None,
    isExternal : Boolean = false,
    createDSTable: Boolean = true,
    isVisible: Boolean = true)
  extends MetadataCommand {

  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    val tableName = tableInfo.getFactTable.getTableName
    ThreadLocalSessionInfo
      .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
    // Resolve the database: prefer the one carried by the schema, otherwise fall
    // back to the session's current database. Option(...) maps a null db name to None.
    val databaseOpt: Option[String] = Option(tableInfo.getDatabaseName)
    val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
    setAuditTable(dbName, tableName)
    setAuditInfo(tableInfo.getFactTable.getTableProperties.asScala.toMap
                 ++ Map("external" -> isExternal.toString))
    // set dbName and tableUnique Name in the table info
    tableInfo.setDatabaseName(dbName)
    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
    val isTransactionalTable = tableInfo.isTransactionalTable
    if (sparkSession.sessionState.catalog.listTables(dbName)
      .exists(_.table.equalsIgnoreCase(tableName))) {
      // Table already exists: only an error unless IF NOT EXISTS was specified.
      if (!ifNotExistsSet) {
        throw new TableAlreadyExistsException(dbName, tableName)
      }
    } else {
      val path = tableLocation.getOrElse(
        CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession))
      // For a managed transactional table whose default location is already
      // occupied, suffix the path with the table id to get a fresh directory.
      val tablePath = if (FileFactory.getCarbonFile(path).exists() && !isExternal &&
                          isTransactionalTable && tableLocation.isEmpty) {
        path + "_" + tableInfo.getFactTable.getTableId
      } else {
        path
      }
      // Streaming ingestion is not supported on S3-backed stores.
      val streaming = tableInfo.getFactTable.getTableProperties.get("streaming")
      if (path.startsWith("s3") && streaming != null &&
          streaming.equalsIgnoreCase("true")) {
        throw new UnsupportedOperationException("streaming is not supported with s3 store")
      }
      tableInfo.setTablePath(tablePath)
      val tableIdentifier = AbsoluteTableIdentifier
        .from(tablePath, dbName, tableName, tableInfo.getFactTable.getTableId)

      // Add validation for sort scope when create table
      val sortScope = tableInfo.getFactTable.getTableProperties.asScala
        .getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
      if (!CarbonUtil.isValidSortOption(sortScope)) {
        throw new InvalidConfigurationException(
          s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT'," +
          s" 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
      }

      if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
        throwMetadataException(dbName, tableName, "Table should have at least one column.")
      }

      // Add validation for column compressor when create table
      val columnCompressor = tableInfo.getFactTable.getTableProperties.get(
        CarbonCommonConstants.COMPRESSOR)
      try {
        if (null != columnCompressor) {
          CompressorFactory.getInstance().getCompressor(columnCompressor)
        }
      } catch {
        case ex : UnsupportedOperationException =>
          throw new InvalidConfigurationException(ex.getMessage)
      }

      val operationContext = new OperationContext
      val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
        CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Some(tableInfo))
      OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
      if (createDSTable) {
        try {
          val tablePath = tableIdentifier.getTablePath
          val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
          val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
          sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
          val partitionInfo = tableInfo.getFactTable.getPartitionInfo
          val partitionString =
            if (partitionInfo != null &&
                partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
              // Restrict dictionary encoding on partition columns.
              // TODO Need to decide whether it is required
              val dictionaryOnPartitionColumn =
                partitionInfo.getColumnSchemaList.asScala.exists { p =>
                  p.hasEncoding(Encoding.DICTIONARY) && !p.hasEncoding(Encoding.DIRECT_DICTIONARY)
                }
              if (dictionaryOnPartitionColumn) {
                throwMetadataException(
                  dbName,
                  tableName,
                  s"Dictionary include cannot be applied on partition columns")
              }
              s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
                _.getColumnName).mkString(",")})"
            } else {
              ""
            }
          // isVisible property is added to hive table properties to differentiate between main
          // table and datamaps(like preaggregate). It is false only for datamaps. This is added
          // to improve the show tables performance when filtering the datamaps from main tables
          // synchronized to prevent concurrent creation of tables with the same name
          CarbonCreateTableCommand.synchronized {
            sparkSession.sql(
              s"""CREATE TABLE $dbName.$tableName
                 |(${ rawSchema })
                 |USING org.apache.spark.sql.CarbonSource
                 |OPTIONS (
                 |  tableName "$tableName",
                 |  dbName "$dbName",
                 |  tablePath "$tablePath",
                 |  path "$tablePath",
                 |  isExternal "$isExternal",
                 |  isTransactional "$isTransactionalTable",
                 |  isVisible "$isVisible"
                 |  $carbonSchemaString)
                 |  $partitionString
             """.stripMargin)
          }
        } catch {
          case e: AnalysisException => throw e
          case e: Exception =>
            // Catalog registration failed: roll back the metastore entry so we do
            // not leave a half-created table behind. Best effort — drop failures
            // are deliberately ignored so the original error is surfaced.
            try {
              CarbonEnv.getInstance(sparkSession).carbonMetastore
                .dropTable(tableIdentifier)(sparkSession)
            } catch {
              case _: Exception => // No operation
            }
            val msg = s"Create table '$tableName' in database '$dbName' failed"
            throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
        }
      }
      val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
        CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
      OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
    }
    Seq.empty
  }

  override protected def opName: String = "CREATE TABLE"
}