| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.spark.sql.hive |
| |
| import scala.collection.JavaConverters._ |
| |
| import org.apache.hadoop.fs.Path |
| import org.apache.spark.sql.{CarbonSession, SparkSession} |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| |
| import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree |
| import org.apache.carbondata.core.datamap.DataMapStoreManager |
| import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} |
| import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.core.util.CarbonUtil |
| import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} |
| import org.apache.carbondata.format |
| import org.apache.carbondata.format.SchemaEvolutionEntry |
| import org.apache.carbondata.spark.util.CarbonSparkUtil |
| |
/**
 * Metastore that stores the carbon schema in the Hive metastore (serialized
 * into table SERDE properties) instead of reading it from the carbon store
 * path on the file system.
 */
class CarbonHiveMetaStore extends CarbonFileMetastore {

  // Schema is always read back from the Hive metastore in this implementation.
  override def isReadFromHiveMetaStore: Boolean = true

  /**
   * Creates a [[CarbonRelation]] from the gson-encoded table info stored in
   * the hive table properties. Falls back to the file-based metastore when
   * the properties do not contain a deserializable table info.
   *
   * @param parameters    hive table SERDE properties holding the gson-encoded schema
   * @param absIdentifier absolute identifier of the carbon table
   * @param sparkSession  active spark session
   */
  override def createCarbonRelation(parameters: Map[String, String],
      absIdentifier: AbsoluteTableIdentifier,
      sparkSession: SparkSession): CarbonRelation = {
    val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava)
    if (info != null) {
      val table = CarbonTable.buildFromTableInfo(info)
      CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
        CarbonSparkUtil.createSparkMeta(table), table)
    } else {
      // Properties did not carry a schema; delegate to the file based lookup.
      super.createCarbonRelation(parameters, absIdentifier, sparkSession)
    }
  }

  // In hive-metastore mode the table path existence is equivalent to the
  // table being registered in the catalog.
  override def isTablePathExists(tableIdentifier: TableIdentifier)
    (sparkSession: SparkSession): Boolean = {
    tableExists(tableIdentifier)(sparkSession)
  }

  /**
   * Drops the table from the hive metastore and clears all carbon-side
   * caches (driver B-tree, dictionary LRU cache, metadata registry,
   * cached data source tables and datamaps).
   *
   * @param absoluteTableIdentifier identifier of the table to drop
   * @param sparkSession            active spark session
   */
  override def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
    (sparkSession: SparkSession): Unit = {
    val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
    val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
    if (null != carbonTable) {
      // clear driver B-tree and dictionary cache
      ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
    }
    checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
    removeTableFromMetadata(dbName, tableName)
    CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
    // discard cached table info in cachedDataSourceTables
    val tableIdentifier = TableIdentifier(tableName, Option(dbName))
    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
  }

  // Hive metastore is the source of truth, so no file modification-time based
  // reload is needed; always report "not reloaded".
  override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
    // do nothing
    false
  }

  // TODO: enumerate carbon tables from the hive metastore; currently unsupported.
  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
    Seq()
  }

  /**
   * Converts the wrapper table info of the given table (resolved through the
   * catalog) back into its thrift representation.
   *
   * @param tablePath    carbon table path identifying the table
   * @param sparkSession active spark session
   * @return thrift [[format.TableInfo]] for the table
   */
  override def getThriftTableInfo(tablePath: CarbonTablePath)
    (sparkSession: SparkSession): format.TableInfo = {
    val identifier = tablePath.getCarbonTableIdentifier
    val relation = lookupRelation(TableIdentifier(identifier.getTableName,
      Some(identifier.getDatabaseName)))(sparkSession).asInstanceOf[CarbonRelation]
    val carbonTable = relation.metaData.carbonTable
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
    schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
      carbonTable.getDatabaseName,
      carbonTable.getTableName)
  }

  /**
   * Overwrites the existing schema in the hive metastore with the given
   * details, recording the schema evolution entry when one is supplied.
   *
   * @param newTableIdentifier   identifier of the table after the alter
   * @param oldTableIdentifier   identifier of the table before the alter
   * @param thriftTableInfo      thrift table info to persist
   * @param schemaEvolutionEntry evolution entry to append to the history; may be null
   * @param tablePath            current table path
   * @param sparkSession         active spark session
   * @return the updated table path
   */
  override def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
      oldTableIdentifier: CarbonTableIdentifier,
      thriftTableInfo: format.TableInfo,
      schemaEvolutionEntry: SchemaEvolutionEntry,
      tablePath: String)
    (sparkSession: SparkSession): String = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
    if (schemaEvolutionEntry != null) {
      thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
    }
    updateHiveMetaStoreForAlter(newTableIdentifier,
      oldTableIdentifier,
      thriftTableInfo,
      tablePath,
      sparkSession,
      schemaConverter)
  }

  /**
   * Overwrites the existing schema in the hive metastore for a datamap
   * (child table) update, without touching the schema evolution history.
   *
   * @param newTableIdentifier identifier of the table after the update
   * @param oldTableIdentifier identifier of the table before the update
   * @param thriftTableInfo    thrift table info to persist
   * @param carbonTablePath    current table path
   * @param sparkSession       active spark session
   * @return the updated table path
   */
  override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
      oldTableIdentifier: CarbonTableIdentifier,
      thriftTableInfo: org.apache.carbondata.format.TableInfo,
      carbonTablePath: String)(sparkSession: SparkSession): String = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
    updateHiveMetaStoreForAlter(newTableIdentifier,
      oldTableIdentifier,
      thriftTableInfo,
      carbonTablePath,
      sparkSession,
      schemaConverter)
  }

  /**
   * Pushes the (possibly renamed) schema into the hive metastore via
   * ALTER TABLE ... SET SERDEPROPERTIES, refreshes the spark catalog cache
   * and reloads the carbon metadata registry.
   *
   * @return path of the table after the alter
   */
  private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier,
      oldTableIdentifier: CarbonTableIdentifier,
      thriftTableInfo: format.TableInfo,
      oldTablePath: String,
      sparkSession: SparkSession,
      schemaConverter: ThriftWrapperSchemaConverterImpl) = {
    val newTablePath =
      CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName)
    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
      thriftTableInfo,
      newTableIdentifier.getDatabaseName,
      newTableIdentifier.getTableName,
      newTablePath)
    val dbName = oldTableIdentifier.getDatabaseName
    val tableName = oldTableIdentifier.getTableName
    // Schema is serialized into multiple gson key='value' parts because hive
    // limits the size of a single SERDE property value.
    val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
    val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
      .getClient()
    hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")

    sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
    removeTableFromMetadata(dbName, tableName)
    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
    CarbonStorePath.getCarbonTablePath(oldTablePath, newTableIdentifier).getPath
  }

  /**
   * Removes the last schema evolution entry in case of an alter failure and
   * restores the previous schema in the hive metastore.
   *
   * @param carbonTableIdentifier identifier of the table being reverted
   * @param thriftTableInfo       thrift table info whose last evolution entry is dropped
   * @param identifier            absolute identifier supplying the table path
   * @param sparkSession          active spark session
   * @return the reverted table path
   */
  override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
      thriftTableInfo: format.TableInfo,
      identifier: AbsoluteTableIdentifier)
    (sparkSession: SparkSession): String = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
    // Drop the entry added by the failed alter (last element, index size() - 1).
    evolutionEntries.remove(evolutionEntries.size() - 1)
    updateHiveMetaStoreForAlter(carbonTableIdentifier,
      carbonTableIdentifier,
      thriftTableInfo,
      identifier.getTablePath,
      sparkSession,
      schemaConverter)
  }

  /**
   * Removes the child (pre-aggregate datamap) schema added by a failed
   * creation and restores the previous schema in the hive metastore.
   *
   * @param absoluteTableIdentifier identifier of the parent table
   * @param thriftTableInfo         thrift table info whose last child schema is dropped
   * @param sparkSession            active spark session
   * @return the reverted table path
   */
  override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
  AbsoluteTableIdentifier,
      thriftTableInfo: org.apache.carbondata.format.TableInfo)
    (sparkSession: SparkSession): String = {
    val schemaConverter = new ThriftWrapperSchemaConverterImpl
    val childSchemas = thriftTableInfo.dataMapSchemas
    // BUG FIX: java.util.List indices are 0-based, so the last element is at
    // size() - 1; remove(size()) always threw IndexOutOfBoundsException
    // (compare revertTableSchemaInAlterFailure above).
    childSchemas.remove(childSchemas.size() - 1)
    val carbonTableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier
    updateHiveMetaStoreForAlter(carbonTableIdentifier,
      carbonTableIdentifier,
      thriftTableInfo,
      absoluteTableIdentifier.getTablePath,
      sparkSession,
      schemaConverter)
  }
}