/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier

import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.format
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.util.CarbonSparkUtil

/**
 * Metastore implementation that stores the Carbon table schema in the Hive metastore
 * (as table properties) instead of reading it from the schema file on disk.
 */
class CarbonHiveMetaStore extends CarbonFileMetastore {

  override def isReadFromHiveMetaStore: Boolean = true

  /**
   * Creates a CarbonRelation for the given table, reading the schema from the
   * Hive table properties when available.
   *
   * @param parameters table properties stored in the Hive metastore
   * @param absIdentifier absolute identifier of the table
   * @param sparkSession current spark session
   */
  override def createCarbonRelation(parameters: Map[String, String],
      absIdentifier: AbsoluteTableIdentifier,
      sparkSession: SparkSession): CarbonRelation = {
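    // The table schema is serialized as GSON into the Hive table properties; if it is
    // present there, build the CarbonTable directly without touching the schema file.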
    val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava)
    if (info != null) {
      val table = CarbonTable.buildFromTableInfo(info)
      CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
        CarbonSparkUtil.createSparkMeta(table), table)
    } else {
      super.createCarbonRelation(parameters, absIdentifier, sparkSession)
    }
  }
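
  // With the schema kept in the Hive metastore, the path check is answered by
  // checking that the table itself is registered in the metastore.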
  override def isTablePathExists(tableIdentifier: TableIdentifier)
      (sparkSession: SparkSession): Boolean = {
    tableExists(tableIdentifier)(sparkSession)
  }

  override def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
      (sparkSession: SparkSession): Unit = {
    val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
    val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
    if (null != carbonTable) {
      // clear driver B-tree and dictionary cache
      ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
    }
    checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
    removeTableFromMetadata(dbName, tableName)
    CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
    // discard cached table info in cachedDataSourceTables
    val tableIdentifier = TableIdentifier(tableName, Option(dbName))
    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
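    // drop all datamaps/indexes registered for this table from the datamap store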
    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
  }

  override def checkSchemasModifiedTimeAndReloadTable(
      tableIdentifier: TableIdentifier): Boolean = {
    // the schema is read from the Hive metastore, so there is no schema file to reload
    false
  }

  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
    // TODO: list all Carbon tables registered in the Hive metastore
    Seq()
  }

  override def getThriftTableInfo(tablePath: CarbonTablePath)
      (sparkSession: SparkSession): format.TableInfo = {
    val identifier = tablePath.getCarbonTableIdentifier
    val relation = lookupRelation(TableIdentifier(identifier.getTableName,
      Some(identifier.getDatabaseName)))(sparkSession).asInstanceOf[CarbonRelation]
    val carbonTable = relation.metaData.carbonTable
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
    schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
      carbonTable.getDatabaseName,
      carbonTable.getTableName)
  }

  /**
   * This method will overwrite the existing schema and update it with the given details.
   *
   * @param newTableIdentifier identifier of the table after the alter operation
   * @param oldTableIdentifier identifier of the table before the alter operation
   * @param thriftTableInfo thrift table info carrying the updated schema
   * @param schemaEvolutionEntry entry describing this schema change
   * @param tablePath current path of the table
   * @param sparkSession current spark session
   */
  override def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
      oldTableIdentifier: CarbonTableIdentifier,
      thriftTableInfo: format.TableInfo,
      schemaEvolutionEntry: SchemaEvolutionEntry,
      tablePath: String)
      (sparkSession: SparkSession): String = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
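    // record this alter operation in the schema evolution history of the thrift schema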
    if (schemaEvolutionEntry != null) {
      thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
    }
    updateHiveMetaStoreForAlter(newTableIdentifier,
      oldTableIdentifier,
      thriftTableInfo,
      tablePath,
      sparkSession,
      schemaConverter)
  }

  /**
   * This method will overwrite the existing schema and update it with the given details.
   *
   * @param newTableIdentifier identifier of the table after the update
   * @param oldTableIdentifier identifier of the table before the update
   * @param thriftTableInfo thrift table info carrying the updated schema
   * @param carbonTablePath current path of the table
   * @param sparkSession current spark session
   */
  override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
      oldTableIdentifier: CarbonTableIdentifier,
      thriftTableInfo: org.apache.carbondata.format.TableInfo,
      carbonTablePath: String)(sparkSession: SparkSession): String = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
    updateHiveMetaStoreForAlter(newTableIdentifier,
      oldTableIdentifier,
      thriftTableInfo,
      carbonTablePath,
      sparkSession,
      schemaConverter)
  }

  private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier,
      oldTableIdentifier: CarbonTableIdentifier,
      thriftTableInfo: format.TableInfo,
      oldTablePath: String,
      sparkSession: SparkSession,
      schemaConverter: ThriftWrapperSchemaConverterImpl) = {
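    // derive the new table path first, since an alter may have renamed the table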
    val newTablePath =
      CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName)
    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
      thriftTableInfo,
      newTableIdentifier.getDatabaseName,
      newTableIdentifier.getTableName,
      newTablePath)
    val dbName = oldTableIdentifier.getDatabaseName
    val tableName = oldTableIdentifier.getTableName
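    // the schema JSON is split into multiple serde properties, since the Hive
    // metastore caps the length of a single property value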
    val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
    val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
      .getClient()
    hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
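    // invalidate the cached relation and reload the updated schema into the driver metadata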
    sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
    removeTableFromMetadata(dbName, tableName)
    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
    CarbonStorePath.getCarbonTablePath(oldTablePath, newTableIdentifier).getPath
  }

  /**
   * This method is used to remove the schema evolution entry in case of an alter failure.
   *
   * @param carbonTableIdentifier identifier of the table being reverted
   * @param thriftTableInfo thrift table info whose last evolution entry is removed
   * @param identifier absolute identifier of the table
   * @param sparkSession current spark session
   */
  override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
      thriftTableInfo: format.TableInfo,
      identifier: AbsoluteTableIdentifier)
      (sparkSession: SparkSession): String = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
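    // drop the evolution entry appended by the failed alter before writing the schema back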
    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
    evolutionEntries.remove(evolutionEntries.size() - 1)
    updateHiveMetaStoreForAlter(carbonTableIdentifier,
      carbonTableIdentifier,
      thriftTableInfo,
      identifier.getTablePath,
      sparkSession,
      schemaConverter)
  }

  override def revertTableSchemaForPreAggCreationFailure(
      absoluteTableIdentifier: AbsoluteTableIdentifier,
      thriftTableInfo: org.apache.carbondata.format.TableInfo)
      (sparkSession: SparkSession): String = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
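    // remove the child (pre-aggregate) datamap schema added by the failed creation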
    val childSchemas = thriftTableInfo.dataMapSchemas
    childSchemas.remove(childSchemas.size() - 1)
    val carbonTableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier
    updateHiveMetaStoreForAlter(carbonTableIdentifier,
      carbonTableIdentifier,
      thriftTableInfo,
      absoluteTableIdentifier.getTablePath,
      sparkSession,
      schemaConverter)
  }
}