/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.hive

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil

import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

object CarbonSessionCatalogUtil {

  /**
   * Method used to rename the table in the hive metastore and to update the
   * serde properties with the new table name, database and path.
   *
   * @param oldTableIdentifier old table identifier
   * @param newTableIdentifier new table identifier
   * @param newTablePath       new table path
   * @param sparkSession       current spark session
   * @param isExternal         true if the table is an external table
   */
  def alterTableRename(
      oldTableIdentifier: TableIdentifier,
      newTableIdentifier: TableIdentifier,
      newTablePath: String,
      sparkSession: SparkSession,
      isExternal: Boolean
  ): Unit = {
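    // Hive moves a managed table's data directory on RENAME. Carbon manages the table
    // path itself, so the table is temporarily flagged EXTERNAL to keep Hive from
    // relocating the data; the flag is restored right after the rename.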
    if (!isExternal) {
      getClient(sparkSession).runSqlHive(
        s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
        s"SET TBLPROPERTIES('EXTERNAL'='TRUE')")
    }
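    // Rename the table in the hive metastore.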
    getClient(sparkSession).runSqlHive(
      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
      s"RENAME TO ${ newTableIdentifier.database.get }.${ newTableIdentifier.table }")
    if (!isExternal) {
      getClient(sparkSession).runSqlHive(
        s"ALTER TABLE ${ newTableIdentifier.database.get }.${ newTableIdentifier.table } " +
        s"SET TBLPROPERTIES('EXTERNAL'='FALSE')")
    }
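    // Re-point the carbon serde properties at the new table name, database and path.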
    getClient(sparkSession).runSqlHive(
      s"ALTER TABLE ${ newTableIdentifier.database.get }.${ newTableIdentifier.table } " +
      s"SET SERDEPROPERTIES" +
      s"('tableName'='${ newTableIdentifier.table }', " +
      s"'dbName'='${ newTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
  }
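
  // Illustrative usage (hypothetical names, shown only as a sketch):
  //   val oldId = TableIdentifier("t1", Some("default"))
  //   val newId = TableIdentifier("t2", Some("default"))
  //   CarbonSessionCatalogUtil.alterTableRename(
  //     oldId, newId, "/store/default/t2", sparkSession, isExternal = false)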

  /**
   * Below method will be used to update the schema parts as table properties
   * in the hive metastore.
   *
   * @param tableIdentifier table identifier
   * @param schemaParts     schema parts to set as table properties
   * @param cols            columns (not used by this method)
   */
  def alterTable(
      tableIdentifier: TableIdentifier,
      schemaParts: String,
      cols: Option[Seq[ColumnSchema]],
      sparkSession: SparkSession): Unit = {
    getClient(sparkSession)
      .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ tableIdentifier.table } " +
                  s"SET TBLPROPERTIES(${ schemaParts })")
  }
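
  /**
   * Updates the storage properties of the given table in the session catalog:
   * keys listed in propKeys are removed and the entries in properties are added.
   */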
  def alterTableProperties(
      sparkSession: SparkSession,
      tableIdentifier: TableIdentifier,
      properties: Map[String, String],
      propKeys: Seq[String]
  ): Unit = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableIdentifier)
    var newProperties = table.storage.properties
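    // Remove the properties whose removal was requested; keys are matched on their
    // lower-cased form.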
    if (propKeys.nonEmpty) {
      val updatedPropKeys = propKeys.map(_.toLowerCase)
      newProperties = newProperties.filter { case (k, _) => !updatedPropKeys.contains(k) }
    }
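    // Merge in the new properties after normalizing them through the parser util.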
    if (properties.nonEmpty) {
      newProperties = newProperties ++ CarbonSparkSqlParserUtil.normalizeProperties(properties)
    }
    val newTable = table.copy(
      storage = table.storage.copy(properties = newProperties)
    )
    catalog.alterTable(newTable)
  }

  /**
   * Returns the hive client from HiveExternalCatalog.
   *
   * @return HiveClient of the current session's external catalog
   */
  def getClient(sparkSession: SparkSession): org.apache.spark.sql.hive.client.HiveClient = {
    // From Spark 2.2 onwards the unified Spark thrift server is used instead of the
    // carbon thrift server. CarbonSession is not available anymore, so the HiveClient is
    // obtained directly from sparkSession.sharedState, which already contains all
    // required carbon rules and optimizers plugged in through the SessionStateBuilder
    // configured in spark-defaults.conf:
    // spark.sql.session.state.builder=org.apache.spark.sql.hive.CarbonSessionStateBuilder
    CarbonToSparkAdapter.getHiveExternalCatalog(sparkSession).client
  }
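
  // The three methods below share one flow: update the schema parts in the hive
  // metastore and refresh the catalog table entry with the updated columns.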

  def alterAddColumns(
      tableIdentifier: TableIdentifier,
      schemaParts: String,
      cols: Option[Seq[ColumnSchema]],
      sparkSession: SparkSession): Unit = {
    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
  }

  def alterDropColumns(
      tableIdentifier: TableIdentifier,
      schemaParts: String,
      cols: Option[Seq[ColumnSchema]],
      sparkSession: SparkSession): Unit = {
    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
  }

  def alterColumnChangeDataTypeOrRename(
      tableIdentifier: TableIdentifier,
      schemaParts: String,
      cols: Option[Seq[ColumnSchema]],
      sparkSession: SparkSession): Unit = {
    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
  }

  /**
   * Alters the table to set the serde properties and updates the catalog table with the
   * new updated schema. Used by all schema-changing alter operations: add column,
   * drop column, and change of column datatype or rename.
   *
   * @param tableIdentifier table identifier
   * @param schemaParts     updated schema parts
   * @param cols            updated column schemas, if any
   */
  private def updateCatalogTableForAlter(
      tableIdentifier: TableIdentifier,
      schemaParts: String,
      cols: Option[Seq[ColumnSchema]],
      sparkSession: SparkSession): Unit = {
    alterTable(tableIdentifier, schemaParts, cols, sparkSession)
    CarbonSessionUtil.alterExternalCatalogForTableWithUpdatedSchema(
      tableIdentifier, cols, schemaParts, sparkSession)
  }

  /**
   * This is an alternate way of getting partition information. It first fetches all the
   * partitions from hive and then applies the filter, instead of querying hive along
   * with the filters.
   *
   * @param partitionFilters filter expressions on the partition columns
   * @param sparkSession     current spark session
   * @param identifier       table identifier
   * @return matching catalog table partitions
   */
  def getPartitionsAlternate(
      partitionFilters: Seq[Expression],
      sparkSession: SparkSession,
      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
  }

  /**
   * Updates the storage format with the new location information.
   * The newTableName and dbName parameters are accepted but not used here.
   */
  def updateStorageLocation(
      path: Path,
      storage: CatalogStorageFormat,
      newTableName: String,
      dbName: String): CatalogStorageFormat = {
    storage.copy(locationUri = Some(path.toUri))
  }
}