/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.index

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.{CarbonHiveIndexMetadataUtil, CarbonMetaStore, CarbonRelation}
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore

import org.apache.carbondata.common.exceptions.sql.{MalformedIndexCommandException, NoSuchIndexException}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.index.IndexStoreManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
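// Illustrative usage (assumption, not taken from this file): this command backs
// SQL of the form DROP INDEX [IF EXISTS] index_name ON [db_name.]table_name,
// roughly equivalent to:
//   DropIndexCommand(ifExistsSet = true, dbNameOp = None,
//     parentTableName = "sales", indexName = "idx_city").run(sparkSession)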
/**
* Command to drop index on a table
*/
private[sql] case class DropIndexCommand(
ifExistsSet: Boolean,
dbNameOp: Option[String],
parentTableName: String = null,
indexName: String,
needLock: Boolean = true)
extends RunnableCommand {
def run(sparkSession: SparkSession): Seq[Row] = {
var isSecondaryIndex = false
val parentTable = try {
CarbonEnv.getCarbonTable(dbNameOp, parentTableName)(sparkSession)
} catch {
case e: NoSuchTableException =>
if (!ifExistsSet) throw e
else return Seq.empty
}
    if (!parentTable.getIndexTableNames().contains(indexName)) {
      if (!ifExistsSet) {
        throw new MalformedIndexCommandException(s"Index with name $indexName does not exist")
      }
    }
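    // when IF EXISTS is set and the index is not registered on the parent
    // table, execution deliberately continues past the check above: dropIndex()
    // can still clean up stale metadata references and leftover physical
    // folders of a previously dropped index (see the finally block there)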
if (parentTable.getIndexTableNames(IndexType.SI.getIndexProviderName)
.contains(indexName)) {
isSecondaryIndex = true
} else {
// Clear the index of a table from memory and disk
IndexStoreManager.getInstance().deleteIndex(parentTable, indexName)
}
dropIndex(isSecondaryIndex, sparkSession)
Seq.empty
}
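  /**
   * Drops the index. For a secondary index, the index table itself is dropped
   * and the parent table's "indexTableExists" property is reset once no index
   * tables remain; for a CG/FG index, the index info is removed from the parent
   * table and the "indexExists" property is reset once no CG/FG indexes remain.
   */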
def dropIndex(isSecondaryIndex: Boolean, sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession)
var tableIdentifierForAcquiringLock: AbsoluteTableIdentifier = null
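    // locks to acquire on the table before mutating metadata; when needLock is
    // false, no locks are taken here (presumably the calling flow already holds
    // them). Every lock acquired below is released in the finally block.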
val locksToBeAcquired = if (needLock) {
List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
} else {
List.empty
}
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
// flag to check if folders and files can be successfully deleted
var isValidDeletion = false
    val carbonLocks = ArrayBuffer[ICarbonLock]()
var carbonTable: Option[CarbonTable] = None
try {
if (isSecondaryIndex) {
LOGGER.info(s"dropping index table $indexName")
carbonTable =
try {
Some(CarbonEnv.getCarbonTable(Some(dbName), indexName)(sparkSession))
} catch {
case ex: NoSuchTableException =>
var isIndexTableExists = false
              // even if the index table does not exist,
              // check whether the parent table exists and remove the index table
              // reference, in case the parent table still holds a reference to
              // the deleted index table
try {
val parentCarbonTable = getParentTableFromCatalog(sparkSession, dbName, catalog)
val indexTableList = CarbonIndexUtil.getSecondaryIndexes(parentCarbonTable.get)
if (!indexTableList.isEmpty) {
removeIndexInfoFromParentTable(sparkSession,
dbName,
locksToBeAcquired,
carbonLocks,
parentCarbonTable.get)
isIndexTableExists = true
}
              } catch {
                case ex: NoSuchTableException =>
                  if (!ifExistsSet) {
                    throw ex
                  }
              }
if (!ifExistsSet && !isIndexTableExists) {
throw new NoSuchIndexException(indexName)
}
None
}
if (carbonTable.isDefined) {
CarbonInternalMetastore.refreshIndexInfo(dbName, indexName, carbonTable.get)(sparkSession)
val isIndexTableBool = carbonTable.get.isIndexTable
val parentTableName = CarbonIndexUtil.getParentTableName(carbonTable.get)
var parentTable = CarbonEnv.getCarbonTable(Some(dbName), parentTableName)(sparkSession)
          if (!isIndexTableBool) {
            sys.error(s"Drop Index command is not permitted on table [$dbName.$indexName]")
          } else if (!parentTableName.equalsIgnoreCase(this.parentTableName)) {
            // the index table's actual parent differs from the table named in
            // the DROP INDEX statement
            throw new NoSuchIndexException(indexName)
          } else {
            // acquire the locks on the parent table; they are released in the
            // finally block once the drop completes
            tableIdentifierForAcquiringLock = parentTable.getAbsoluteTableIdentifier
            locksToBeAcquired foreach {
              lock => {
                carbonLocks += CarbonLockUtil.getLockObject(tableIdentifierForAcquiringLock, lock)
              }
            }
            isValidDeletion = true
          }
val tableIdentifier = TableIdentifier(indexName, Some(dbName))
// drop carbon table
val tablePath = carbonTable.get.getTablePath
CarbonInternalMetastore.dropIndexTable(tableIdentifier, carbonTable.get,
tablePath,
parentTable,
removeEntryFromParentTable = true)(sparkSession)
// take the refreshed table object after dropping and updating the index table
parentTable = getRefreshedParentTable(sparkSession, dbName)
val indexTables = CarbonIndexUtil.getSecondaryIndexes(parentTable)
          // if all the indexes are dropped, the main table holds no index
          // tables any more, so set the "indexTableExists" property to false
          if (null == indexTables || indexTables.isEmpty) {
            CarbonIndexUtil
              .addOrModifyTableProperty(parentTable,
                Map("indexTableExists" -> "false"), needLock = false)(sparkSession)
            CarbonHiveIndexMetadataUtil.refreshTable(dbName, parentTableName, sparkSession)
          }
}
} else {
// remove cg or fg index
var parentCarbonTable = getParentTableFromCatalog(sparkSession, dbName, catalog).get
removeIndexInfoFromParentTable(sparkSession,
dbName,
locksToBeAcquired,
carbonLocks,
parentCarbonTable)
parentCarbonTable = getRefreshedParentTable(sparkSession, dbName)
        val indexMetadata = parentCarbonTable.getIndexMetadata
        if (null != indexMetadata && null != indexMetadata.getIndexesMap) {
          // after the removal, check whether any CG/FG indexes remain; an
          // entry for the SI provider alone does not count
          val hasCgFgIndexes =
            !(indexMetadata.getIndexesMap.size() == 1 &&
              indexMetadata.getIndexesMap.containsKey(IndexType.SI.getIndexProviderName))
          if (!hasCgFgIndexes) {
            // no CG/FG indexes are left, so reset the "indexExists" property
            CarbonIndexUtil
              .addOrModifyTableProperty(parentCarbonTable,
                Map("indexExists" -> "false"), needLock = false)(sparkSession)
            CarbonHiveIndexMetadataUtil.refreshTable(dbName, parentTableName, sparkSession)
          }
        }
}
} finally {
if (carbonLocks.nonEmpty) {
val unlocked = carbonLocks.forall(_.unlock())
if (unlocked) {
logInfo("Table MetaData Unlocked Successfully")
          if (isValidDeletion && carbonTable.isDefined) {
            CarbonInternalMetastore.deleteTableDirectory(dbName, indexName, sparkSession)
          }
} else {
logError("Table metadata unlocking is unsuccessful, index table may be in stale state")
}
}
      // in case the physical folders for the index table still exist
      // but the carbon and hive info for the index table has been removed,
      // DROP INDEX IF EXISTS should clean up those physical directories
if (ifExistsSet && carbonTable.isEmpty) {
CarbonInternalMetastore.deleteTableDirectory(dbName, indexName, sparkSession)
}
}
Seq.empty
}
/**
* get the refreshed table object after dropping the index
*/
  private def getRefreshedParentTable(sparkSession: SparkSession,
      dbName: String): CarbonTable = {
CarbonEnv
.getInstance(sparkSession)
.carbonMetaStore
.lookupRelation(Some(dbName), parentTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
.carbonTable
}
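  /**
   * look up the parent table object from the carbon meta store catalog
   */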
private def getParentTableFromCatalog(sparkSession: SparkSession,
dbName: String,
catalog: CarbonMetaStore): Option[CarbonTable] = {
Some(catalog.lookupRelation(Some(dbName), parentTableName)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable)
}
  /**
   * acquire locks and remove the index info from the parent table
   */
private def removeIndexInfoFromParentTable(sparkSession: SparkSession,
dbName: String,
locksToBeAcquired: List[String],
carbonLocks: ArrayBuffer[ICarbonLock],
parentCarbonTable: CarbonTable): Unit = {
locksToBeAcquired foreach {
lock => {
carbonLocks += CarbonLockUtil
.getLockObject(parentCarbonTable.getAbsoluteTableIdentifier, lock)
}
}
CarbonHiveIndexMetadataUtil.removeIndexInfoFromParentTable(
parentCarbonTable.getIndexInfo,
parentCarbonTable,
dbName,
indexName)(sparkSession)
// clear parent table from meta store cache as it is also required to be
// refreshed when SI table is dropped
CarbonInternalMetastore
.removeTableFromMetadataCache(dbName, parentTableName)(sparkSession)
}
}