/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.hive

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.sql.{AnalysisException, CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.hive.{CarbonHiveMetadataUtil, CarbonRelation}
import org.apache.spark.sql.secondaryindex.util.CarbonInternalScalaUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.indextable.{IndexMetadata, IndexTableInfo}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath

object CarbonInternalMetastore {

private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Deletes the index table and updates the parent table and hive
   * metadata accordingly.
   *
   * @param removeEntryFromParentTable if true, the index table info is removed from
   *                                   the parent table
   */
def dropIndexTable(indexTableIdentifier: TableIdentifier, indexCarbonTable: CarbonTable,
tableStorePath: String,
parentCarbonTable: CarbonTable,
      removeEntryFromParentTable: Boolean)(sparkSession: SparkSession): Unit = {
val dbName = indexTableIdentifier.database.get
val tableName = indexTableIdentifier.table
try {
if (indexCarbonTable != null) {
LOGGER.info(s"Deleting index table $dbName.$tableName")
CarbonEnv.getInstance(sparkSession).carbonMetaStore
.dropTable(indexCarbonTable.getAbsoluteTableIdentifier)(sparkSession)
if (removeEntryFromParentTable && parentCarbonTable != null) {
val parentTableName = parentCarbonTable.getTableName
val relation: LogicalPlan = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.lookupRelation(Some(dbName), parentTableName)(sparkSession)
val indexInfo = if (relation != null) {
relation.asInstanceOf[CarbonRelation].carbonTable.getIndexInfo
} else {
sys.error(s"Table $dbName.$parentTableName does not exists")
}
sparkSession.sessionState
.catalog
.dropTable(indexTableIdentifier, ignoreIfNotExists = true, purge = false)
          // even if the folders are deleted from the carbon store, the table may still exist in hive
CarbonHiveMetadataUtil
.invalidateAndUpdateIndexInfo(indexTableIdentifier, indexInfo, parentCarbonTable)(
sparkSession)
          // also clear the parent table from the metastore cache, as it needs to be
          // refreshed when its SI table is dropped
DataMapStoreManager.getInstance()
.clearDataMaps(indexCarbonTable.getAbsoluteTableIdentifier)
removeTableFromMetadataCache(dbName, indexCarbonTable.getTableName)(sparkSession)
removeTableFromMetadataCache(dbName, parentTableName)(sparkSession)
}
}
} finally {
      // Even if an exception occurs, try to remove the table from the catalog to avoid
      // a stale state.
sparkSession.sessionState.catalog
.dropTable(indexTableIdentifier, ignoreIfNotExists = true, purge = false)
sparkSession.sessionState.catalog.refreshTable(indexTableIdentifier)
LOGGER.info(s"Deleted index table $dbName.$tableName")
}
}
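
  /**
   * Removes the given table from the carbon metastore cache so that the next
   * lookup reloads its metadata.
   */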
def removeTableFromMetadataCache(dbName: String, tableName: String)
(sparkSession: SparkSession): Unit = {
CarbonEnv.getInstance(sparkSession).carbonMetaStore.removeTableFromMetadata(dbName, tableName)
}

  /**
   * Deletes the index table silently. Even if one index table deletion fails,
   * deletion must still be attempted on the remaining index tables, so any
   * failure here is logged and swallowed.
   */
def deleteIndexSilent(carbonTableIdentifier: TableIdentifier,
storePath: String,
parentCarbonTable: CarbonTable)(sparkSession: SparkSession): Unit = {
val dbName = carbonTableIdentifier.database
val indexTable = carbonTableIdentifier.table
var indexCarbonTable: CarbonTable = null
try {
indexCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.lookupRelation(dbName, indexTable)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable
} catch {
case e: Exception =>
LOGGER.error("Exception occurred while drop index table for : " +
s"$dbName.$indexTable : ${ e.getMessage }")
    } finally {
try {
dropIndexTable(carbonTableIdentifier,
indexCarbonTable,
storePath,
parentCarbonTable,
removeEntryFromParentTable = true
)(sparkSession)
} catch {
case e: Exception =>
LOGGER.error("Exception occurred while drop index table for : " +
s"$dbName.$indexTable : ${ e.getMessage }")
}
}
}
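
  /**
   * Refreshes the index information of a carbon table. If the index info is not yet
   * loaded in the table, it is fetched from the hive metastore and stored in the
   * table properties, along with the "indexTableExists" property.
   */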
def refreshIndexInfo(dbName: String, tableName: String,
carbonTable: CarbonTable, needLock: Boolean = true)(sparkSession: SparkSession): Unit = {
    // tables created without the property "indexTableExists" will return null here; such
    // tables enter the block below, gather the actual data from hive, and then set the
    // property to true or false. Once the property has a value, the decision is based on it.
    val indexTableExists = if (carbonTable != null) {
      CarbonInternalScalaUtil.isIndexTableExists(carbonTable)
    } else {
      null
    }
    if (null != carbonTable && (null == indexTableExists || indexTableExists.toBoolean)) {
      // when the index information is not loaded in the main table, fetch the
      // index info from the hive metastore and set it in the carbon table
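      // maps index table name -> list of its indexed columns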
val indexTableMap = new ConcurrentHashMap[String, java.util.List[String]]
try {
val (isIndexTable, parentTableName, indexInfo, parentTablePath, parentTableId, schema) =
indexInfoFromHive(dbName, tableName)(sparkSession)
if (isIndexTable.equals("true")) {
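        // this table is itself an index table: record its parent table details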
val indexMeta = new IndexMetadata(indexTableMap,
parentTableName,
true,
parentTablePath,
parentTableId)
carbonTable.getTableInfo.getFactTable.getTableProperties
.put(carbonTable.getCarbonTableIdentifier.getTableId, indexMeta.serialize)
} else {
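        // this is a main table: register each index table and its columns from the
        // info fetched from hive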
IndexTableInfo.fromGson(indexInfo)
.foreach { indexTableInfo =>
indexTableMap
.put(indexTableInfo.getTableName, indexTableInfo.getIndexCols)
}
val indexMetadata = new IndexMetadata(indexTableMap,
parentTableName,
isIndexTable.toBoolean,
parentTablePath, parentTableId)
carbonTable.getTableInfo.getFactTable.getTableProperties
.put(carbonTable.getCarbonTableIdentifier.getTableId, indexMetadata.serialize)
}
if (null == indexTableExists && !isIndexTable.equals("true")) {
val indexTables = CarbonInternalScalaUtil.getIndexesTables(carbonTable)
val tableIdentifier = new TableIdentifier(carbonTable.getTableName,
Some(carbonTable.getDatabaseName))
if (indexTables.isEmpty) {
            // modify the tableProperties of the main table by setting the "indexTableExists"
            // property to false, as this table has no index tables
CarbonInternalScalaUtil
.addOrModifyTableProperty(carbonTable,
Map("indexTableExists" -> "false"), needLock)(sparkSession)
} else {
            // modify the tableProperties of the main table by setting the "indexTableExists"
            // property to true, as index tables exist for this table
CarbonInternalScalaUtil
.addOrModifyTableProperty(carbonTable,
Map("indexTableExists" -> "true"), needLock)(sparkSession)
}
}
} catch {
case e: Exception =>
          // during table creation the hive table will not be available yet
LOGGER.error(e.getMessage, e)
}
}
}
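
  /**
   * Reads index-related metadata for a table from the hive catalog.
   *
   * @return a tuple of (isIndexTable, parentTableName, indexInfo gson string,
   *         parentTablePath, parentTableId, table schema json)
   */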
def indexInfoFromHive(databaseName: String, tableName: String)
(sparkSession: SparkSession): (String, String, String, String, String, String) = {
val hiveTable = sparkSession.sessionState.catalog
.getTableMetadata(TableIdentifier(tableName, Some(databaseName)))
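    // "indexInfo" is stored as a gson-serialized IndexTableInfo array in the
    // table's storage properties; default to an empty list when absent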
val indexList = hiveTable.storage.properties.getOrElse(
"indexInfo", IndexTableInfo.toGson(new Array[IndexTableInfo](0)))
val datasourceOptions = optionsValueFromParts(hiveTable)
val isIndexTable = datasourceOptions.getOrElse("isIndexTable", "false")
val parentTableName = datasourceOptions.getOrElse("parentTableName", "")
val parentTablePath = if (!parentTableName.isEmpty) {
CarbonEnv
.getCarbonTable(TableIdentifier(parentTableName, Some(databaseName)))(sparkSession)
.getTablePath
} else {
""
}
val parentTableId = datasourceOptions.getOrElse("parentTableId", "")
    (isIndexTable, parentTableName, indexList, parentTablePath, parentTableId,
      hiveTable.schema.json)
}
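
  /**
   * Rebuilds the datasource options map from the hive serde properties. Option values
   * are stored split across multiple properties ("&lt;key&gt;.numParts" plus "&lt;key&gt;.part.N"
   * pieces, presumably to respect metastore value-length limits) and are recombined here.
   */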
private def optionsValueFromParts(table: CatalogTable): Map[String, String] = {
val optionsCombined = new java.util.HashMap[String, String]
val optionsKeys: Option[String] =
table.storage.properties.get("spark.sql.sources.options.keys.numParts").map { numParts =>
combinePartsFromSerdeProps(numParts, "spark.sql.sources.options.keys", table).mkString
}
optionsKeys match {
case Some(optKeys) =>
optKeys.split(",").foreach { optKey =>
          table.storage.properties.get(s"$optKey.numParts").foreach { numParts =>
            optionsCombined.put(optKey,
              combinePartsFromSerdeProps(numParts, optKey, table).mkString)
          }
}
optionsCombined.asScala.toMap
case None =>
LOGGER.info(s"spark.sql.sources.options.keys expected, but read nothing")
table.storage.properties
}
}
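
  /**
   * Collects the "$key.part.0" .. "$key.part.(numParts - 1)" serde properties of a
   * table and returns them in order, failing if any part is missing.
   */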
private def combinePartsFromSerdeProps(numParts: String,
key: String, table: CatalogTable): Seq[String] = {
val keysParts = (0 until numParts.toInt).map { index =>
val keysPart =
table.storage.properties.get(s"$key.part.$index").orNull
if (keysPart == null) {
throw new AnalysisException(
s"Could not read $key from the metastore because it is corrupted " +
s"(missing part $index of the $key, $numParts parts are expected).")
}
keysPart
}
keysParts
}
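
  /**
   * Deletes the table directory (resolved from the database location and table name)
   * if its metadata folder exists.
   */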
def deleteTableDirectory(dbName: String, tableName: String,
sparkSession: SparkSession): Unit = {
val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
val metadataFilePath =
CarbonTablePath.getMetadataPath(tablePath)
if (FileFactory.isFileExist(metadataFilePath)) {
val file = FileFactory.getCarbonFile(metadataFilePath)
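      // the parent of the Metadata folder is the table path, so this removes the
      // whole table directory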
CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
}
}
}