/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.management

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand

import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}

/**
 * Command to register a carbon table with the hive metastore from existing carbon table data
 * (a table folder that already contains the carbon schema file and segment data).
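 *
 * In CarbonData this is typically triggered through a REFRESH TABLE style DDL
 * (for example REFRESH TABLE db_name.table_name); the exact parser wiring lives outside
 * this file, so treat the DDL form here as illustrative.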
*/
case class RefreshCarbonTableCommand(
databaseNameOp: Option[String],
tableName: String)
extends MetadataCommand {
val LOGGER: LogService =
LogServiceFactory.getLogService(this.getClass.getName)
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
    // Steps
    // 1. get the table path
    // 2. perform the below steps
    // 2.1 if the table is already registered with hive then ignore it and continue with
    //     the next schema
    // 2.2 register the table with hive; if the table being registered has aggregate tables
    //     then do the below steps
    //     2.2.1 validate that all the aggregate tables are copied to the store location
    //     2.2.2 register the aggregate tables
val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
    // 2.1 if the table is already registered with hive then ignore it and continue with
    // the next schema
if (!sparkSession.sessionState.catalog.listTables(databaseName)
.exists(_.table.equalsIgnoreCase(tableName))) {
      // check the existence of the schema file to confirm it is a carbon table
val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
      // if the schema file does not exist then the table is either a non carbon table or
      // a stale carbon table
if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) {
// read TableInfo
val tableInfo = SchemaReader.getTableInfo(identifier)
        // 2.2 register the table with hive; if the table being registered has aggregate
        // tables then do the below steps
        // 2.2.1 validate that all the aggregate tables are copied to the store location
val dataMapSchemaList = tableInfo.getDataMapSchemaList
if (null != dataMapSchemaList && dataMapSchemaList.size() != 0) {
          // validate that all the aggregate tables are copied to the store location
val allExists = validateAllAggregateTablePresent(databaseName,
dataMapSchemaList, sparkSession)
if (!allExists) {
// fail the register operation
val msg = s"Table registration with Database name [$databaseName] and Table name " +
s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
s" not copied under database [$databaseName]"
LOGGER.audit(msg)
throwMetadataException(databaseName, tableName, msg)
}
          // 2.2.2 register the aggregate tables with hive
registerAggregates(databaseName, dataMapSchemaList)(sparkSession)
}
registerTableWithHive(databaseName, tableName, tableInfo)(sparkSession)
// Register partitions to hive metastore in case of hive partitioning carbon table
if (tableInfo.getFactTable.getPartitionInfo != null &&
tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
registerAllPartitionsToHive(identifier, sparkSession)
}
      } else {
        LOGGER.audit(
          s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
          s"failed. Table [$tableName] is either a non carbon table or a stale carbon table " +
          s"under database [$databaseName]")
      }
    } else {
      LOGGER.audit(
        s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
        s"failed. Table [$tableName] either already exists or is registered under database " +
        s"[$databaseName]")
    }
// update the schema modified time
metaStore.updateAndTouchSchemasUpdatedTime()
Seq.empty
}

  /**
   * The method prepares the data type string for the given column.
   *
   * @param column the column schema to derive the data type string from
   * @return the data type name, with precision and scale appended for decimal columns
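   *
   * For example, a decimal column with precision 10 and scale 2 is rendered as
   * "decimal(10,2)"; every other data type is returned as its lower-cased name.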
*/
def prepareDataType(column: ColumnSchema): String = {
column.getDataType.getName.toLowerCase() match {
case "decimal" =>
"decimal(" + column.getPrecision + "," + column.getScale + ")"
case others =>
others
}
}

  /**
   * The method registers the carbon table with hive.
   *
   * @param dbName database the table belongs to
   * @param tableName name of the table being registered
   * @param tableInfo table schema read from the carbon schema file
   * @param sparkSession active spark session
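   *
   * The flow is: fire RefreshTablePreExecutionEvent, run CarbonCreateTableCommand to create
   * the table in the metastore, audit the result, and finally fire
   * RefreshTablePostExecutionEvent.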
*/
def registerTableWithHive(dbName: String,
tableName: String,
tableInfo: TableInfo)(sparkSession: SparkSession): Any = {
val operationContext = new OperationContext
try {
val refreshTablePreExecutionEvent: RefreshTablePreExecutionEvent =
new RefreshTablePreExecutionEvent(sparkSession,
tableInfo.getOrCreateAbsoluteTableIdentifier())
OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false).run(sparkSession)
LOGGER.audit(s"Table registration with Database name [$dbName] and Table name " +
s"[$tableName] is successful.")
    } catch {
      // rethrow any failure from table creation so the caller sees it and the
      // post execution event below is not fired
      case e: Exception =>
        throw e
    }
val refreshTablePostExecutionEvent: RefreshTablePostExecutionEvent =
new RefreshTablePostExecutionEvent(sparkSession,
tableInfo.getOrCreateAbsoluteTableIdentifier())
OperationListenerBus.getInstance.fireEvent(refreshTablePostExecutionEvent, operationContext)
}

  /**
   * The method validates that all the aggregate tables are physically present at the
   * store location.
   *
   * @param dbName database the parent table belongs to
   * @param dataMapSchemaList aggregate (child) table schemas attached to the parent table
   * @param sparkSession active spark session
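   * @return true when the carbon schema file of every aggregate table exists under its
   *         expected table path, false as soon as one is missing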
*/
  def validateAllAggregateTablePresent(dbName: String,
      dataMapSchemaList: util.List[DataMapSchema],
      sparkSession: SparkSession): Boolean = {
    dataMapSchemaList.asScala.forall { dataMap =>
      val tableName = dataMap.getChildSchema.getTableName
      val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
      val schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath)
      try {
        FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))
      } catch {
        // treat any failure while checking the schema file as the table not being present
        case _: Exception => false
      }
    }
  }

  /**
   * The method iterates over all the aggregate tables and registers them with hive.
   *
   * @param dbName database the aggregate tables belong to
   * @param dataMapSchemaList aggregate (child) table schemas to register
   * @param sparkSession active spark session
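   *
   * Aggregate tables that are already listed in the session catalog are skipped; for the
   * rest the schema file is read and registerTableWithHive is invoked.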
*/
def registerAggregates(dbName: String,
dataMapSchemaList: util.List[DataMapSchema])(sparkSession: SparkSession): Any = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
dataMapSchemaList.asScala.foreach(dataMap => {
val tableName = dataMap.getChildSchema.getTableName
if (!sparkSession.sessionState.catalog.listTables(dbName)
.exists(_.table.equalsIgnoreCase(tableName))) {
val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
val absoluteTableIdentifier = AbsoluteTableIdentifier
.from(tablePath, dbName, tableName)
val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
registerTableWithHive(dbName, tableName, tableInfo)(sparkSession)
}
})
}

  /**
   * Read all the partition information which is stored in each segment and add it to
   * the hive metastore.
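   *
   * Only segments in SUCCESS or LOAD_PARTIAL_SUCCESS state are considered; for each of
   * them the segment file is read to collect the partition folders, relative folders are
   * resolved against the table path, and the resulting specs are added through
   * AlterTableAddPartitionCommand with the ifNotExists flag set.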
*/
private def registerAllPartitionsToHive(
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): Unit = {
val metadataDetails =
SegmentStatusManager.readLoadMetadata(
CarbonTablePath.getMetadataPath(absIdentifier.getTablePath))
// First read all partition information from each segment.
val allpartitions = metadataDetails.map{ metadata =>
if (metadata.getSegmentStatus == SegmentStatus.SUCCESS ||
metadata.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
val mapper = new SegmentFileStore(absIdentifier.getTablePath, metadata.getSegmentFile)
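        // the segment file keeps a map from partition folder to folder details; relative
        // folders are resolved against the table path before building the PartitionSpec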
val specs = mapper.getLocationMap.asScala.map { case(location, fd) =>
          val updatedLoc =
            if (fd.isRelative) {
              absIdentifier.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + location
            } else {
              location
            }
          new PartitionSpec(fd.getPartitions, updatedLoc)
}
Some(specs)
} else {
None
}
}.filter(_.isDefined).map(_.get)
val identifier =
TableIdentifier(absIdentifier.getTableName, Some(absIdentifier.getDatabaseName))
// Register the partition information to the hive metastore
allpartitions.foreach { segPartitions =>
val specs: Seq[(TablePartitionSpec, Option[String])] = segPartitions.map { indexPartitions =>
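        // each partition entry in the segment file has the form "columnName=value", so
        // splitting on '=' rebuilds the hive style partition spec for this folder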
(indexPartitions.getPartitions.asScala.map{ p =>
val spec = p.split("=")
(spec(0), spec(1))
}.toMap, Some(indexPartitions.getLocation.toString))
}.toSeq
// Add partition information
AlterTableAddPartitionCommand(identifier, specs, true).run(sparkSession)
}
}
}