/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import java.io.File

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions

import org.apache.commons.lang3.StringUtils
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
import org.apache.spark.sql.hive.CarbonMetastore
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.util.{CausedBy, FileUtils}
import org.codehaus.jackson.map.ObjectMapper

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.TableOptionConstant
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load.ValidateUtil
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}

object Checker {
def validateTableExists(
dbName: Option[String],
tableName: String,
sqlContext: SQLContext): Unit = {
val identifier = TableIdentifier(tableName, dbName)
if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(sqlContext)) {
val err = s"table $dbName.$tableName not found"
LogServiceFactory.getLogService(this.getClass.getName).error(err)
throw new IllegalArgumentException(err)
}
}
}
/**
 * Command to show the partitions of a Carbon table,
 * backing the SHOW PARTITIONS statement.
*
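 * A minimal usage sketch; the table name `t1` is hypothetical:
 * {{{
 *   sqlContext.sql("SHOW PARTITIONS t1").show()
 * }}}
 *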
 * @param tableIdentifier identifier of the partitioned table
*/
private[sql] case class ShowCarbonPartitionsCommand(
tableIdentifier: TableIdentifier) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
override val output = CommonUtil.partitionInfoOutput
override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(tableIdentifier)(sqlContext).
asInstanceOf[CarbonRelation]
val carbonTable = relation.tableMeta.carbonTable
    val tableName = carbonTable.getFactTableName
    val partitionInfo = carbonTable.getPartitionInfo(
      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
    if (partitionInfo == null) {
      throw new AnalysisException(
        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
    }
    val partitionType = partitionInfo.getPartitionType
    val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
    LOGGER.info(s"partition column name: $columnName")
CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
}
}
/**
 * Command to trigger compaction on a Carbon table,
 * backing the ALTER TABLE ... COMPACT statement.
*
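 * A minimal usage sketch; the table name `t1` is hypothetical:
 * {{{
 *   sqlContext.sql("ALTER TABLE t1 COMPACT 'MINOR'")
 * }}}
 *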
 * @param alterTableModel parsed compaction request (database, table and compaction type)
*/
private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
RunnableCommand {
def run(sqlContext: SQLContext): Seq[Row] = {
val tableName = alterTableModel.tableName
val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
    if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) {
      logError(s"Alter table failed. Table not found: $databaseName.$tableName")
      sys.error(s"Alter table failed. Table not found: $databaseName.$tableName")
    }
val relation =
CarbonEnv.get.carbonMetastore
.lookupRelation1(Option(databaseName), tableName)(sqlContext)
.asInstanceOf[CarbonRelation]
if (relation == null) {
sys.error(s"Table $databaseName.$tableName does not exist")
}
    val carbonLoadModel = new CarbonLoadModel()
    val table = relation.tableMeta.carbonTable
    val dataLoadSchema = new CarbonDataLoadSchema(table)
    // Need to fill dimension relation
    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
var storeLocation = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
System.getProperty("java.io.tmpdir")
)
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
try {
CarbonDataRDDFactory.alterTableForCompaction(sqlContext,
alterTableModel,
carbonLoadModel,
relation.tableMeta.storePath,
storeLocation
)
} catch {
case e: Exception =>
if (null != e.getMessage) {
sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
} else {
sys.error("Exception in compaction. Please check logs for more info.")
}
}
Seq.empty
}
}
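
/**
 * Command that creates a Carbon table from the parsed [[TableModel]], registers
 * it in the metastore, and rolls the registration back if the datasource table
 * creation fails.
 *
 * A minimal usage sketch; the table and column names are hypothetical, assuming
 * the CarbonData DDL syntax:
 * {{{
 *   sqlContext.sql(
 *     "CREATE TABLE t1 (id INT, name STRING) STORED BY 'carbondata'")
 * }}}
 */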
case class CreateTable(cm: TableModel) extends RunnableCommand {
def run(sqlContext: SQLContext): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sqlContext)
val tbName = cm.tableName
val dbName = cm.databaseName
LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
val tableInfo: TableInfo = TableNewProcessor(cm)
// Add validation for sort scope when create table
val sortScope = tableInfo.getFactTable.getTableProperties
.getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
if (!CarbonUtil.isValidSortOption(sortScope)) {
      throw new InvalidConfigurationException(s"Invalid SORT_SCOPE '$sortScope'; " +
        s"valid SORT_SCOPE values are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT'")
}
if (tableInfo.getFactTable.getListOfColumns.isEmpty) {
sys.error("No Dimensions found. Table should have at least one dimesnion !")
}
if (sqlContext.tableNames(dbName).exists(_.equalsIgnoreCase(tbName))) {
if (!cm.ifNotExistsSet) {
LOGGER.audit(
s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
s"Table [$tbName] already exists under database [$dbName]")
sys.error(s"Table [$tbName] already exists under database [$dbName]")
}
} else {
// Add Database to catalog and persist
val catalog = CarbonEnv.get.carbonMetastore
// Need to fill partitioner class when we support partition
val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName, null)(sqlContext)
try {
sqlContext.sql(
s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
.collect
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
// call the drop table to delete the created table.
CarbonEnv.get.carbonMetastore
.dropTable(catalog.storePath, identifier)(sqlContext)
LOGGER.audit(s"Table creation with Database name [$dbName] " +
s"and Table name [$tbName] failed")
throw e
}
LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
}
Seq.empty
}
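  /**
   * Sets a public field of `ref` by reflection. Note: the field lookup calls
   * `Option.get`, so this throws if no public field named `name` exists.
   */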
def setV(ref: Any, name: String, value: Any): Unit = {
ref.getClass.getFields.find(_.getName == name).get
.set(ref, value.asInstanceOf[AnyRef])
}
}
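
/**
 * Command to delete segments (loads) of a table by segment id.
 *
 * A minimal usage sketch; the table name and ids are hypothetical, assuming the
 * CarbonData segment-deletion syntax:
 * {{{
 *   sqlContext.sql("DELETE FROM TABLE t1 WHERE SEGMENT.ID IN (0, 1)")
 * }}}
 */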
private[sql] case class DeleteLoadsById(
loadids: Seq[String],
databaseNameOp: Option[String],
tableName: String) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
  def run(sqlContext: SQLContext): Seq[Row] = {
    validateLoadIds()
    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
CarbonStore.deleteLoadById(
loadids,
getDB.getDatabaseName(databaseNameOp, sqlContext),
tableName,
carbonTable
)
Seq.empty
}
  // validates that the list of segment ids is not empty
  private def validateLoadIds(): Unit = {
if (loadids.isEmpty) {
val errorMessage = "Error: Segment id(s) should not be empty."
throw new MalformedCarbonCommandException(errorMessage)
}
}
}
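
/**
 * Command to delete segments loaded before a given date.
 *
 * A minimal usage sketch; the table name and date are hypothetical, assuming the
 * CarbonData syntax:
 * {{{
 *   sqlContext.sql(
 *     "DELETE FROM TABLE t1 WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
 * }}}
 */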
private[sql] case class DeleteLoadsByLoadDate(
databaseNameOp: Option[String],
tableName: String,
dateField: String,
loadDate: String) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
def run(sqlContext: SQLContext): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
CarbonStore.deleteLoadByDate(
loadDate,
getDB.getDatabaseName(databaseNameOp, sqlContext),
tableName,
carbonTable
)
Seq.empty
}
}
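
/**
 * Holder for the dictionary-related metadata update used during data load:
 * removes the DICTIONARY encoding from the given no-dictionary dimensions in
 * the thrift schema file, then refreshes the metastore entry and the load
 * model's [[CarbonDataLoadSchema]].
 */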
object LoadTable {
def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext,
model: DictionaryLoadModel,
noDictDimension: Array[CarbonDimension]): Unit = {
val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
model.table)
val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
// modify TableInfo
val columns = tableInfo.getFact_table.getTable_columns
for (i <- 0 until columns.size) {
if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
}
}
// write TableInfo
CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
// update Metadata
val catalog = CarbonEnv.get.carbonMetastore
catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
// update CarbonDataLoadSchema
val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
}
}
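
/**
 * Command that loads the result of a child query into a Carbon table, backing
 * INSERT INTO ... SELECT statements. It delegates to [[LoadTable]], passing the
 * relation's column names as the file header.
 */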
private[sql] case class LoadTableByInsert(relation: CarbonDatasourceRelation,
child: LogicalPlan, isOverwriteExist: Boolean) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def run(sqlContext: SQLContext): Seq[Row] = {
val df = new DataFrame(sqlContext, child)
val header = relation.carbonRelation.output.map(_.name).mkString(",")
val load = LoadTable(
Some(relation.carbonRelation.databaseName),
relation.carbonRelation.tableName,
null,
Seq(),
scala.collection.immutable.Map("fileheader" -> header),
isOverwriteExist,
null,
Some(df)).run(sqlContext)
// updating relation metadata. This is in case of auto detect high cardinality
relation.carbonRelation.metaData =
CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
load
}
}
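
/**
 * Command to load data into a Carbon table, backing the
 * LOAD DATA [LOCAL] INPATH ... INTO TABLE ... OPTIONS (...) statement. It is
 * also reused for the insert and update flows via `dataFrame` and `updateModel`.
 *
 * A minimal usage sketch; the path, table name and options are hypothetical:
 * {{{
 *   sqlContext.sql(
 *     """LOAD DATA INPATH 'hdfs://host/data.csv' INTO TABLE t1
 *       |OPTIONS ('DELIMITER'=',', 'FILEHEADER'='c1,c2')""".stripMargin)
 * }}}
 */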
case class LoadTable(
databaseNameOp: Option[String],
tableName: String,
factPathFromUser: String,
dimFilesPath: Seq[DataLoadTableFileMapping],
options: scala.collection.immutable.Map[String, String],
isOverwriteExist: Boolean,
var inputSqlString: String = null,
dataFrame: Option[DataFrame] = None,
updateModel: Option[UpdateTableModel] = None) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
  private def checkDefaultValue(value: String, default: String): String = {
    if (StringUtils.isEmpty(value)) {
      default
    } else {
      value
    }
  }
def run(sqlContext: SQLContext): Seq[Row] = {
    if (dataFrame.isDefined && updateModel.isEmpty) {
      val rdd = dataFrame.get.rdd
      if (rdd.partitions == null || rdd.partitions.length == 0) {
        LOGGER.warn("Skipping data load: input DataFrame has no partitions, no data was loaded.")
        return Seq.empty
}
}
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
logError(s"Data loading failed. table not found: $dbName.$tableName")
LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
sys.error(s"Data loading failed. table not found: $dbName.$tableName")
}
val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(Option(dbName), tableName)(sqlContext)
.asInstanceOf[CarbonRelation]
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
}
CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
val carbonLock = CarbonLockFactory
.getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
.getCarbonTableIdentifier,
LockUsage.METADATA_LOCK
)
try {
      // take the lock only in case of a normal data load
      if (updateModel.isEmpty) {
        if (carbonLock.lockWithRetries()) {
          logInfo("Successfully acquired the table metadata file lock")
        } else {
          sys.error("Table is locked for update. Please try after some time")
        }
}
val factPath = if (dataFrame.isDefined) {
""
} else {
FileUtils.getPaths(
CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
}
      val carbonLoadModel = new CarbonLoadModel()
      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
      carbonLoadModel.setStorePath(relation.tableMeta.storePath)
      val table = relation.tableMeta.carbonTable
      carbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
val partitionLocation = relation.tableMeta.storePath + "/partition/" +
relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
relation.tableMeta.carbonTableIdentifier.getTableName + "/"
val columnar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
val delimiter = options.getOrElse("delimiter", ",")
val quoteChar = options.getOrElse("quotechar", "\"")
var fileHeader = options.getOrElse("fileheader", "")
val escapeChar = options.getOrElse("escapechar", "\\")
val commentchar = options.getOrElse("commentchar", "#")
val columnDict = options.getOrElse("columndict", null)
val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
val badRecordActionValue = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
val dateFormat = options.getOrElse("dateformat", null)
ValidateUtil.validateDateFormat(dateFormat, table, tableName)
val maxColumns = options.getOrElse("maxcolumns", null)
val tableProperties = table.getTableInfo.getFactTable.getTableProperties
val sortScopeDefault = CarbonProperties.getInstance().
getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
val sortScope = if (null == tableProperties) {
sortScopeDefault
} else {
tableProperties.getOrDefault("sort_scope", sortScopeDefault)
}
ValidateUtil.validateSortScope(table, sortScope)
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", carbonProperty
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
val globalSortPartitions = options.getOrElse("global_sort_partitions", carbonProperty
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))
ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
      // If the CSV file has no header and the LOAD SQL does not provide a FILEHEADER
      // option, the table schema is used to generate the file header.
val headerOption = options.get("header")
if (headerOption.isDefined) {
// whether the csv file has file header
// the default value is true
val header = try {
headerOption.get.toBoolean
} catch {
case ex: IllegalArgumentException =>
throw new MalformedCarbonCommandException(
"'header' option should be either 'true' or 'false'. " + ex.getMessage)
}
      if (header) {
        if (fileHeader.nonEmpty) {
          throw new MalformedCarbonCommandException(
            "When 'header' option is true, 'fileheader' option is not required.")
        }
      } else {
        // generate the file header from the table schema
        if (fileHeader.isEmpty) {
          fileHeader = table.getCreateOrderColumn(table.getFactTableName)
            .asScala.map(_.getColName).mkString(",")
        }
      }
}
val bad_record_path = options.getOrElse("bad_record_path",
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
if (badRecordsLoggerEnable.toBoolean ||
LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
sys.error("Invalid bad records location.")
}
}
carbonLoadModel.setBadRecordsLocation(bad_record_path)
carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
carbonLoadModel.setDateFormat(dateFormat)
carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
carbonLoadModel
.setSerializationNullFormat(
TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat)
carbonLoadModel
.setBadRecordsLoggerEnable(
TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
carbonLoadModel
.setBadRecordsAction(
TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction)
carbonLoadModel
.setIsEmptyDataBadRecord(
DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
carbonLoadModel.setSortScope(sortScope)
carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
      // resolve the 'single_pass' option; all_dictionary_path and columndict
      // are only allowed together with single_pass=true
val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
case "true" =>
true
case "false" =>
          // when single_pass = false and either all_dictionary_path
          // or columndict is configured, then do not allow the load
if (StringUtils.isNotEmpty(allDictionaryPath) || StringUtils.isNotEmpty(columnDict)) {
throw new MalformedCarbonCommandException(
"Can not use all_dictionary_path or columndict without single_pass.")
} else {
false
}
case illegal =>
LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
"Please set it as 'true' or 'false'")
false
}
carbonLoadModel.setUseOnePass(useOnePass)
      if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
          complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
          delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {
        sys.error("Field delimiter and complex types delimiters must not be the same")
      } else {
carbonLoadModel.setComplexDelimiterLevel1(
CarbonUtil.delimiterConverter(complex_delimiter_level_1))
carbonLoadModel.setComplexDelimiterLevel2(
CarbonUtil.delimiterConverter(complex_delimiter_level_2))
}
      // set the path of the pre-generated "all dictionary" files
carbonLoadModel.setAllDictPath(allDictionaryPath)
val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
try {
        // partition the data first, then invoke the data load
        LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
carbonLoadModel.setFactFilePath(factPath)
carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
carbonLoadModel.setCsvHeader(fileHeader)
carbonLoadModel.setColDictFilePath(columnDict)
carbonLoadModel.setDirectLoad(true)
carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
maxColumns)
carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
val storePath = relation.tableMeta.storePath
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
.getCarbonTableIdentifier
val carbonTablePath = CarbonStorePath
.getCarbonTablePath(storePath, carbonTableIdentifier)
val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
val dimensions = carbonTable.getDimensionByTableName(
carbonTable.getFactTableName).asScala.toArray
// add the start entry for the new load in the table status file
        if (updateModel.isEmpty) {
          CommonUtil
            .readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteExist)
}
if (isOverwriteExist) {
LOGGER.info(s"Overwrite is in progress for carbon table with $dbName.$tableName")
}
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) {
LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
carbonLoadModel.setUseOnePass(false)
}
if (carbonLoadModel.getUseOnePass) {
val colDictFilePath = carbonLoadModel.getColDictFilePath
if (!StringUtils.isEmpty(colDictFilePath)) {
carbonLoadModel.initPredefDictMap()
// generate predefined dictionary
GlobalDictionaryUtil
.generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
dimensions, carbonLoadModel, sqlContext, storePath, dictFolderPath)
}
val allDictPath: String = carbonLoadModel.getAllDictPath
          if (!StringUtils.isEmpty(allDictPath)) {
carbonLoadModel.initPredefDictMap()
GlobalDictionaryUtil
.generateDictionaryFromDictionaryFiles(sqlContext,
carbonLoadModel,
storePath,
carbonTableIdentifier,
dictFolderPath,
dimensions,
allDictionaryPath)
}
// dictionaryServerClient dictionary generator
val dictionaryServerPort = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host")
carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
          // start the dictionary server when one-pass load is used and a dimension
          // with DICTIONARY (but not DIRECT_DICTIONARY) encoding is present
val allDimensions = table.getAllDimensions.asScala.toList
val createDictionary = allDimensions.exists {
carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
}
val server: Option[DictionaryServer] = if (createDictionary) {
val dictionaryServer = DictionaryServer
.getInstance(dictionaryServerPort.toInt, carbonTable)
carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
sqlContext.sparkContext.addSparkListener(new SparkListener() {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
dictionaryServer.shutdown()
}
})
Some(dictionaryServer)
} else {
None
}
CarbonDataRDDFactory.loadCarbonData(sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
columnar,
partitionStatus,
server,
isOverwriteExist,
dataFrame,
updateModel)
} else {
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val fields = dataFrame.get.schema.fields
import org.apache.spark.sql.functions.udf
// extracting only segment from tupleId
val getSegIdUDF = udf((tupleId: String) =>
CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
// getting all fields except tupleId field as it is not required in the value
var otherFields = fields.toSeq
.filter(field => !field.name
.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
.map(field => {
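            // NOTE: the '&& false' guard below makes this branch unreachable, so
            // the column name is always used as-is (UPDATED_COL_EXTENSION kept)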
if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
new Column(field.name
.substring(0,
field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
} else {
new Column(field.name)
}
})
// extract tupleId field which will be used as a key
val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
// use dataFrameWithoutTupleId as dictionaryDataFrame
val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
otherFields = otherFields :+ segIdColumn
// use dataFrameWithTupleId as loadDataFrame
val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
(Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
} else {
(dataFrame, dataFrame)
}
GlobalDictionaryUtil
.generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
dictionaryDataFrame)
CarbonDataRDDFactory.loadCarbonData(sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
columnar,
partitionStatus,
None,
isOverwriteExist,
loadDataFrame,
updateModel)
}
} catch {
case CausedBy(ex: NoRetryException) =>
LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
case ex: Exception =>
LOGGER.error(ex)
LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
throw ex
} finally {
      // delete the intermediate partition files, whether or not the load succeeded
try {
val fileType = FileFactory.getFileType(partitionLocation)
if (FileFactory.isFileExist(partitionLocation, fileType)) {
val file = FileFactory
.getCarbonFile(partitionLocation, fileType)
CarbonUtil.deleteFoldersAndFiles(file)
}
} catch {
case ex: Exception =>
LOGGER.error(ex)
LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
"Problem deleting the partition folder")
throw ex
}
}
} catch {
case dle: DataLoadingException =>
LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
throw dle
case mce: MalformedCarbonCommandException =>
LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
throw mce
} finally {
if (carbonLock != null) {
if (carbonLock.unlock()) {
logInfo("Table MetaData Unlocked Successfully after data load")
} else {
logError("Unable to unlock Table MetaData")
}
}
}
Seq.empty
}
private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext,
model: DictionaryLoadModel,
noDictDimension: Array[CarbonDimension]): Unit = {
val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
model.table)
val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
// modify TableInfo
val columns = tableInfo.getFact_table.getTable_columns
for (i <- 0 until columns.size) {
if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
}
}
// write TableInfo
CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
val catalog = CarbonEnv.get.carbonMetastore
    // update the schema modified time
catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime(
carbonLoadModel.getDatabaseName,
carbonLoadModel.getTableName))
// update Metadata
catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
// update CarbonDataLoadSchema
val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
}
}
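
/**
 * Command to drop a Carbon table, backing the DROP TABLE statement. It acquires
 * the metadata and drop-table locks, removes the table from the metastore, and
 * finally deletes any remaining metadata files.
 */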
private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String],
tableName: String)
extends RunnableCommand {
def run(sqlContext: SQLContext): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
val identifier = TableIdentifier(tableName, Option(dbName))
val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
val catalog = CarbonEnv.get.carbonMetastore
val storePath = catalog.storePath
try {
locksToBeAcquired foreach {
lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
} catch {
case ex: Exception =>
LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
} finally {
if (carbonLocks.nonEmpty) {
val unlocked = carbonLocks.forall(_.unlock())
if (unlocked) {
logInfo("Table MetaData Unlocked Successfully")
// deleting any remaining files.
val metadataFilePath = CarbonStorePath
.getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
val fileType = FileFactory.getFileType(metadataFilePath)
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
}
}
}
}
Seq.empty
}
}
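
/**
 * Command to list the segments of a table, backing the SHOW SEGMENTS statement.
 *
 * A minimal usage sketch; the table name `t1` is hypothetical:
 * {{{
 *   sqlContext.sql("SHOW SEGMENTS FOR TABLE t1").show()
 * }}}
 */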
private[sql] case class ShowLoads(
databaseNameOp: Option[String],
tableName: String,
limit: Option[String],
override val output: Seq[Attribute]) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
CarbonStore.showSegments(
getDB.getDatabaseName(databaseNameOp, sqlContext),
tableName,
limit,
carbonTable.getMetaDataFilepath
)
}
}
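
/**
 * Command backing DESCRIBE FORMATTED for Carbon tables: emits one row per column
 * (tagging each as dimension or measure) followed by detailed table, column
 * property, column group and partition information.
 */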
private[sql] case class DescribeCommandFormatted(
child: SparkPlan,
override val output: Seq[Attribute],
tblIdentifier: TableIdentifier)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
val mapper = new ObjectMapper()
val colProps = StringBuilder.newBuilder
var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
val comment = if (relation.metaData.dims.contains(field.name)) {
val dimension = relation.metaData.carbonTable.getDimensionByName(
relation.tableMeta.carbonTableIdentifier.getTableName,
field.name)
if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
colProps.append(field.name).append(".")
.append(mapper.writeValueAsString(dimension.getColumnProperties))
.append(",")
}
        val keyType = if (dimension.hasEncoding(Encoding.DICTIONARY)) {
          "DICTIONARY, KEY COLUMN"
        } else {
          "KEY COLUMN"
        }
        if (dimension.hasEncoding(Encoding.INVERTED_INDEX)) {
          keyType
        } else {
          keyType + ",NOINVERTEDINDEX"
        }
} else {
"MEASURE"
}
(field.name, field.dataType.simpleString, comment)
}
val colPropStr = if (colProps.toString().trim().length() > 0) {
      // drop the trailing comma
colProps.toString().dropRight(1)
} else {
colProps.toString()
}
results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
.getDatabaseName, "")
)
results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
val carbonTable = relation.tableMeta.carbonTable
results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
.getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
.LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
if (colPropStr.length() > 0) {
results ++= Seq((colPropStr, "", ""))
} else {
results ++= Seq(("ADAPTIVE", "", ""))
}
results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
relation.tableMeta.carbonTableIdentifier.getTableName).asScala
.map(column => column).mkString(","), ""))
val dimension = carbonTable
.getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
results ++= getColumnGroups(dimension.asScala.toList)
if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
results ++=
Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
.getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
}
results.map { case (name, dataType, comment) =>
Row(f"$name%-36s $dataType%-80s $comment%-72s")
}
}
private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
var results: Seq[(String, String, String)] =
Seq(("", "", ""), ("##Column Group Information", "", ""))
val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
case (groupId, _) => groupId != -1
}.toSeq.sortBy(_._1)
val groups = groupedDimensions.map(colGroups => {
colGroups._2.map(dim => dim.getColName).mkString(", ")
})
var index = 1
groups.foreach { x =>
results = results :+ (s"Column Group $index", x, "")
index = index + 1
}
results
}
}
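
/**
 * Command to delete loads older than a given date value, matched against a
 * timestamp dimension (`dateField`) of the table.
 */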
private[sql] case class DeleteLoadByDate(
databaseNameOp: Option[String],
tableName: String,
dateField: String,
dateValue: String
) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def run(sqlContext: SQLContext): Seq[Row] = {
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
val identifier = TableIdentifier(tableName, Option(dbName))
val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
var level: String = ""
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
if (relation == null) {
LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
sys.error(s"Table $dbName.$tableName does not exist")
}
val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
filter => filter.name.equalsIgnoreCase(dateField) &&
filter.dataType.isInstanceOf[TimestampType]).toList
if (matches.isEmpty) {
LOGGER.audit("The delete load by date is failed. " +
s"Table $dbName.$tableName does not contain date field: $dateField")
sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
} else {
      level = matches.head.name
}
val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
.getColName
DataManagementFunc.deleteLoadByDate(
sqlContext,
new CarbonDataLoadSchema(carbonTable),
dbName,
tableName,
CarbonEnv.get.carbonMetastore.storePath,
level,
actualColName,
dateValue)
LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
Seq.empty
}
}
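
/**
 * Command to physically clean up stale segment data of a table.
 *
 * A minimal usage sketch; the table name `t1` is hypothetical, assuming the
 * CarbonData syntax:
 * {{{
 *   sqlContext.sql("CLEAN FILES FOR TABLE t1")
 * }}}
 */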
private[sql] case class CleanFiles(
databaseNameOp: Option[String],
tableName: String) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def run(sqlContext: SQLContext): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
CarbonStore.cleanFiles(
getDB.getDatabaseName(databaseNameOp, sqlContext),
tableName,
sqlContext.asInstanceOf[CarbonContext].storePath,
carbonTable,
false)
Seq.empty
}
}