| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.command |
| |
| import java.io.File |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable.ListBuffer |
| import scala.language.implicitConversions |
| |
| import org.apache.commons.lang3.StringUtils |
| import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} |
| import org.apache.spark.sql._ |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute |
| import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} |
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan |
| import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan} |
| import org.apache.spark.sql.hive.CarbonMetastore |
| import org.apache.spark.sql.types.TimestampType |
| import org.apache.spark.util.{CausedBy, FileUtils} |
| import org.codehaus.jackson.map.ObjectMapper |
| |
| import org.apache.carbondata.api.CarbonStore |
| import org.apache.carbondata.common.constants.LoggerAction |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.dictionary.server.DictionaryServer |
| import org.apache.carbondata.core.exception.InvalidConfigurationException |
| import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} |
| import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier} |
| import org.apache.carbondata.core.metadata.encoder.Encoding |
| import org.apache.carbondata.core.metadata.schema.table.TableInfo |
| import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension |
| import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} |
| import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} |
| import org.apache.carbondata.core.util.path.CarbonStorePath |
| import org.apache.carbondata.processing.exception.DataLoadingException |
| import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants |
| import org.apache.carbondata.processing.loading.exception.NoRetryException |
| import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} |
| import org.apache.carbondata.processing.util.TableOptionConstant |
| import org.apache.carbondata.spark.exception.MalformedCarbonCommandException |
| import org.apache.carbondata.spark.load.ValidateUtil |
| import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel} |
| import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil} |
| |
object Checker {
  /**
   * Validates that the given table exists in the Carbon metastore.
   *
   * @param dbName     optional database name; the current database is used when None
   * @param tableName  name of the table to look up
   * @param sqlContext context providing access to the Carbon metastore
   * @throws IllegalArgumentException (after logging) when the table does not exist
   */
  def validateTableExists(
      dbName: Option[String],
      tableName: String,
      sqlContext: SQLContext): Unit = {
    val identifier = TableIdentifier(tableName, dbName)
    if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(sqlContext)) {
      // Render the optional database prefix without the raw "Some(...)" wrapper
      // that interpolating the Option directly would produce.
      val err = s"table ${ dbName.map(_ + ".").getOrElse("") }$tableName not found"
      LogServiceFactory.getLogService(this.getClass.getName).error(err)
      throw new IllegalArgumentException(err)
    }
  }
}
| |
| /** |
| * Command for show table partitions Command |
| * |
| * @param tableIdentifier |
| */ |
| private[sql] case class ShowCarbonPartitionsCommand( |
| tableIdentifier: TableIdentifier) extends RunnableCommand { |
| val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName) |
| override val output = CommonUtil.partitionInfoOutput |
| override def run(sqlContext: SQLContext): Seq[Row] = { |
| val relation = CarbonEnv.get.carbonMetastore |
| .lookupRelation1(tableIdentifier)(sqlContext). |
| asInstanceOf[CarbonRelation] |
| val carbonTable = relation.tableMeta.carbonTable |
| var tableName = carbonTable.getFactTableName |
| var partitionInfo = carbonTable.getPartitionInfo( |
| carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName) |
| if (partitionInfo == null) { |
| throw new AnalysisException( |
| s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName") |
| } |
| var partitionType = partitionInfo.getPartitionType |
| var columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName |
| LOGGER.info("partition column name:" + columnName) |
| CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo) |
| } |
| } |
| |
| /** |
| * Command for the compaction in alter table command |
| * |
| * @param alterTableModel |
| */ |
| private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) extends |
| RunnableCommand { |
| |
| def run(sqlContext: SQLContext): Seq[Row] = { |
| // TODO : Implement it. |
| val tableName = alterTableModel.tableName |
| val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext) |
| if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) { |
| logError(s"alter table failed. table not found: $databaseName.$tableName") |
| sys.error(s"alter table failed. table not found: $databaseName.$tableName") |
| } |
| |
| val relation = |
| CarbonEnv.get.carbonMetastore |
| .lookupRelation1(Option(databaseName), tableName)(sqlContext) |
| .asInstanceOf[CarbonRelation] |
| if (relation == null) { |
| sys.error(s"Table $databaseName.$tableName does not exist") |
| } |
| val carbonLoadModel = new CarbonLoadModel() |
| |
| |
| val table = relation.tableMeta.carbonTable |
| carbonLoadModel.setTableName(table.getFactTableName) |
| val dataLoadSchema = new CarbonDataLoadSchema(table) |
| // Need to fill dimension relation |
| carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) |
| carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) |
| carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) |
| carbonLoadModel.setStorePath(relation.tableMeta.storePath) |
| |
| var storeLocation = CarbonProperties.getInstance |
| .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, |
| System.getProperty("java.io.tmpdir") |
| ) |
| storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() |
| try { |
| CarbonDataRDDFactory.alterTableForCompaction(sqlContext, |
| alterTableModel, |
| carbonLoadModel, |
| relation.tableMeta.storePath, |
| storeLocation |
| ) |
| } catch { |
| case e: Exception => |
| if (null != e.getMessage) { |
| sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }") |
| } else { |
| sys.error("Exception in compaction. Please check logs for more info.") |
| } |
| } |
| Seq.empty |
| } |
| } |
| |
/**
 * Command that creates a carbon table: validates table properties, persists the
 * schema through the Carbon metastore, and registers the table with Spark SQL.
 * On registration failure the freshly persisted schema is dropped again
 * (compensating rollback), so the catalog is not left half-created.
 *
 * @param cm parsed table model describing columns and table properties
 */
case class CreateTable(cm: TableModel) extends RunnableCommand {

  def run(sqlContext: SQLContext): Seq[Row] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    // Resolve the effective database name (falls back to the current database).
    cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sqlContext)
    val tbName = cm.tableName
    val dbName = cm.databaseName
    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")

    val tableInfo: TableInfo = TableNewProcessor(cm)

    // Add validation for sort scope when create table
    val sortScope = tableInfo.getFactTable.getTableProperties
      .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
    if (!CarbonUtil.isValidSortOption(sortScope)) {
      throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," +
        s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
    }

    // A carbon table must have at least one column.
    if (tableInfo.getFactTable.getListOfColumns.isEmpty) {
      sys.error("No Dimensions found. Table should have at least one dimesnion !")
    }

    if (sqlContext.tableNames(dbName).exists(_.equalsIgnoreCase(tbName))) {
      // Table already exists: error out unless IF NOT EXISTS was specified,
      // in which case creation is silently skipped.
      if (!cm.ifNotExistsSet) {
        LOGGER.audit(
          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
          s"Table [$tbName] already exists under database [$dbName]")
        sys.error(s"Table [$tbName] already exists under database [$dbName]")
      }
    } else {
      // Add Database to catalog and persist
      val catalog = CarbonEnv.get.carbonMetastore
      // Need to fill partitioner class when we support partition
      val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName, null)(sqlContext)
      try {
        // Register the persisted table with Spark SQL via the carbondata datasource.
        sqlContext.sql(
          s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
          s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
          .collect
      } catch {
        case e: Exception =>
          val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
          // call the drop table to delete the created table.
          // Rollback: remove the schema that was persisted just above so a
          // failed registration does not leave an orphaned table on disk.
          CarbonEnv.get.carbonMetastore
            .dropTable(catalog.storePath, identifier)(sqlContext)

          LOGGER.audit(s"Table creation with Database name [$dbName] " +
                       s"and Table name [$tbName] failed")
          throw e
      }

      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
    }

    Seq.empty
  }

  // Reflectively sets a public field `name` on `ref` to `value`.
  // NOTE(review): getFields only sees public fields, and `.get` on the Option
  // throws NoSuchElementException when the field is absent — confirm callers
  // only pass known public field names.
  def setV(ref: Any, name: String, value: Any): Unit = {
    ref.getClass.getFields.find(_.getName == name).get
      .set(ref, value.asInstanceOf[AnyRef])
  }
}
| |
/**
 * Command that deletes specific segments (loads) of a carbon table by their ids.
 *
 * @param loadids        segment ids to delete; must be non-empty
 * @param databaseNameOp optional database name
 * @param tableName      target table name
 */
private[sql] case class DeleteLoadsById(
    loadids: Seq[String],
    databaseNameOp: Option[String],
    tableName: String) extends RunnableCommand {

  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  def run(sqlContext: SQLContext): Seq[Row] = {
    // Previously defined but never invoked; reject an empty id list up front.
    validateLoadIds
    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
    CarbonStore.deleteLoadById(
      loadids,
      getDB.getDatabaseName(databaseNameOp, sqlContext),
      tableName,
      carbonTable
    )
    Seq.empty
  }

  // validates load ids
  private def validateLoadIds: Unit = {
    if (loadids.isEmpty) {
      val errorMessage = "Error: Segment id(s) should not be empty."
      throw new MalformedCarbonCommandException(errorMessage)
    }
  }
}
| |
/**
 * Command that deletes all segments of a carbon table loaded before a given date.
 *
 * @param databaseNameOp optional database name
 * @param tableName      target table name
 * @param dateField      name of the date column referenced by the statement
 * @param loadDate       cut-off load date; earlier segments are removed
 */
private[sql] case class DeleteLoadsByLoadDate(
    databaseNameOp: Option[String],
    tableName: String,
    dateField: String,
    loadDate: String) extends RunnableCommand {

  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")

  def run(sqlContext: SQLContext): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
    val relation = CarbonEnv.get.carbonMetastore
      .lookupRelation1(databaseNameOp, tableName)(sqlContext)
      .asInstanceOf[CarbonRelation]
    val resolvedDbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
    CarbonStore.deleteLoadByDate(
      loadDate,
      resolvedDbName,
      tableName,
      relation.tableMeta.carbonTable
    )
    Seq.empty
  }

}
| |
object LoadTable {

  /**
   * Rewrites the table schema after dictionary generation: removes the
   * DICTIONARY encoder from every dimension that turned out to be
   * no-dictionary, persists the updated thrift schema, refreshes the
   * metastore entry, and rebuilds the load model's data-load schema.
   */
  def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
      sqlContext: SQLContext,
      model: DictionaryLoadModel,
      noDictDimension: Array[CarbonDimension]): Unit = {
    val tablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation, model.table)
    val schemaFilePath = tablePath.getSchemaFilePath

    // read the persisted thrift TableInfo
    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)

    // strip the DICTIONARY encoder from each column matching a no-dict dimension
    tableInfo.getFact_table.getTable_columns.asScala.foreach { column =>
      if (noDictDimension.exists(dim => column.getColumn_id.equals(dim.getColumnId))) {
        column.encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
      }
    }

    // persist the modified TableInfo
    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)

    // refresh the in-memory metastore entry
    val catalog = CarbonEnv.get.carbonMetastore
    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)

    // rebuild the CarbonDataLoadSchema from the refreshed table
    val refreshedTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(refreshedTable))
  }

}
| |
/**
 * Command backing INSERT INTO a carbon table: wraps the child logical plan in
 * a DataFrame and delegates the actual load to [[LoadTable]].
 *
 * @param relation         target carbon datasource relation
 * @param child            logical plan producing the rows to insert
 * @param isOverwriteExist true for INSERT OVERWRITE semantics
 */
private[sql] case class LoadTableByInsert(relation: CarbonDatasourceRelation,
    child: LogicalPlan, isOverwriteExist: Boolean) extends RunnableCommand {
  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  def run(sqlContext: SQLContext): Seq[Row] = {
    val sourceFrame = new DataFrame(sqlContext, child)
    val headerColumns = relation.carbonRelation.output.map(_.name).mkString(",")
    val loadResult = LoadTable(
      Some(relation.carbonRelation.databaseName),
      relation.carbonRelation.tableName,
      null,
      Seq(),
      scala.collection.immutable.Map("fileheader" -> headerColumns),
      isOverwriteExist,
      null,
      Some(sourceFrame)).run(sqlContext)
    // refresh relation metadata, e.g. after auto-detected high-cardinality columns
    relation.carbonRelation.metaData =
      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
    loadResult
  }
}
/**
 * Command implementing LOAD DATA / INSERT into a carbon table.
 *
 * Orchestrates the full load: acquires the table metadata lock (normal loads
 * only), resolves all load options (delimiters, bad-record handling, sort
 * scope, single-pass dictionary generation, ...), populates a
 * CarbonLoadModel, optionally starts a dictionary server for single-pass
 * loads, and finally delegates to CarbonDataRDDFactory.loadCarbonData.
 *
 * @param databaseNameOp   optional database name
 * @param tableName        target table name
 * @param factPathFromUser user-supplied CSV path (ignored when dataFrame is set)
 * @param dimFilesPath     dimension file mappings
 * @param options          LOAD OPTIONS key/value pairs (keys lower-cased)
 * @param isOverwriteExist true for OVERWRITE semantics
 * @param inputSqlString   original SQL text, kept for logging/auditing
 * @param dataFrame        when set, rows are loaded from this DataFrame instead of files
 * @param updateModel      when set, this load is part of an UPDATE operation
 */
case class LoadTable(
    databaseNameOp: Option[String],
    tableName: String,
    factPathFromUser: String,
    dimFilesPath: Seq[DataLoadTableFileMapping],
    options: scala.collection.immutable.Map[String, String],
    isOverwriteExist: Boolean,
    var inputSqlString: String = null,
    dataFrame: Option[DataFrame] = None,
    updateModel: Option[UpdateTableModel] = None) extends RunnableCommand {

  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  // Returns `default` when `value` is null/empty, otherwise `value` unchanged.
  private def checkDefaultValue(value: String, default: String) = if (StringUtils.isEmpty(value)) {
    default
  } else {
    value
  }

  def run(sqlContext: SQLContext): Seq[Row] = {
    // DataFrame-based load (not update): nothing to do when the frame is empty.
    if (dataFrame.isDefined && !updateModel.isDefined) {
      val rdd = dataFrame.get.rdd
      if (rdd.partitions == null || rdd.partitions.length == 0) {
        LOGGER.warn("DataLoading finished. No data was loaded.")
        return Seq.empty
      }
    }

    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
      logError(s"Data loading failed. table not found: $dbName.$tableName")
      LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
      sys.error(s"Data loading failed. table not found: $dbName.$tableName")
    }

    val relation = CarbonEnv.get.carbonMetastore
      .lookupRelation1(Option(dbName), tableName)(sqlContext)
      .asInstanceOf[CarbonRelation]
    if (relation == null) {
      sys.error(s"Table $dbName.$tableName does not exist")
    }
    // NOTE(review): globally disables zookeeper-based locking for every load —
    // confirm this is intended rather than a leftover debug setting.
    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
    val carbonLock = CarbonLockFactory
      .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
        .getCarbonTableIdentifier,
        LockUsage.METADATA_LOCK
      )
    try {
      // take lock only in case of normal data load.
      if (!updateModel.isDefined) {
        if (carbonLock.lockWithRetries()) {
          logInfo("Successfully able to get the table metadata file lock")
        } else {
          sys.error("Table is locked for updation. Please try after some time")
        }
      }

      // DataFrame loads have no file path; otherwise normalize the user path(s).
      val factPath = if (dataFrame.isDefined) {
        ""
      } else {
        FileUtils.getPaths(
          CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
      }
      val carbonLoadModel = new CarbonLoadModel()
      carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
      carbonLoadModel.setStorePath(relation.tableMeta.storePath)

      val table = relation.tableMeta.carbonTable
      carbonLoadModel.setTableName(table.getFactTableName)
      val dataLoadSchema = new CarbonDataLoadSchema(table)
      // Need to fill dimension relation
      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)

      // Scratch partition directory; cleaned up in the inner finally block below.
      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"

      val columnar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean

      // ---- resolve load options (option value, else carbon property, else default) ----
      val delimiter = options.getOrElse("delimiter", ",")
      val quoteChar = options.getOrElse("quotechar", "\"")
      var fileHeader = options.getOrElse("fileheader", "")
      val escapeChar = options.getOrElse("escapechar", "\\")
      val commentchar = options.getOrElse("commentchar", "#")
      val columnDict = options.getOrElse("columndict", null)
      val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
      val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
      val badRecordActionValue = CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
      val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
      val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
      val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
      val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
      val dateFormat = options.getOrElse("dateformat", null)
      ValidateUtil.validateDateFormat(dateFormat, table, tableName)
      val maxColumns = options.getOrElse("maxcolumns", null)
      // Sort scope precedence: table property > session option > carbon property > default.
      val tableProperties = table.getTableInfo.getFactTable.getTableProperties
      val sortScopeDefault = CarbonProperties.getInstance().
        getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
      val sortScope = if (null == tableProperties) {
        sortScopeDefault
      } else {
        tableProperties.getOrDefault("sort_scope", sortScopeDefault)
      }

      ValidateUtil.validateSortScope(table, sortScope)
      val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", carbonProperty
        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
          carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
      val globalSortPartitions = options.getOrElse("global_sort_partitions", carbonProperty
        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))
      ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)

      // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
      // we should use table schema to generate file header.
      val headerOption = options.get("header")
      if (headerOption.isDefined) {
        // whether the csv file has file header
        // the default value is true
        val header = try {
          headerOption.get.toBoolean
        } catch {
          case ex: IllegalArgumentException =>
            throw new MalformedCarbonCommandException(
              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
        }
        header match {
          case true =>
            // 'header'=true and an explicit 'fileheader' are mutually exclusive
            if (fileHeader.nonEmpty) {
              throw new MalformedCarbonCommandException(
                "When 'header' option is true, 'fileheader' option is not required.")
            }
          case false =>
            // generate file header from the table's create-order columns
            if (fileHeader.isEmpty) {
              fileHeader = table.getCreateOrderColumn(table.getFactTableName)
                .asScala.map(_.getColName).mkString(",")
            }
        }
      }

      // Bad-record location must be valid only when bad records are actually
      // logged or redirected.
      val bad_record_path = options.getOrElse("bad_record_path",
        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
      if (badRecordsLoggerEnable.toBoolean ||
          LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
          sys.error("Invalid bad records location.")
        }
      }
      // ---- populate the load model from the resolved options ----
      carbonLoadModel.setBadRecordsLocation(bad_record_path)
      carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
      carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
      carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
      carbonLoadModel.setDateFormat(dateFormat)
      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
      carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_DATE_FORMAT,
        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
      carbonLoadModel
        .setSerializationNullFormat(
          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat)
      carbonLoadModel
        .setBadRecordsLoggerEnable(
          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
      carbonLoadModel
        .setBadRecordsAction(
          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction)
      carbonLoadModel
        .setIsEmptyDataBadRecord(
          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
      carbonLoadModel.setSortScope(sortScope)
      carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
      carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
      // when single_pass=true, and not use all dict
      val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
        case "true" =>
          true
        case "false" =>
          // when single_pass = false and if either alldictionary
          // or columnDict is configured the do not allow load
          if (StringUtils.isNotEmpty(allDictionaryPath) || StringUtils.isNotEmpty(columnDict)) {
            throw new MalformedCarbonCommandException(
              "Can not use all_dictionary_path or columndict without single_pass.")
          } else {
            false
          }
        case illegal =>
          // any other value: warn and fall back to single_pass=false
          LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
                       "Please set it as 'true' or 'false'")
          false
      }
      carbonLoadModel.setUseOnePass(useOnePass)

      // Field delimiter and the two complex-type delimiters must be distinct.
      if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
          complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
          delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {
        sys.error(s"Field Delimiter & Complex types delimiter are same")
      }
      else {
        carbonLoadModel.setComplexDelimiterLevel1(
          CarbonUtil.delimiterConverter(complex_delimiter_level_1))
        carbonLoadModel.setComplexDelimiterLevel2(
          CarbonUtil.delimiterConverter(complex_delimiter_level_2))
      }
      // set local dictionary path, and dictionary file extension
      carbonLoadModel.setAllDictPath(allDictionaryPath)

      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS

      try {
        // First system has to partition the data first and then call the load data
        LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
        carbonLoadModel.setFactFilePath(factPath)
        carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
        carbonLoadModel.setCsvHeader(fileHeader)
        carbonLoadModel.setColDictFilePath(columnDict)
        carbonLoadModel.setDirectLoad(true)
        carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
        val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
          maxColumns)
        carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
        // hook used by GlobalDictionaryUtil to push schema changes back (see below)
        GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
        val storePath = relation.tableMeta.storePath
        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
        val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
          .getCarbonTableIdentifier
        val carbonTablePath = CarbonStorePath
          .getCarbonTablePath(storePath, carbonTableIdentifier)
        val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
        val dimensions = carbonTable.getDimensionByTableName(
          carbonTable.getFactTableName).asScala.toArray
        // add the start entry for the new load in the table status file
        if (!updateModel.isDefined) {
          CommonUtil.
            readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteExist)
        }
        if (isOverwriteExist) {
          LOGGER.info(s"Overwrite is in progress for carbon table with $dbName.$tableName")
        }
        if (null == carbonLoadModel.getLoadMetadataDetails) {
          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
        }
        // single_pass cannot be used for the very first load unless an external
        // dictionary (columndict / all_dictionary_path) is supplied
        if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
            StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) {
          LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
          LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
          carbonLoadModel.setUseOnePass(false)
        }
        if (carbonLoadModel.getUseOnePass) {
          // ---- single-pass load: pre-generate dictionaries, then load ----
          val colDictFilePath = carbonLoadModel.getColDictFilePath
          if (!StringUtils.isEmpty(colDictFilePath)) {
            carbonLoadModel.initPredefDictMap()
            // generate predefined dictionary
            GlobalDictionaryUtil
              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
                dimensions, carbonLoadModel, sqlContext, storePath, dictFolderPath)
          }
          val allDictPath: String = carbonLoadModel.getAllDictPath
          if(!StringUtils.isEmpty(allDictPath)) {
            carbonLoadModel.initPredefDictMap()
            GlobalDictionaryUtil
              .generateDictionaryFromDictionaryFiles(sqlContext,
                carbonLoadModel,
                storePath,
                carbonTableIdentifier,
                dictFolderPath,
                dimensions,
                allDictionaryPath)
          }
          // dictionaryServerClient dictionary generator
          val dictionaryServerPort = CarbonProperties.getInstance()
            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
          val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host")
          carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
          // start dictionary server when use one pass load and dimension with DICTIONARY
          // encoding is present.
          val allDimensions = table.getAllDimensions.asScala.toList
          val createDictionary = allDimensions.exists {
            carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
                               !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
          }
          val server: Option[DictionaryServer] = if (createDictionary) {
            val dictionaryServer = DictionaryServer
              .getInstance(dictionaryServerPort.toInt, carbonTable)
            carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
            // make sure the server is shut down when the application ends
            sqlContext.sparkContext.addSparkListener(new SparkListener() {
              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
                dictionaryServer.shutdown()
              }
            })
            Some(dictionaryServer)
          } else {
            None
          }
          CarbonDataRDDFactory.loadCarbonData(sqlContext,
            carbonLoadModel,
            relation.tableMeta.storePath,
            columnar,
            partitionStatus,
            server,
            isOverwriteExist,
            dataFrame,
            updateModel)
        } else {
          // ---- two-pass load: generate global dictionary first, then load ----
          val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
            val fields = dataFrame.get.schema.fields
            import org.apache.spark.sql.functions.udf
            // extracting only segment from tupleId
            val getSegIdUDF = udf((tupleId: String) =>
              CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
            // getting all fields except tupleId field as it is not required in the value
            var otherFields = fields.toSeq
              .filter(field => !field.name
                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
              .map(field => {
                // NOTE(review): "&& false" makes this branch dead code — the
                // UPDATED_COL_EXTENSION suffix is never stripped. Confirm whether
                // this was deliberately disabled or is a leftover debug change.
                if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
                  new Column(field.name
                    .substring(0,
                      field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
                } else {

                  new Column(field.name)
                }
              })

            // extract tupleId field which will be used as a key
            val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
              as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
            // use dataFrameWithoutTupleId as dictionaryDataFrame
            val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
            otherFields = otherFields :+ segIdColumn
            // use dataFrameWithTupleId as loadDataFrame
            val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
            (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
          } else {
            (dataFrame, dataFrame)
          }
          GlobalDictionaryUtil
            .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
              dictionaryDataFrame)
          CarbonDataRDDFactory.loadCarbonData(sqlContext,
            carbonLoadModel,
            relation.tableMeta.storePath,
            columnar,
            partitionStatus,
            None,
            isOverwriteExist,
            loadDataFrame,
            updateModel)
        }
      } catch {
        case CausedBy(ex: NoRetryException) =>
          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
          throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
        case ex: Exception =>
          LOGGER.error(ex)
          LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
          throw ex
      } finally {
        // Once the data load is successful delete the unwanted partition files
        try {
          val fileType = FileFactory.getFileType(partitionLocation)
          if (FileFactory.isFileExist(partitionLocation, fileType)) {
            val file = FileFactory
              .getCarbonFile(partitionLocation, fileType)
            CarbonUtil.deleteFoldersAndFiles(file)
          }
        } catch {
          case ex: Exception =>
            LOGGER.error(ex)
            LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
                         "Problem deleting the partition folder")
            throw ex
        }

      }
    } catch {
      case dle: DataLoadingException =>
        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
        throw dle
      case mce: MalformedCarbonCommandException =>
        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
        throw mce
    } finally {
      // Always release the metadata lock acquired above (no-op if never locked).
      if (carbonLock != null) {
        if (carbonLock.unlock()) {
          logInfo("Table MetaData Unlocked Successfully after data load")
        } else {
          logError("Unable to unlock Table MetaData")
        }
      }
    }
    Seq.empty
  }

  // Callback registered as GlobalDictionaryUtil.updateTableMetadataFunc: rewrites
  // the schema file so dimensions that turned out to be no-dictionary lose their
  // DICTIONARY encoder, then refreshes metastore state and the load model.
  // Near-duplicate of LoadTable.updateTableMetadata (companion object) — this one
  // additionally touches the schema-modified timestamp.
  private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
      sqlContext: SQLContext,
      model: DictionaryLoadModel,
      noDictDimension: Array[CarbonDimension]): Unit = {

    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
      model.table)
    val schemaFilePath = carbonTablePath.getSchemaFilePath

    // read TableInfo
    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)

    // modify TableInfo: strip the DICTIONARY encoder from matching columns
    val columns = tableInfo.getFact_table.getTable_columns
    for (i <- 0 until columns.size) {
      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
      }
    }

    // write TableInfo
    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)

    val catalog = CarbonEnv.get.carbonMetastore

    // upate the schema modified time
    catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime(
      carbonLoadModel.getDatabaseName,
      carbonLoadModel.getTableName))

    // update Metadata
    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)

    // update CarbonDataLoadSchema
    val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
  }

}
| |
private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String],
    tableName: String)
  extends RunnableCommand {

  /**
   * Drops the carbon table after acquiring the metadata and drop-table locks.
   * On a successful unlock the leftover metadata directory is removed; lock
   * release always happens in the finally block.
   */
  def run(sqlContext: SQLContext): Seq[Row] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
    val identifier = TableIdentifier(tableName, Option(dbName))
    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
    val acquiredLocks: ListBuffer[ICarbonLock] = ListBuffer()
    val catalog = CarbonEnv.get.carbonMetastore
    val storePath = catalog.storePath
    try {
      // both locks must be held before the table can be dropped
      List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK).foreach { lock =>
        acquiredLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
      }
      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
      CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext)
      LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
    } catch {
      case ex: Exception =>
        LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
        sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
    } finally {
      // only clean up the metadata directory once every lock released cleanly
      if (acquiredLocks.nonEmpty && acquiredLocks.forall(_.unlock())) {
        logInfo("Table MetaData Unlocked Successfully")
        // deleting any remaining files.
        val metadataFilePath = CarbonStorePath
          .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
        val fileType = FileFactory.getFileType(metadataFilePath)
        if (FileFactory.isFileExist(metadataFilePath, fileType)) {
          val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
          CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
        }
      }
    }
    Seq.empty
  }
}
| |
private[sql] case class ShowLoads(
    databaseNameOp: Option[String],
    tableName: String,
    limit: Option[String],
    override val output: Seq[Attribute]) extends RunnableCommand {

  /** Lists the segments (loads) of the table, optionally capped by `limit`. */
  override def run(sqlContext: SQLContext): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
    val relation = CarbonEnv.get.carbonMetastore
      .lookupRelation1(databaseNameOp, tableName)(sqlContext)
      .asInstanceOf[CarbonRelation]
    CarbonStore.showSegments(
      dbName,
      tableName,
      limit,
      relation.tableMeta.carbonTable.getMetaDataFilepath)
  }
}
| |
private[sql] case class DescribeCommandFormatted(
    child: SparkPlan,
    override val output: Seq[Attribute],
    tblIdentifier: TableIdentifier)
  extends RunnableCommand {

  /**
   * Produces a formatted description of the carbon table: one row per column
   * (name, data type, DICTIONARY/KEY COLUMN/MEASURE comment) followed by the
   * detailed table, column-property, sort-column, column-group and partition
   * sections. Each row is rendered as a single fixed-width string.
   */
  override def run(sqlContext: SQLContext): Seq[Row] = {
    val relation = CarbonEnv.get.carbonMetastore
      .lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
    val mapper = new ObjectMapper()
    val colProps = StringBuilder.newBuilder
    var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
      val comment = if (relation.metaData.dims.contains(field.name)) {
        val dimension = relation.metaData.carbonTable.getDimensionByName(
          relation.tableMeta.carbonTableIdentifier.getTableName,
          field.name)
        if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
          colProps.append(field.name).append(".")
            .append(mapper.writeValueAsString(dimension.getColumnProperties))
            .append(",")
        }
        // Both dictionary-encoded dimensions and plain key columns carry the
        // same inverted-index suffix; compute it once instead of duplicating
        // the boolean match in each branch.
        val prefix = if (dimension.hasEncoding(Encoding.DICTIONARY)) {
          "DICTIONARY, KEY COLUMN"
        } else {
          "KEY COLUMN"
        }
        val suffix = if (dimension.hasEncoding(Encoding.INVERTED_INDEX)) {
          ""
        } else {
          ",NOINVERTEDINDEX"
        }
        prefix + suffix
      } else {
        "MEASURE"
      }
      (field.name, field.dataType.simpleString, comment)
    }
    // drops the trailing comma accumulated while appending column properties
    val colPropStr = if (colProps.toString().trim().length() > 0) {
      colProps.toString().dropRight(1)
    } else {
      colProps.toString()
    }
    results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
      .getDatabaseName, "")
    )
    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
    val carbonTable = relation.tableMeta.carbonTable
    results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
    results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
      .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
      .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
    results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
    if (colPropStr.length() > 0) {
      results ++= Seq((colPropStr, "", ""))
    } else {
      results ++= Seq(("ADAPTIVE", "", ""))
    }
    // NOTE: the previous `.map(column => column)` was a no-op and is removed
    results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
      relation.tableMeta.carbonTableIdentifier.getTableName).asScala
      .mkString(","), ""))
    val dimension = carbonTable
      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
    results ++= getColumnGroups(dimension.asScala.toList)
    // look up the partition info once rather than twice
    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
    if (partitionInfo != null) {
      results ++=
        Seq(("Partition Columns: ",
          partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
    }
    results.map { case (name, dataType, comment) =>
      Row(f"$name%-36s $dataType%-80s $comment%-72s")
    }
  }

  /**
   * Builds the "##Column Group Information" section: a header followed by one
   * row per column group (groups with id -1, i.e. ungrouped, are skipped),
   * numbered from 1 in ascending group-id order.
   */
  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
    val header: Seq[(String, String, String)] =
      Seq(("", "", ""), ("##Column Group Information", "", ""))
    val groups = dimensions.groupBy(_.columnGroupId()).filter {
      case (groupId, _) => groupId != -1
    }.toSeq.sortBy(_._1).map {
      case (_, dims) => dims.map(_.getColName).mkString(", ")
    }
    header ++ groups.zipWithIndex.map {
      case (cols, index) => (s"Column Group ${index + 1}", cols, "")
    }
  }
}
| |
private[sql] case class DeleteLoadByDate(
    databaseNameOp: Option[String],
    tableName: String,
    dateField: String,
    dateValue: String
) extends RunnableCommand {

  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Deletes the loads selected by comparing `dateField` (which must resolve to
   * a timestamp dimension of the table) against `dateValue`. Fails with an
   * audit entry when the table or the timestamp field does not exist.
   */
  def run(sqlContext: SQLContext): Seq[Row] = {
    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
    LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
    val identifier = TableIdentifier(tableName, Option(dbName))
    val relation = CarbonEnv.get.carbonMetastore
      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
    // validate table existence before touching any further metadata
    if (relation == null) {
      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
      sys.error(s"Table $dbName.$tableName does not exist")
    }
    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
    // the date field must match (case-insensitively) a timestamp dimension
    val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
      filter => filter.name.equalsIgnoreCase(dateField) &&
                filter.dataType.isInstanceOf[TimestampType]).toList
    // was `matches.asJava.get(0).name`: a needless Scala->Java round-trip plus
    // a partial get(0); headOption keeps the identical failure path without it
    val level = matches.headOption match {
      case Some(attribute) => attribute.name
      case None =>
        LOGGER.audit("The delete load by date is failed. " +
          s"Table $dbName.$tableName does not contain date field: $dateField")
        sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
    }
    val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
      .getColName
    DataManagementFunc.deleteLoadByDate(
      sqlContext,
      new CarbonDataLoadSchema(carbonTable),
      dbName,
      tableName,
      CarbonEnv.get.carbonMetastore.storePath,
      level,
      actualColName,
      dateValue)
    LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
    Seq.empty
  }

}
| |
private[sql] case class CleanFiles(
    databaseNameOp: Option[String],
    tableName: String) extends RunnableCommand {

  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Cleans up segment files of the table via CarbonStore.cleanFiles.
   * NOTE(review): the trailing `false` flag presumably disables a forced
   * clean — confirm against the CarbonStore.cleanFiles signature.
   */
  def run(sqlContext: SQLContext): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
    val relation = CarbonEnv.get.carbonMetastore
      .lookupRelation1(databaseNameOp, tableName)(sqlContext)
      .asInstanceOf[CarbonRelation]
    CarbonStore.cleanFiles(
      dbName,
      tableName,
      sqlContext.asInstanceOf[CarbonContext].storePath,
      relation.tableMeta.carbonTable,
      false)
    Seq.empty
  }
}