blob: 18190e56a97c740434ac05f4f018d2745460222c [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.datasources.RefreshTable
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
/**
 * Command to register a carbon table from existing carbon table data.
 */
case class RefreshCarbonTableCommand(
// database of the table; when None, CarbonEnv.getDatabaseName resolves it from the session
databaseNameOp: Option[String],
// name of the table to register/refresh (lower-cased when its table path is built)
tableName: String)
extends MetadataCommand {
// Logger for this command.
// NOTE(review): LOGGER is not referenced anywhere in the visible code of this class.
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
// Registers the carbon table (and any aggregate child tables listed in its schema) with the
// hive metastore using the schema file found under the table path, registers its partitions
// for NATIVE_HIVE partitioned tables, and asks Spark to refresh the table.
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
setAuditTable(databaseName, tableName)
// Steps
// 1. Get the table metadata from spark.
// 2. Perform the below steps:
// 2.1 If the table exists, check whether its provider is carbon. If yes, go for the carbon
// refresh; otherwise there is nothing to do.
// 2.1.1 If the table does not exist, consider it a carbon table and check for the
// existence of its schema file.
// 2.2 Register the table with hive; if the table being registered has aggregate tables
// then do the below steps:
// 2.2.1 Validate that all the aggregate tables are copied at the store location.
// 2.2.2 Register the aggregate tables.
// NOTE(review): this list also used to say "2.1 check if the table already register with
// hive then ignore and continue with the next schema"; no such check is visible in this
// method — confirm whether that step is still relevant.
// NOTE(review): the head of the expression below is not visible in this view; presumably a
// carbon-provider check on the catalog's table metadata — confirm.
val isCarbonDataSource = try {
.getTableMetadata(TableIdentifier(tableName, databaseNameOp)))
} catch {
case _: NoSuchTableException =>
// Per step 2.1.1, a table unknown to the catalog is treated as a carbon table.
// Only carbon tables (or tables unknown to the catalog) are registered from the schema file.
if (isCarbonDataSource) {
val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
// check the existence of the schema file to know it's a carbon table
val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
// if the schema file does not exist, the table is either a non-carbon table or a stale
// carbon table
if (FileFactory.isFileExist(schemaFilePath)) {
// read TableInfo
val tableInfo = SchemaReader.getTableInfo(identifier)
// refresh the column schema in case of a store created before V3
// NOTE(review): no call to refreshColumnSchema(tableInfo) is visible here — confirm it is
// invoked.
// 2.2 register the table with hive; if the table being registered has aggregate tables
// then do the below steps
// 2.2.1 validate that all the aggregate tables are copied at the store location.
val dataMapSchemaList = tableInfo.getDataMapSchemaList
if (null != dataMapSchemaList && dataMapSchemaList.size() != 0) {
// validate that all the aggregate tables are copied at the store location
val allExists = validateAllAggregateTablePresent(databaseName,
dataMapSchemaList, sparkSession)
if (!allExists) {
// fail the register operation
val msg = s"Table registration with Database name [$databaseName] and Table name " +
s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
s" not copied under database [$databaseName]"
throwMetadataException(databaseName, tableName, msg)
// 2.2.2 Register the aggregate tables to hive
registerAggregates(databaseName, dataMapSchemaList)(sparkSession)
// Register the main table itself.
registerTableWithHive(databaseName, tableName, tableInfo, tablePath)(sparkSession)
// Register partitions to the hive metastore in case of a hive-partitioned carbon table
if (tableInfo.getFactTable.getPartitionInfo != null &&
tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
registerAllPartitionsToHive(identifier, sparkSession)
// Ask Spark to refresh its cached metadata/data for this table.
RefreshTable(TableIdentifier(tableName, Option(databaseName))).run(sparkSession)
/**
 * Refreshes the sort_column flag in the column schema for an old store. Before V3 the
 * sort_column option was not set, but by default all dimension columns should be treated
 * as sort columns if the SORT_COLUMNS property is not defined in tblproperties.
 *
 * @param tableInfo table info whose fact-table column schemas are refreshed
 */
def refreshColumnSchema(tableInfo: TableInfo): Unit = {
// Acts only when SORT_COLUMNS is absent from the fact table's table properties.
val tableProps: mutable.Map[String, String] = tableInfo.getFactTable.getTableProperties.asScala
val sortColumns = tableProps.get(CarbonCommonConstants.SORT_COLUMNS)
sortColumns match {
// NOTE(review): `sortColumn` is never used; `Some(_)` would suffice.
case Some(sortColumn) =>
// don't do anything: SORT_COLUMNS was defined explicitly
case None =>
// iterate over all the columns and mark every eligible dimension as a sort column;
// complex-type parents (isComplexType) and, presumably, complex children (schema
// ordinal -1) are excluded so they are not added to SORT_COLUMNS
// NOTE(review): `collect` is used here only for its side effect; `foreach` with a
// guarded match would state the intent more directly.
tableInfo.getFactTable.getListOfColumns.asScala collect
case columnSchema if columnSchema.isDimensionColumn &&
!columnSchema.getDataType.isComplexType &&
columnSchema.getSchemaOrdinal != -1 =>
// NOTE(review): the body of this case (presumably setting the sort-column flag on
// columnSchema) is not visible in this view — confirm.
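/**
 * Prepares the data type string for a raw column; decimal columns include their precision
 * and scale, e.g. `decimal(10,2)`.
 *
 * @param column column schema whose data type is rendered
 * @return the data type string for the column
 */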
def prepareDataType(column: ColumnSchema): String = {
// Dispatch on the lower-cased data type name.
// NOTE(review): toLowerCase() uses the JVM default locale; if the type name is upper case,
// a Turkish locale turns "DECIMAL" into "decımal" and the "decimal" case below is missed —
// toLowerCase(java.util.Locale.ROOT) would avoid this.
column.getDataType.getName.toLowerCase() match {
case "decimal" =>
"decimal(" + column.getPrecision + "," + column.getScale + ")"
case others =>
// NOTE(review): the result for non-decimal types is not visible in this view; presumably
// the lower-cased type name itself — confirm.
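/**
 * Registers the carbon table with hive.
 *
 * @param dbName database name
 * @param tableName table name
 * @param tableInfo carbon table info to register
 * @param tablePath location of the table
 * @param sparkSession active spark session
 * @return the registration result (typed as Any; ignored by the callers in this file)
 */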
def registerTableWithHive(dbName: String,
tableName: String,
tableInfo: TableInfo,
tablePath: String)(sparkSession: SparkSession): Any = {
// Fires RefreshTablePreExecutionEvent, creates the table through CarbonCreateTableCommand
// at `tablePath`, then fires RefreshTablePostExecutionEvent; both events share one
// OperationContext.
val operationContext = new OperationContext
try {
val refreshTablePreExecutionEvent: RefreshTablePreExecutionEvent =
new RefreshTablePreExecutionEvent(sparkSession,
OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
// NOTE(review): a `.run(sparkSession)` on this command is not visible in this view —
// confirm the command is actually executed.
CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
} catch {
// NOTE(review): both branches rethrow the exception unchanged, so this catch has no
// visible effect; if logging was intended for the generic case, it is not visible here.
case e: AnalysisException => throw e
case e: Exception =>
throw e
// Reached only when creation did not throw, since both catch branches rethrow.
val refreshTablePostExecutionEvent: RefreshTablePostExecutionEvent =
new RefreshTablePostExecutionEvent(sparkSession,
OperationListenerBus.getInstance.fireEvent(refreshTablePostExecutionEvent, operationContext)
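/**
 * Validates that all the aggregate tables are physically present, i.e. that each aggregate
 * child table has a schema file under its table path.
 *
 * @param dbName database of the aggregate tables
 * @param dataMapSchemaList data map schemas whose child tables are checked
 * @param sparkSession active spark session
 * @return false as soon as a child table's schema file is missing (or cannot be checked);
 *         true when all of them are present
 */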
def validateAllAggregateTablePresent(dbName: String, dataMapSchemaList: util.List[DataMapSchema],
    sparkSession: SparkSession): Boolean = {
  // Checks every aggregate child table for a schema file under its table path.
  // `forall` stops at the first missing table, which replaces the former non-local `return`
  // from inside a foreach lambda (implemented by throwing a control-flow exception) and the
  // mutable flag. It is vacuously true for an empty list.
  dataMapSchemaList.asScala.forall { dataMap =>
    // Named to avoid shadowing the command's own `tableName` parameter.
    val childTableName = dataMap.getChildSchema.getTableName
    val childTablePath = CarbonEnv.getTablePath(Some(dbName), childTableName)(sparkSession)
    val schemaFilePath = CarbonTablePath.getSchemaFilePath(childTablePath)
    // Best effort: a non-fatal failure while probing the file system counts as
    // "schema file missing" instead of aborting the validation. Try only catches NonFatal
    // throwables, so VM errors and interrupts still propagate.
    scala.util.Try(FileFactory.isFileExist(schemaFilePath)).getOrElse(false)
  }
}
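/**
 * Iterates over all the aggregate tables and registers with hive those that are not
 * registered yet.
 *
 * @param dbName database of the aggregate tables
 * @param dataMapSchemaList data map schemas whose child tables are registered
 * @param sparkSession active spark session
 * @return nothing meaningful (the declared type is Any)
 */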
def registerAggregates(dbName: String,
    dataMapSchemaList: util.List[DataMapSchema])(sparkSession: SparkSession): Any = {
  // Child table names are produced lazily, so each metastore lookup happens right before
  // the corresponding registration. An aggregate registered earlier in this loop is therefore
  // seen as already present by later iterations.
  val childTableNames = dataMapSchemaList.asScala.iterator.map(_.getChildSchema.getTableName)
  for (childTableName <- childTableNames) {
    // Case-insensitive lookup against the tables hive already knows in this database.
    val alreadyRegistered = sparkSession.sessionState.catalog.listTables(dbName)
      .exists(_.table.equalsIgnoreCase(childTableName))
    if (!alreadyRegistered) {
      val childTablePath = CarbonEnv.getTablePath(Some(dbName), childTableName)(sparkSession)
      val childTableInfo = SchemaReader.getTableInfo(
        AbsoluteTableIdentifier.from(childTablePath, dbName, childTableName))
      registerTableWithHive(dbName, childTableName, childTableInfo, childTablePath)(sparkSession)
    }
  }
}
/**
 * Reads all the partition information stored in each segment and adds it to the
 * hive metastore.
 */
private def registerAllPartitionsToHive(
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): Unit = {
// NOTE(review): the right-hand side of `metadataDetails` is not visible in this view;
// presumably the table's load metadata entries (one per segment) — confirm.
val metadataDetails =
// First read all partition information from each segment.
val allpartitions ={ metadata =>
// Only segments loaded successfully (fully or partially) contribute partitions.
if (metadata.getSegmentStatus == SegmentStatus.SUCCESS ||
metadata.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
val mapper = new SegmentFileStore(absIdentifier.getTablePath, metadata.getSegmentFile)
// One PartitionSpec per (location, folder details) entry recorded for the segment.
val specs = { case(location, fd) =>
// Relative locations are resolved against the table path.
var updatedLoc =
if (fd.isRelative) {
absIdentifier.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + location
} else {
new PartitionSpec(fd.getPartitions, updatedLoc)
} else {
// Identifier of the table whose partitions are added to the metastore.
val identifier =
TableIdentifier(absIdentifier.getTableName, Some(absIdentifier.getDatabaseName))
// Register the partition information to the hive metastore
allpartitions.foreach { segPartitions =>
// Each partition string is split on "=" into a (column, value) pair, paired with the
// partition's location.
// NOTE(review): with p.split("=") a value containing '=' is truncated to its first piece,
// and an empty value ("col=") throws ArrayIndexOutOfBoundsException because trailing
// empty strings are dropped; p.split("=", 2) would be safer.
val specs: Seq[(TablePartitionSpec, Option[String])] = { indexPartitions =>
({ p =>
val spec = p.split("=")
(spec(0), spec(1))
}.toMap, Some(indexPartitions.getLocation.toString))
// Add partition information; the third argument is Spark's ifNotExists flag, so
// partitions already present in the metastore do not fail the command.
AlterTableAddPartitionCommand(identifier, specs, true).run(sparkSession)
override protected def opName: String = {
  // Fixed operation name handed to the MetadataCommand base class
  // (NOTE(review): presumably used when auditing this command — confirm).
  "REFRESH TABLE"
}