| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.spark.sql.hive |
| |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.fs.Path |
| import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} |
| import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} |
| import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery} |
| import org.apache.spark.sql.catalyst.optimizer.Optimizer |
| import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser} |
| import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _} |
| import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext} |
| import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} |
| import org.apache.spark.sql.catalyst.rules.Rule |
| import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier} |
| import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand} |
| import org.apache.spark.sql.execution.datasources._ |
| import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} |
| import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} |
| import org.apache.spark.sql.internal.SQLConf |
| import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} |
| import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil} |
| import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy} |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.datamap.DataMapStoreManager |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier |
| import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema |
| import org.apache.carbondata.core.util.CarbonProperties |
| import org.apache.carbondata.spark.util.CarbonScalaUtil |
| import org.apache.spark.util.CarbonReflectionUtils |
| |
| /** |
| * This class will have carbon catalog and refresh the relation from cache if the carbontable in |
| * carbon catalog is not same as cached carbon relation's carbon table |
| * |
| * @param externalCatalog |
| * @param globalTempViewManager |
| * @param sparkSession |
| * @param functionResourceLoader |
| * @param functionRegistry |
| * @param conf |
| * @param hadoopConf |
| */ |
| class CarbonHiveSessionCatalog( |
| externalCatalog: HiveExternalCatalog, |
| globalTempViewManager: GlobalTempViewManager, |
| sparkSession: SparkSession, |
| functionResourceLoader: FunctionResourceLoader, |
| functionRegistry: FunctionRegistry, |
| conf: SQLConf, |
| hadoopConf: Configuration) |
| extends HiveSessionCatalog( |
| externalCatalog, |
| globalTempViewManager, |
| sparkSession, |
| functionResourceLoader, |
| functionRegistry, |
| conf, |
| hadoopConf) with CarbonSessionCatalog { |
| |
| private lazy val carbonEnv = { |
| val env = new CarbonEnv |
| env.init(sparkSession) |
| env |
| } |
| /** |
| * return's the carbonEnv instance |
| * @return |
| */ |
| override def getCarbonEnv() : CarbonEnv = { |
| carbonEnv |
| } |
| |
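  /**
   * The three alter-column variants below all delegate to `alterTable` with the updated
   * schema parts and the changed columns.
   */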
  def alterAddColumns(
      tableIdentifier: TableIdentifier,
      schemaParts: String,
      cols: Option[Seq[ColumnSchema]]): Unit = {
    alterTable(tableIdentifier, schemaParts, cols)
  }

  def alterDropColumns(
      tableIdentifier: TableIdentifier,
      schemaParts: String,
      cols: Option[Seq[ColumnSchema]]): Unit = {
    alterTable(tableIdentifier, schemaParts, cols)
  }

  def alterColumnChangeDataTypeOrRename(
      tableIdentifier: TableIdentifier,
      schemaParts: String,
      cols: Option[Seq[ColumnSchema]]): Unit = {
    alterTable(tableIdentifier, schemaParts, cols)
  }
| |
  // Initialize all listeners on the operation bus.
| CarbonEnv.init |
| |
| /** |
| * This method will invalidate carbonrelation from cache if carbon table is updated in |
| * carbon catalog |
| * |
| * @param name |
| * @param alias |
| * @return |
| */ |
| override def lookupRelation(name: TableIdentifier, |
| alias: Option[String]): LogicalPlan = { |
| val rtnRelation = super.lookupRelation(name, alias) |
| var toRefreshRelation = false |
| rtnRelation match { |
      case SubqueryAlias(_,
      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _), _) =>
| toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession) |
| case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) => |
| toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession) |
| case _ => |
| } |
| |
| if (toRefreshRelation) { |
| super.lookupRelation(name, alias) |
| } else { |
| rtnRelation |
| } |
| } |
| |
| /** |
| * returns hive client from session state |
| * |
| * @return |
| */ |
| override def getClient(): org.apache.spark.sql.hive.client.HiveClient = { |
| sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive |
| } |
| |
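  /**
   * Creates partitions after converting the partition values to a Carbon-compatible form.
   * A hedged, illustrative example of a statement that reaches this override (the table
   * and partition spec are hypothetical):
   *
   * {{{
   *   ALTER TABLE sales ADD PARTITION (country='US')
   * }}}
   *
   * If the Carbon table lookup or the value conversion fails, the original partition
   * specs are passed through unchanged.
   */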
| override def createPartitions( |
| tableName: TableIdentifier, |
| parts: Seq[CatalogTablePartition], |
| ignoreIfExists: Boolean): Unit = { |
| try { |
| val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) |
| val updatedParts = CarbonScalaUtil.updatePartitions(parts, table) |
| super.createPartitions(tableName, updatedParts, ignoreIfExists) |
| } catch { |
      case _: Exception =>
| super.createPartitions(tableName, parts, ignoreIfExists) |
| } |
| } |
| |
| /** |
| * This is alternate way of getting partition information. It first fetches all partitions from |
| * hive and then apply filter instead of querying hive along with filters. |
| * @param partitionFilters |
| * @param sparkSession |
| * @param identifier |
| * @return |
| */ |
  def getPartitionsAlternate(
      partitionFilters: Seq[Expression],
      sparkSession: SparkSession,
      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
| val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier) |
| val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier) |
| val partitionSchema = catalogTable.partitionSchema |
| if (partitionFilters.nonEmpty) { |
| val boundPredicate = |
| InterpretedPredicate.create(partitionFilters.reduce(And).transform { |
| case att: AttributeReference => |
| val index = partitionSchema.indexWhere(_.name == att.name) |
| BoundReference(index, partitionSchema(index).dataType, nullable = true) |
| }) |
| allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) } |
| } else { |
| allPartitions |
| } |
| } |
| |
| /** |
| * Update the storageformat with new location information |
| */ |
| override def updateStorageLocation( |
| path: Path, |
| storage: CatalogStorageFormat, |
| newTableName: String, |
| dbName: String): CatalogStorageFormat = { |
| storage.copy(locationUri = Some(path.toString)) |
| } |
| } |
| |
| /** |
| * Session state implementation to override sql parser and adding strategies |
| * @param sparkSession |
| */ |
| class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) { |
| |
| override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) |
| |
| experimentalMethods.extraStrategies = extraStrategies |
| |
| experimentalMethods.extraOptimizations = extraOptimizations |
| |
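  /**
   * Carbon planning strategies registered with Spark's `ExperimentalMethods` above, so the
   * planner applies them in addition to the built-in strategies (the rules returned by
   * `extraOptimizations` are registered the same way). As a hedged sketch, the equivalent
   * manual wiring on a plain session would look roughly like:
   *
   * {{{
   *   // illustrative only; this session state performs the registration automatically
   *   spark.experimental.extraStrategies ++= Seq(new CarbonLateDecodeStrategy)
   * }}}
   */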
| def extraStrategies: Seq[Strategy] = { |
| Seq( |
| new StreamingTableStrategy(sparkSession), |
| new CarbonLateDecodeStrategy, |
| new DDLStrategy(sparkSession) |
| ) |
| } |
| |
| def extraOptimizations: Seq[Rule[LogicalPlan]] = { |
| Seq(new CarbonIUDRule, |
| new CarbonUDFTransformRule, |
| new CarbonLateDecodeRule) |
| } |
| |
| override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) |
| |
| def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil |
| def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = { |
| catalog.ParquetConversions :: |
| catalog.OrcConversions :: |
| CarbonPreInsertionCasts(sparkSession) :: |
| CarbonIUDAnalysisRule(sparkSession) :: |
| AnalyzeCreateTable(sparkSession) :: |
| PreprocessTableInsertion(conf) :: |
| DataSourceAnalysis(conf) :: |
| (if (conf.runSQLonFile) { |
| new ResolveDataSource(sparkSession) :: Nil |
| } else { Nil }) |
| } |
| |
| override lazy val analyzer: Analyzer = |
| new CarbonAnalyzer(catalog, conf, sparkSession, |
| new Analyzer(catalog, conf) { |
| override val extendedResolutionRules = |
| if (extendedAnalyzerRules.nonEmpty) { |
| extendedAnalyzerRules ++ internalAnalyzerRules |
| } else { |
| internalAnalyzerRules |
| } |
| override val extendedCheckRules = Seq( |
| PreWriteCheck(conf, catalog)) |
| } |
| ) |
| |
| /** |
| * Internal catalog for managing table and database states. |
| */ |
| override lazy val catalog = { |
| new CarbonHiveSessionCatalog( |
| sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog], |
| sparkSession.sharedState.globalTempViewManager, |
| sparkSession, |
| functionResourceLoader, |
| functionRegistry, |
| conf, |
| newHadoopConf()) |
| } |
| } |
| |
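/**
 * Analyzer that first delegates to the wrapped Hive analyzer and then, if the reflective
 * lookup of `org.apache.carbondata.mv.datamap.MVAnalyzerRule` succeeded (i.e. the optional
 * MV datamap module is on the classpath), applies that rule to the analyzed plan.
 */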
| class CarbonAnalyzer(catalog: SessionCatalog, |
| conf: CatalystConf, |
| sparkSession: SparkSession, |
| analyzer: Analyzer) extends Analyzer(catalog, conf) { |
| |
| val mvPlan = try { |
| CarbonReflectionUtils.createObject( |
| "org.apache.carbondata.mv.datamap.MVAnalyzerRule", |
| sparkSession)._1.asInstanceOf[Rule[LogicalPlan]] |
| } catch { |
    case _: Exception =>
| null |
| } |
| |
| override def execute(plan: LogicalPlan): LogicalPlan = { |
| val logicalPlan = analyzer.execute(plan) |
| if (mvPlan != null) { |
| mvPlan.apply(logicalPlan) |
| } else { |
| logicalPlan |
| } |
| } |
| } |
| |
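/**
 * Optimizer that marks Carbon relations appearing inside scalar and predicate subqueries
 * (see [[CarbonOptimizerUtil.transformForScalarSubQuery]]) before running the standard
 * `SparkOptimizer` batches.
 */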
| class CarbonOptimizer( |
| catalog: SessionCatalog, |
| conf: SQLConf, |
| experimentalMethods: ExperimentalMethods) |
| extends SparkOptimizer(catalog, conf, experimentalMethods) { |
| |
| override def execute(plan: LogicalPlan): LogicalPlan = { |
    val transformedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
    super.execute(transformedPlan)
| } |
| } |
| |
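/**
 * Helper for flagging Carbon relations that appear inside subqueries. As a hedged,
 * illustrative example (table names are hypothetical), a filter containing a scalar
 * subquery such as
 *
 * {{{
 *   SELECT name FROM t1 WHERE id = (SELECT max(id) FROM t2)
 * }}}
 *
 * would have the Carbon relation under the subquery for `t2` flagged via `isSubquery`,
 * so the late-decode rule skips the decoder plan for it and the plan is optimized as a whole.
 */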
| object CarbonOptimizerUtil { |
  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
    // For scalar and predicate subqueries, set a flag on the Carbon relation so that the
    // decoder plan is skipped in the late-decode optimizer rule and the whole plan is
    // optimized at once.
    val transformedPlan = plan.transform {
| case filter: Filter => |
| filter.transformExpressions { |
| case s: ScalarSubquery => |
| val tPlan = s.plan.transform { |
| case lr: LogicalRelation |
| if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => |
| lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true |
| lr |
| } |
| ScalarSubquery(tPlan, s.children, s.exprId) |
| case p: PredicateSubquery => |
| val tPlan = p.plan.transform { |
| case lr: LogicalRelation |
| if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => |
| lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true |
| lr |
| } |
| PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId) |
| } |
| } |
    transformedPlan
| } |
| } |
| |
| class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession) |
| extends SparkSqlAstBuilder(conf) { |
| |
| val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) |
| |
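  /**
   * Routes CREATE TABLE statements that use the Carbon format to the Carbon table builder;
   * any other format falls back to the default Spark handling. As a hedged, illustrative
   * example (the table and column names are hypothetical), a statement like
   *
   * {{{
   *   CREATE TABLE orders (id INT, amount DOUBLE) STORED BY 'carbondata'
   * }}}
   *
   * would be handled by `helper.createCarbonTable`.
   */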
| override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { |
| val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat) |
| |
| if (fileStorage.equalsIgnoreCase("'carbondata'") || |
| fileStorage.equalsIgnoreCase("carbondata") || |
| fileStorage.equalsIgnoreCase("'carbonfile'") || |
| fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { |
| val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec, |
| ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(), |
| Option(ctx.STRING()).map(string), |
| ctx.AS, ctx.query, fileStorage) |
| helper.createCarbonTable(createTableTuple) |
| } else { |
| super.visitCreateTable(ctx) |
| } |
| } |
| |
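  /**
   * Handles SHOW TABLES, e.g. `SHOW TABLES IN default LIKE 'carbon*'` (a hypothetical
   * example). When the `CARBON_SHOW_DATAMAPS` property is enabled the default Spark
   * behaviour is used; otherwise `CarbonShowTablesCommand` is issued, which presumably
   * filters datamap child tables out of the listing.
   */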
| override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = { |
| withOrigin(ctx) { |
| if (CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS, |
| CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) { |
| super.visitShowTables(ctx) |
| } else { |
| CarbonShowTablesCommand( |
| Option(ctx.db).map(_.getText), |
| Option(ctx.pattern).map(string)) |
| } |
| } |
| } |
| |
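  /**
   * Wraps the plan produced by the default EXPLAIN handling in `CarbonExplainCommand`,
   * presumably so that Carbon-specific details can be added to the explain output.
   */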
| override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = { |
| CarbonExplainCommand(super.visitExplain(ctx)) |
| } |
| } |