/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.datamap

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand

import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
import org.apache.carbondata.events._

/**
 * Drops the datamap and any related tables associated with it. This command backs
 * the `DROP DATAMAP [IF EXISTS] dmName [ON TABLE tableName]` SQL statement.
 *
 * @param dataMapName name of the datamap to drop
 * @param ifExistsSet true if IF EXISTS was specified, in which case a missing
 *                    datamap is not treated as an error
 * @param table identifier of the main table the datamap is defined on, if any
 * @param forceDrop when true, forcefully drop the child table from the metastore;
 *                  set only when updating the parent table schema has failed
 */
case class CarbonDropDataMapCommand(
    dataMapName: String,
    ifExistsSet: Boolean,
    table: Option[TableIdentifier],
    forceDrop: Boolean = false)
  extends AtomicRunnableCommand {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
  private var dataMapProvider: DataMapProvider = _
  var mainTable: CarbonTable = _
  var dataMapSchema: DataMapSchema = _
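
  // Removes the datamap schema (metastore / system folder entries) under a
  // metadata lock on the main table; physical cleanup is deferred to processData.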
  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    setAuditInfo(Map("dmName" -> dataMapName))
    if (table.isDefined) {
      val databaseNameOp = table.get.database
      val tableName = table.get.table
      val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
      val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
      val carbonEnv = CarbonEnv.getInstance(sparkSession)
      val catalog = carbonEnv.carbonMetaStore
      val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
      if (mainTable == null) {
        mainTable = try {
          CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
        } catch {
          case ex: NoSuchTableException =>
            throwMetadataException(dbName, tableName,
              s"Dropping datamap $dataMapName failed: ${ ex.getMessage }")
            null
        }
      }
      setAuditTable(mainTable)
      val tableIdentifier =
        AbsoluteTableIdentifier
          .from(tablePath,
            dbName.toLowerCase,
            tableName.toLowerCase,
            mainTable.getCarbonTableIdentifier.getTableId)
      // forceDrop is true only when updating the parent table schema has failed;
      // in that case the child table instance is forcefully dropped from the metastore.
      if (forceDrop) {
        val childTableName = tableName + "_" + dataMapName
        LOGGER.info(s"Trying to force drop $childTableName from metastore")
        val childCarbonTable: Option[CarbonTable] = try {
          Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
        } catch {
          case _: Exception =>
            LOGGER.warn(s"Child table $childTableName not found in metastore")
            None
        }
        if (childCarbonTable.isDefined) {
          val commandToRun = CarbonDropTableCommand(
            ifExistsSet = true,
            Some(childCarbonTable.get.getDatabaseName),
            childCarbonTable.get.getTableName,
            dropChildTable = true)
          commandToRun.run(sparkSession)
        }
        dropDataMapFromSystemFolder(sparkSession)
        return Seq.empty
      }
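      // acquire the metadata lock on the main table before reading or dropping
      // its datamap schemas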
      val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
      try {
        locksToBeAcquired foreach {
          lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
        }
        // drop the index or MV datamap on the main table
        if (mainTable != null &&
            DataMapStoreManager.getInstance().getAllDataMap(mainTable).size() > 0) {
          val isDMSchemaExists = DataMapStoreManager.getInstance().getAllDataMap(mainTable).asScala.
            exists(_.getDataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName))
          if (isDMSchemaExists) {
            dropDataMapFromSystemFolder(sparkSession)
            return Seq.empty
          }
        } else if (mainTable != null) {
          // if the table is defined and the datamap is an MV datamap, drop the datamap
          val dmSchema = DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala
            .filter(dataMapSchema => dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName))
          if (dmSchema.nonEmpty && (!dmSchema.head.isIndexDataMap &&
              null != dmSchema.head.getRelationIdentifier)) {
            dropDataMapFromSystemFolder(sparkSession)
            return Seq.empty
          }
        }
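        // no datamap with this name exists on the table; fail unless IF EXISTS was given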
        if (!ifExistsSet) {
          throw new NoSuchDataMapException(dataMapName)
        }
      } catch {
        case e: NoSuchDataMapException =>
          throw e
        case ex: Exception =>
          LOGGER.error(s"Dropping datamap $dataMapName failed", ex)
          throwMetadataException(dbName, tableName,
            s"Dropping datamap $dataMapName failed: ${ ex.getMessage }")
      } finally {
        if (carbonLocks.nonEmpty) {
          val unlocked = carbonLocks.forall(_.unlock())
          if (unlocked) {
            LOGGER.info("Table MetaData Unlocked Successfully")
          }
        }
      }
    } else {
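      // no table specified: look the datamap up directly in the system folder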
      try {
        dropDataMapFromSystemFolder(sparkSession)
      } catch {
        case e: Exception =>
          if (!ifExistsSet) {
            throw e
          }
      }
    }
    Seq.empty
  }
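
  // Fires the pre/post update events, removes the datamap's status entry and
  // deletes its schema metadata; for index datamap providers (e.g. lucene) the
  // in-memory datamap is cleared first via cleanData().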
  private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = {
    try {
      if (dataMapSchema == null) {
        dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
      }
      if (dataMapSchema != null) {
        dataMapProvider =
          DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
        val operationContext: OperationContext = new OperationContext()
        val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
        val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
          UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation, null)
        OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
          operationContext)
        DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
        val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
          UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation, null)
        OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
          operationContext)
        // if the provider is an index datamap provider (e.g. lucene), call cleanData,
        // which launches a job to clear the datamap from memory (segmentMap and cache).
        // This must happen before the datamap schema is deleted from the _System folder.
        if (dataMapProvider.isInstanceOf[IndexDataMapProvider]) {
          dataMapProvider.cleanData()
        }
        dataMapProvider.cleanMeta()
      }
    } catch {
      case e: Exception =>
        if (!ifExistsSet) {
          throw e
        }
    }
  }
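
  // Runs after processMetadata; deletes the physical datamap data (e.g. the
  // backing table folder) via the provider resolved during the metadata drop.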
  override def processData(sparkSession: SparkSession): Seq[Row] = {
    // delete the table folder
    if (dataMapProvider != null) {
      dataMapProvider.cleanData()
    }
    Seq.empty
  }

  override protected def opName: String = "DROP DATAMAP"
}