/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive

import java.util.concurrent.Callable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.CarbonSparkSqlParser
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}

import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.util.CarbonScalaUtil

/**
* Session catalog backed by Spark's in-memory catalog. It refreshes the cached Carbon relation
* whenever the CarbonTable in the Carbon catalog is no longer the same as the CarbonTable held
* by the cached relation.
*
* @param externalCatalog the session's external catalog
* @param globalTempViewManager manager for global temporary views
* @param functionRegistry function registry of the session
* @param sparkSession the active SparkSession
* @param conf SQL configuration
* @param hadoopConf Hadoop configuration
* @param parser SQL parser of the session
* @param functionResourceLoader loader for function resources
*/
class InMemorySessionCatalog(
externalCatalog: ExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
functionRegistry: FunctionRegistry,
sparkSession: SparkSession,
conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader)
extends SessionCatalog(
externalCatalog,
globalTempViewManager,
functionRegistry,
conf,
hadoopConf,
parser,
functionResourceLoader
) with CarbonSessionCatalog {
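/**
* Renames the table in the session catalog. The new table path needs no extra handling for the
* in-memory catalog.
*/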
override def alterTableRename(oldTableIdentifier: TableIdentifier,
newTableIdentifier: TableIdentifier,
newTablePath: String): Unit = {
sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
}
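/**
* Applies the given schema parts to the table metadata. Nothing needs to be done for the
* in-memory catalog, so this is a no-op.
*/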
override def alterTable(tableIdentifier: TableIdentifier,
schemaParts: String,
cols: Option[Seq[ColumnSchema]]): Unit = {
// Not required for the in-memory catalog
}
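/**
* Adds the given columns to the catalog table schema and refreshes the table.
*/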
override def alterAddColumns(tableIdentifier: TableIdentifier,
schemaParts: String,
newColumns: Option[Seq[ColumnSchema]]): Unit = {
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
var newStructType = catalogTable.schema
// Accumulate each new column onto the schema instead of restarting from the original schema,
// so that all added columns are kept.
newColumns.get.foreach { col =>
newStructType = newStructType
.add(col.getColumnName,
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(col.getDataType))
}
alterSchema(newStructType, catalogTable, tableIdentifier)
}
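/**
* Removes the dropped columns from the catalog table schema and refreshes the table.
*/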
override def alterDropColumns(tableIdentifier: TableIdentifier,
schemaParts: String,
dropCols: Option[Seq[ColumnSchema]]): Unit = {
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
val fields = catalogTable.schema.fields.filterNot { field =>
dropCols.get.exists { col =>
col.getColumnName.equalsIgnoreCase(field.name)
}
}
alterSchema(new StructType(fields), catalogTable, tableIdentifier)
}
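/**
* Applies data type changes (or renames) from the given columns to the matching fields of the
* catalog table schema and refreshes the table.
*/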
override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
schemaParts: String,
columns: Option[Seq[ColumnSchema]]): Unit = {
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
// Replace a field only when a changed column with the same name exists; keep every other
// field exactly once so the resulting schema has no duplicated entries.
val updatedFields = catalogTable.schema.fields.map { field =>
columns.get.find(_.getColumnName.equalsIgnoreCase(field.name)) match {
case Some(col) =>
StructField(col.getColumnName,
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(col.getDataType))
case None => field
}
}
alterSchema(new StructType(updatedFields), catalogTable, tableIdentifier)
}
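/**
* Persists the new schema to the session catalog and refreshes the cached table metadata.
*/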
private def alterSchema(structType: StructType,
catalogTable: CatalogTable,
tableIdentifier: TableIdentifier): Unit = {
val copy = catalogTable.copy(schema = structType)
sparkSession.sessionState.catalog.alterTable(copy)
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
}
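// CarbonEnv for this session, created lazily on first access.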
lazy val carbonEnv = {
val env = new CarbonEnv
env.init(sparkSession)
env
}
def getCarbonEnv(): CarbonEnv = {
carbonEnv
}
// Register all listeners with the operation listener bus.
CarbonEnv.init
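/**
* Reads the Thrift TableInfo from the schema file under the given table path.
*/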
def getThriftTableInfo(tablePath: String): TableInfo = {
val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
CarbonUtil.readSchemaFile(tableMetadataFile)
}
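/**
* Looks up the relation and, if the cached Carbon relation is stale, refreshes it and looks it
* up again so the returned plan carries the current table and statistics.
*/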
override def lookupRelation(name: TableIdentifier): LogicalPlan = {
var rtnRelation = super.lookupRelation(name)
val isRelationRefreshed =
CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
if (isRelationRefreshed) {
rtnRelation = super.lookupRelation(name)
// Reset the stats after lookup.
CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
}
rtnRelation
}
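/**
* Returns the cached plan for the table and lets Carbon update the cached plan if required.
*/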
override def getCachedPlan(t: QualifiedTableName,
c: Callable[LogicalPlan]): LogicalPlan = {
val plan = super.getCachedPlan(t, c)
CarbonSessionUtil.updateCachedPlan(plan)
}
/**
* Returns the Hive client from HiveExternalCatalog. The in-memory catalog is not backed by
* Hive, so this always returns null.
*
* @return null, as no Hive client exists for the in-memory catalog
*/
def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
null
}
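/**
* Creates the partitions after letting Carbon convert the partition specs; falls back to the
* original specs if the Carbon table cannot be resolved.
*/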
override def createPartitions(
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
try {
val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
super.createPartitions(tableName, updatedParts, ignoreIfExists)
} catch {
case _: Exception =>
super.createPartitions(tableName, parts, ignoreIfExists)
}
}
/**
* Alternate way of getting partition information. It first fetches all partitions from the
* catalog and then applies the filters on the driver, instead of querying the metastore with
* the filters.
*
* @param partitionFilters filter expressions used to prune the partitions
* @param sparkSession the active SparkSession
* @param identifier identifier of the table
* @return the partitions that remain after pruning
*/
override def getPartitionsAlternate(partitionFilters: Seq[Expression],
sparkSession: SparkSession,
identifier: TableIdentifier) = {
CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
}
/**
* Updates the storage format with the new location information.
*/
override def updateStorageLocation(
path: Path,
storage: CatalogStorageFormat,
newTableName: String,
dbName: String): CatalogStorageFormat = {
storage.copy(locationUri = Some(path.toUri))
}
}

class CarbonInMemorySessionStateBuilder(sparkSession: SparkSession,
parentState: Option[SessionState] = None)
extends SessionStateBuilder(sparkSession, parentState) {
override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
experimentalMethods.extraStrategies =
Seq(new StreamingTableStrategy(sparkSession),
new CarbonLateDecodeStrategy,
new DDLStrategy(sparkSession)
)
experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
new CarbonUDFTransformRule,
new CarbonLateDecodeRule)
/**
* Internal catalog for managing table and database states.
*/
override protected lazy val catalog: InMemorySessionCatalog = {
val catalog = new InMemorySessionCatalog(
externalCatalog,
session.sharedState.globalTempViewManager,
functionRegistry,
sparkSession,
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
resourceLoader)
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}
private def externalCatalog: ExternalCatalog =
session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
override protected lazy val resourceLoader: SessionResourceLoader = {
new SessionResourceLoader(session)
}
override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
override protected def analyzer: Analyzer = {
new CarbonAnalyzer(catalog,
conf,
sparkSession,
getAnalyzer(super.analyzer))
}
/**
* This method adds the Carbon analysis rules to the session analyzer and returns the new
* analyzer.
*
* @param analyzer analyzer created by the parent SessionStateBuilder
* @return analyzer extended with the Carbon resolution rules
*/
def getAnalyzer(analyzer: Analyzer): Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
analyzer.extendedResolutionRules ++
Seq(CarbonIUDAnalysisRule(sparkSession)) ++
Seq(CarbonPreInsertionCasts(sparkSession)) ++ customResolutionRules
override val extendedCheckRules: Seq[LogicalPlan => Unit] =
analyzer.extendedCheckRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
analyzer.postHocResolutionRules
}
}
override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
}