| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.command.management |
| |
| import java.text.SimpleDateFormat |
| import java.util |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable |
| |
| import org.apache.spark.sql._ |
| import org.apache.spark.sql.catalyst.catalog.CatalogTable |
| import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel} |
| import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat} |
| import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} |
| import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils, SparkUtil} |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.indexstore.PartitionSpec |
| import org.apache.carbondata.core.metadata.encoder.Encoding |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} |
| import org.apache.carbondata.core.util._ |
| import org.apache.carbondata.core.util.path.CarbonTablePath |
| import org.apache.carbondata.events.OperationContext |
| import org.apache.carbondata.events.exception.PreEventException |
| import org.apache.carbondata.processing.loading.TableProcessingOperations |
| import org.apache.carbondata.processing.loading.exception.NoRetryException |
| import org.apache.carbondata.processing.loading.model.CarbonLoadModel |
| import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} |
| import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory |
| |
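/**
 * Runnable command behind the LOAD DATA statement for CarbonData tables.
 * An illustrative invocation (see the CarbonData DML documentation for the full syntax):
 * {{{
 *   LOAD DATA INPATH 'hdfs://hacluster/data/sample.csv'
 *   INTO TABLE db_name.table_name
 *   OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"')
 * }}}
 * [[processMetadata]] resolves the target table and partition information;
 * [[processData]] performs the actual load.
 */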
| case class CarbonLoadDataCommand(databaseNameOp: Option[String], |
| tableName: String, |
| factPathFromUser: String, |
| dimFilesPath: Seq[DataLoadTableFileMapping], |
| options: Map[String, String], |
| isOverwriteTable: Boolean, |
| partition: Map[String, Option[String]] = Map.empty, |
| var operationContext: OperationContext = new OperationContext) |
| extends AtomicRunnableCommand { |
| |
  // resolved target table, populated by processMetadata
  var table: CarbonTable = _

  // logical relation over the table, used by the partitioned load path
  var logicalPartitionRelation: LogicalRelation = _

  // estimated table size in bytes, forwarded to the file index for planning
  var sizeInBytes: Long = _

  // partitions present in the table when the load starts
  var currPartitions: util.List[PartitionSpec] = _

  var parentTablePath: String = _

  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  // partition spec from the LOAD statement, resolved in processMetadata
  var finalPartition: Map[String, Option[String]] = Map.empty

  // timestamp and date parse formats taken from the load model
  var timeStampFormat: SimpleDateFormat = _

  var dateFormat: SimpleDateFormat = _
| |
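  /**
   * Resolves the target table, its estimated size, the logical partition relation and the
   * partition spec via [[CommonLoadUtils.processMetadataCommon]], caches them on this
   * command for [[processData]], and registers the table with the audit framework.
   */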
| override def processMetadata(sparkSession: SparkSession): Seq[Row] = { |
| val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = |
| CommonLoadUtils.processMetadataCommon(sparkSession, |
| databaseNameOp, |
| tableName, |
| None, |
| partition) |
| this.sizeInBytes = sizeInBytes |
| this.table = table |
| this.logicalPartitionRelation = logicalPartitionRelation |
| this.finalPartition = finalPartition |
| setAuditTable(dbName, tableName) |
| Seq.empty |
| } |
| |
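  /**
   * Executes the load: builds the [[CarbonLoadModel]], removes stale and partial segment
   * data, fires pre-load events, records an in-progress entry in the table status file for
   * non-partitioned tables, delegates the actual write to [[loadData]] and fires post-load
   * events. On failure the in-progress table status entry is marked for delete.
   */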
| override def processData(sparkSession: SparkSession): Seq[Row] = { |
| val hadoopConf = sparkSession.sessionState.newHadoopConf() |
| val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) |
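    // ensure ZooKeeper based locking stays disabled for this load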
| CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false") |
| val factPath = FileUtils.getPaths(factPathFromUser, hadoopConf) |
| currPartitions = CommonLoadUtils.getCurrentPartitions(sparkSession, table) |
| CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession) |
| val optionsFinal: util.Map[String, String] = |
| CommonLoadUtils.getFinalLoadOptions(table, options) |
    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
      hadoopConf,
      factPath,
      optionsFinal,
      parentTablePath,
      table,
      isDataFrame = false,
      Map.empty,
      finalPartition,
      options)
| val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel) |
| timeStampFormat = tf |
| dateFormat = df |
| // Delete stale segment folders that are not in table status but are physically present in |
| // the Fact folder |
| LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") |
| TableProcessingOperations.deletePartialLoadDataIfExist(table, false) |
| var isUpdateTableStatusRequired = false |
| val uuid = "" |
| try { |
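      // notify listeners (e.g. secondary index and MV maintenance) that a load is starting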
| val (tableIndexes, indexOperationContext) = |
| CommonLoadUtils.firePreLoadEvents( |
| sparkSession = sparkSession, |
| carbonLoadModel = carbonLoadModel, |
| uuid = uuid, |
| factPath = factPath, |
| optionsFinal = optionsFinal, |
| options = options.asJava, |
| isOverwriteTable = isOverwriteTable, |
| isDataFrame = false, |
| updateModel = None, |
| operationContext = operationContext) |
      // Clean up the old invalid segment data before creating a new entry for the new load.
| SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions) |
| // add the start entry for the new load in the table status file |
| if (!table.isHivePartitionTable) { |
| CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta( |
| carbonLoadModel, |
| isOverwriteTable) |
| isUpdateTableStatusRequired = true |
| } |
| if (isOverwriteTable) { |
| LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress") |
| } |
      // Create the table and metadata folders if they do not exist
| if (carbonLoadModel.isCarbonTransactionalTable) { |
| val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) |
| if (!FileFactory.isFileExist(metadataDirectoryPath)) { |
| FileFactory.mkdirs(metadataDirectoryPath) |
| } |
| } else { |
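        // non-transactional tables have no table status file, so tag this load with a
        // unique segment id derived from the current nano time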
| carbonLoadModel.setSegmentId(System.nanoTime().toString) |
| } |
| val partitionStatus = SegmentStatus.SUCCESS |
| LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope) |
| val loadParams = CarbonLoadParams(sparkSession, |
| tableName, |
| sizeInBytes, |
| isOverwriteTable, |
| carbonLoadModel, |
| hadoopConf, |
| logicalPartitionRelation, |
| dateFormat, |
| timeStampFormat, |
| options, |
| finalPartition, |
| currPartitions, |
| partitionStatus, |
| None, |
| None, |
| None, |
| operationContext) |
| val (rows, loadResult) = loadData(loadParams) |
| val info = CommonLoadUtils.makeAuditInfo(loadResult) |
| setAuditInfo(info) |
| CommonLoadUtils.firePostLoadEvents(sparkSession, |
| carbonLoadModel, |
| tableIndexes, |
| indexOperationContext, |
| table, |
| operationContext) |
| } catch { |
| case CausedBy(ex: NoRetryException) => |
        // update the load entry in the table status file, changing its status to marked for delete
| if (isUpdateTableStatusRequired) { |
| CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid) |
| } |
| LOGGER.error(s"Dataload failure for $dbName.$tableName", ex) |
| throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}") |
| // In case of event related exception |
| case preEventEx: PreEventException => |
| LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx) |
| throw new AnalysisException(preEventEx.getMessage) |
| case ex: Exception => |
| LOGGER.error(ex.getMessage, ex) |
        // update the load entry in the table status file, changing its status to marked for delete
| if (isUpdateTableStatusRequired) { |
| CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid) |
| } |
| throw ex |
| } |
| Seq.empty |
| } |
| |
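  /**
   * Dispatches the write: Hive-partitioned tables reuse Spark's datasource insert flow via
   * [[CommonLoadUtils.loadDataWithPartition]] and yield result rows, while all other tables
   * are loaded through [[CarbonDataRDDFactory.loadCarbonData]] and yield load metadata.
   */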
| def loadData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = { |
| var rows = Seq.empty[Row] |
| val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable |
    var loadResult: LoadMetadataDetails = null
| if (table.isHivePartitionTable) { |
| rows = CommonLoadUtils.loadDataWithPartition(loadParams) |
| } else { |
| loadResult = CarbonDataRDDFactory.loadCarbonData(loadParams.sparkSession.sqlContext, |
| loadParams.carbonLoadModel, |
| loadParams.partitionStatus, |
| isOverwriteTable, |
| loadParams.hadoopConf, |
| None, |
| None, |
| None, |
| operationContext) |
| } |
| (rows, loadResult) |
| } |
| |
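  /**
   * Builds a [[LogicalRelation]] over a [[HadoopFsRelation]] backed by
   * [[SparkCarbonTableFormat]], so that a partitioned load can reuse Spark's datasource
   * insert flow. Load-specific state such as the overwrite flag, static partitions,
   * current partitions and the in-progress load entry travels to the file format via
   * the relation's options.
   */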
| def convertToLogicalRelation( |
| catalogTable: CatalogTable, |
| sizeInBytes: Long, |
| overWrite: Boolean, |
| loadModel: CarbonLoadModel, |
| sparkSession: SparkSession, |
| operationContext: OperationContext, |
| partition: Map[String, Option[String]], |
| optionsOriginal: mutable.Map[String, String], |
| currPartitions: util.List[PartitionSpec]): LogicalRelation = { |
| val table = loadModel.getCarbonDataLoadSchema.getCarbonTable |
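    // rewrite the metastore schema to carbon's internal representation: dictionary-encoded
    // columns are read as integer surrogate keys, timestamp/date columns as long values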
| val metastoreSchema = StructType(catalogTable.schema.fields.map { f => |
| val column = table.getColumnByName(f.name) |
| if (column.hasEncoding(Encoding.DICTIONARY)) { |
| f.copy(dataType = IntegerType) |
| } else if (f.dataType == TimestampType || f.dataType == DateType) { |
| f.copy(dataType = LongType) |
| } else { |
| f |
| } |
| }) |
| val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions |
| val catalog = new CatalogFileIndex( |
| sparkSession, catalogTable, sizeInBytes) |
| if (!lazyPruningEnabled) { |
| catalog.filterPartitions(Nil) // materialize all the partitions in memory |
| } |
| var partitionSchema = |
| StructType(table.getPartitionInfo().getColumnSchemaList.asScala.map(field => |
| metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get)) |
| val dataSchema = |
| StructType(metastoreSchema |
| .filterNot(field => partitionSchema.contains(field))) |
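    // static partition values arrive as plain strings in the LOAD statement, so relax
    // the partition schema to StringType when an explicit partition spec is given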
| if (partition.nonEmpty) { |
| partitionSchema = StructType(partitionSchema.fields.map(_.copy(dataType = StringType))) |
| } |
| val options = new mutable.HashMap[String, String]() |
| options ++= catalogTable.storage.properties |
| options += (("overwrite", overWrite.toString)) |
| if (partition.nonEmpty) { |
| val staticPartitionStr = ObjectSerializationUtil.convertObjectToString( |
| new util.HashMap[String, Boolean]( |
| partition.map { case (col, value) => (col.toLowerCase, value.isDefined) }.asJava)) |
| options += (("staticpartition", staticPartitionStr)) |
| } |
| options ++= optionsOriginal |
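    // datasource options are string-only, so serialize the current partitions and the
    // in-progress load entry before handing them to the file format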
| if (currPartitions != null) { |
| val currPartStr = ObjectSerializationUtil.convertObjectToString(currPartitions) |
| options += (("currentpartition", currPartStr)) |
| } |
| if (loadModel.getSegmentId != null) { |
| val currLoadEntry = |
| ObjectSerializationUtil.convertObjectToString(loadModel.getCurrentLoadMetadataDetail) |
| options += (("currentloadentry", currLoadEntry)) |
| } |
| val hdfsRelation = HadoopFsRelation( |
| location = catalog, |
| partitionSchema = partitionSchema, |
| dataSchema = dataSchema, |
| bucketSpec = catalogTable.bucketSpec, |
| fileFormat = new SparkCarbonTableFormat, |
| options = options.toMap)(sparkSession = sparkSession) |
| |
| CarbonReflectionUtils.getLogicalRelation(hdfsRelation, |
| hdfsRelation.schema.toAttributes, |
| Some(catalogTable), |
| false) |
| } |
| |
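  /** Operation name recorded in the audit log for this command. */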
| override protected def opName: String = { |
| if (isOverwriteTable) { |
| "LOAD DATA OVERWRITE" |
| } else { |
| "LOAD DATA" |
| } |
| } |
| } |