| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.command.schema |
| |
| import scala.collection.JavaConverters._ |
| |
| import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} |
| import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand} |
| import org.apache.spark.sql.hive.CarbonSessionCatalogUtil |
| import org.apache.spark.util.AlterTableUtil |
| |
| import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.features.TableOperation |
| import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} |
| import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus} |
| import org.apache.carbondata.format.TableInfo |
| |
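| /** |
| * Command to add new columns to a carbon table, e.g. for: |
| * {{{ |
| * ALTER TABLE db.tbl ADD COLUMNS (newCol STRING) |
| * }}} |
| * It acquires the metadata and compaction locks, fires the pre/post alter events, |
| * generates schemas for the new columns, persists the updated schema to the carbon |
| * metastore and refreshes the table in the Spark catalog. |
| */ |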
| private[sql] case class CarbonAlterTableAddColumnCommand( |
| alterTableAddColumnsModel: AlterTableAddColumnsModel) |
| extends MetadataCommand { |
| |
| override def processMetadata(sparkSession: SparkSession): Seq[Row] = { |
| val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) |
| val tableName = alterTableAddColumnsModel.tableName |
| val dbName = alterTableAddColumnsModel.databaseName |
| .getOrElse(sparkSession.catalog.currentDatabase) |
| setAuditTable(dbName, tableName) |
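| // take metadata and compaction locks so that no concurrent schema change or compaction |
| // runs while the columns are added |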
| val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) |
| var locks = List.empty[ICarbonLock] |
| var timeStamp = 0L |
| var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]() |
| var carbonTable: CarbonTable = null |
| try { |
| locks = AlterTableUtil |
| .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) |
| // Consider a concurrent scenario where two alter operations are executed in parallel. The |
| // first operation succeeds and updates the schema file. The second operation acquires the |
| // lock only after the first completes, but if the table lookup happened before acquiring |
| // the lock it would see the stale carbon table, which can lead to an inconsistent state in |
| // the system. Therefore the table lookup must be done after acquiring the lock. |
| val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore |
| carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) |
| if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_ADD_COLUMN)) { |
| throw new MalformedCarbonCommandException( |
| "alter table add column is not supported for index indexSchema") |
| } |
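| // fire the pre-alter event so that registered listeners can validate the operation |
| // before the schema is changed |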
| val operationContext = new OperationContext |
| val alterTableAddColumnListener = AlterTableAddColumnPreEvent(sparkSession, carbonTable, |
| alterTableAddColumnsModel) |
| OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext) |
| // read the latest schema file so that the new columns are validated against the most |
| // recent carbon table schema |
| val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable) |
| val schemaConverter = new ThriftWrapperSchemaConverterImpl() |
| val wrapperTableInfo = schemaConverter |
| .fromExternalToWrapperTableInfo(thriftTableInfo, |
| dbName, |
| tableName, |
| carbonTable.getTablePath) |
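| // generate ColumnSchema objects for the columns being added, validating them against |
| // the existing table schema |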
| newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel, |
| dbName, |
| wrapperTableInfo, |
| carbonTable.getAbsoluteTableIdentifier, |
| sparkSession.sparkContext).process |
| setAuditInfo(Map( |
| "newColumn" -> newCols.map(x => s"${x.getColumnName}:${x.getDataType}").mkString(","))) |
| timeStamp = System.currentTimeMillis |
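| // record the added columns under this timestamp; the same timestamp is used to revert |
| // the schema change if the operation fails |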
| val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry |
| schemaEvolutionEntry.setTimeStamp(timeStamp) |
| schemaEvolutionEntry.setAdded(newCols.toList.asJava) |
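| // convert the updated wrapper table info back to the thrift format used in the schema file |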
| val thriftTable = schemaConverter |
| .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) |
| // existing visible carbon columns in create (schema) order |
| val carbonColumns = carbonTable.getCreateOrderColumn().asScala |
| .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema } |
| // sort the new columns based on schema order |
| val sortedColsBasedActualSchemaOrder = newCols.sortBy(a => a.getSchemaOrdinal) |
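| // write the updated schema and the evolution entry to the schema file, getting back |
| // the table identifier and schema parts needed for the external catalog update |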
| val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo( |
| carbonTable, |
| schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), |
| thriftTable)(sparkSession) |
| // When alterExternalCatalogForTableWithUpdatedSchema is called to write the new schema to |
| // the external catalog for an add column, Spark fetches the catalog table and, if the |
| // table is partitioned, appends the partition columns itself to the data schema sent by |
| // carbon. Sending the partition columns again would duplicate them, so they are filtered |
| // out here. |
| val cols = if (carbonTable.isHivePartitionTable) { |
| val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala |
| val carbonColumnsWithoutPartition = |
| carbonColumns.filterNot(col => partitionColumns.contains(col)) |
| Some(carbonColumnsWithoutPartition ++ sortedColsBasedActualSchemaOrder) |
| } else { |
| Some(carbonColumns ++ sortedColsBasedActualSchemaOrder) |
| } |
| CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession) |
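| // refresh the table in the session catalog so subsequent queries see the new columns |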
| sparkSession.catalog.refreshTable(tableIdentifier.quotedString) |
| val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent = |
| AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel) |
| OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext) |
| LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName") |
| } catch { |
| case e: Exception => |
| if (newCols.nonEmpty) { |
| LOGGER.info("Cleaning up the dictionary files as alter table add operation failed") |
| AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession) |
| } |
| throwMetadataException(dbName, tableName, |
| s"Alter table add operation failed: ${e.getMessage}") |
| } finally { |
| // release the acquired locks once command execution completes |
| AlterTableUtil.releaseLocks(locks) |
| } |
| Seq.empty |
| } |
| |
| override protected def opName: String = "ALTER TABLE ADD COLUMN" |
| } |