blob: 74da11ac1fdfafce46d208e7b9865f4b3bef98a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.table
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
case class CarbonDropTableCommand(
ifExistsSet: Boolean,
databaseNameOp: Option[String],
tableName: String,
dropChildTable: Boolean = false)
extends AtomicRunnableCommand {
var carbonTable: CarbonTable = _
var childDropCommands : Seq[CarbonDropTableCommand] = Seq.empty
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
val identifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
try {
locksToBeAcquired foreach {
lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock)
}
carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "loading", "drop table")
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
if (carbonTable.isStreamingTable) {
// streaming table should acquire streaming.lock
carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK)
}
val relationIdentifiers = carbonTable.getTableInfo.getParentRelationIdentifiers
if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
if (!dropChildTable) {
if (!ifExistsSet) {
throwMetadataException(dbName, tableName,
"Child table which is associated with datamap cannot be dropped, " +
"use DROP DATAMAP command to drop")
} else {
return Seq.empty
}
}
}
val operationContext = new OperationContext
val dropTablePreEvent: DropTablePreEvent =
DropTablePreEvent(
carbonTable,
ifExistsSet,
sparkSession)
OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession)
if (carbonTable.hasDataMapSchema) {
// drop all child tables
val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
childDropCommands = childSchemas.asScala
.filter(_.getRelationIdentifier != null)
.map { childSchema =>
val childTable =
CarbonEnv.getCarbonTable(
TableIdentifier(childSchema.getRelationIdentifier.getTableName,
Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
val dropCommand = CarbonDropTableCommand(
ifExistsSet = true,
Some(childSchema.getRelationIdentifier.getDatabaseName),
childSchema.getRelationIdentifier.getTableName,
dropChildTable = true
)
dropCommand.carbonTable = childTable
dropCommand
}
childDropCommands.foreach(_.processMetadata(sparkSession))
}
// fires the event after dropping main table
val dropTablePostEvent: DropTablePostEvent =
DropTablePostEvent(
carbonTable,
ifExistsSet,
sparkSession)
OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
} catch {
case ex: NoSuchTableException =>
if (!ifExistsSet) {
throw ex
}
case ex: ConcurrentOperationException =>
throw ex
case ex: Exception =>
val msg = s"Dropping table $dbName.$tableName failed: ${ex.getMessage}"
LOGGER.error(ex, msg)
throwMetadataException(dbName, tableName, msg)
} finally {
if (carbonLocks.nonEmpty) {
val unlocked = carbonLocks.forall(_.unlock())
if (unlocked) {
LOGGER.info("Table MetaData Unlocked Successfully")
}
}
}
Seq.empty
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
// clear driver side index and dictionary cache
if (carbonTable != null) {
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
// delete the table folder
val tablePath = carbonTable.getTablePath
val fileType = FileFactory.getFileType(tablePath)
// delete table data only if it is not external table
if (FileFactory.isFileExist(tablePath, fileType) &&
!(carbonTable.isExternalTable || carbonTable.isFileLevelFormat)) {
val file = FileFactory.getCarbonFile(tablePath, fileType)
CarbonUtil.deleteFoldersAndFilesSilent(file)
}
if (carbonTable.hasDataMapSchema && childDropCommands.nonEmpty) {
// drop all child tables
childDropCommands.foreach(_.processData(sparkSession))
}
}
Seq.empty
}
}