/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.management

import java.text.SimpleDateFormat
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils, SparkUtil}

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.OperationContext
import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
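
/**
 * Atomic command that loads external data files into a CarbonData table.
 * `processMetadata` resolves the target table and partition information, and `processData`
 * builds a [[CarbonLoadModel]], fires the pre/post load events and triggers the actual write.
 *
 * Typical invocation (syntax sketch; the options shown are illustrative, not exhaustive):
 * {{{
 *   LOAD DATA INPATH 'hdfs://path/to/data.csv'
 *   INTO TABLE db_name.table_name
 *   OPTIONS('DELIMITER'=',', 'HEADER'='true')
 * }}}
 */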
case class CarbonLoadDataCommand(databaseNameOp: Option[String],
    tableName: String,
    factPathFromUser: String,
    dimFilesPath: Seq[DataLoadTableFileMapping],
    options: Map[String, String],
    isOverwriteTable: Boolean,
    partition: Map[String, Option[String]] = Map.empty,
    var operationContext: OperationContext = new OperationContext)
  extends AtomicRunnableCommand {
  var table: CarbonTable = _

  var logicalPartitionRelation: LogicalRelation = _

  var sizeInBytes: Long = _

  var currPartitions: util.List[PartitionSpec] = _

  var parentTablePath: String = _

  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  var finalPartition: Map[String, Option[String]] = Map.empty

  var timeStampFormat: SimpleDateFormat = _

  var dateFormat: SimpleDateFormat = _
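
  /**
   * Resolves the target table, database name and partition specification through
   * [[CommonLoadUtils.processMetadataCommon]] and caches them on this command instance
   * for the subsequent data-loading phase.
   */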
  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) =
      CommonLoadUtils.processMetadataCommon(sparkSession,
        databaseNameOp,
        tableName,
        None,
        partition)
    this.sizeInBytes = sizeInBytes
    this.table = table
    this.logicalPartitionRelation = logicalPartitionRelation
    this.finalPartition = finalPartition
    setAuditTable(dbName, tableName)
    Seq.empty
  }
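
  /**
   * Performs the actual load: builds the [[CarbonLoadModel]] from the user options, cleans up
   * stale and partially loaded segments, fires the pre-load events, writes the data (see
   * [[loadData]]), and finally fires the post-load events. On failure, the load entry created
   * in the table status file (if any) is marked for delete.
   */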
  override def processData(sparkSession: SparkSession): Seq[Row] = {
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
    val factPath = FileUtils.getPaths(factPathFromUser, hadoopConf)
    currPartitions = CommonLoadUtils.getCurrentPartitions(sparkSession, table)
    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
    val optionsFinal: util.Map[String, String] =
      CommonLoadUtils.getFinalLoadOptions(table, options)
    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
      hadoopConf,
      factPath,
      optionsFinal,
      parentTablePath,
      table,
      isDataFrame = false,
      Map.empty,
      finalPartition,
      options)
    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
    timeStampFormat = tf
    dateFormat = df
    // Delete stale segment folders that are not in table status but are physically present in
    // the Fact folder
    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
    var isUpdateTableStatusRequired = false
    val uuid = ""
    try {
      val (tableIndexes, indexOperationContext) =
        CommonLoadUtils.firePreLoadEvents(
          sparkSession = sparkSession,
          carbonLoadModel = carbonLoadModel,
          uuid = uuid,
          factPath = factPath,
          optionsFinal = optionsFinal,
          options = options.asJava,
          isOverwriteTable = isOverwriteTable,
          isDataFrame = false,
          updateModel = None,
          operationContext = operationContext)
      // Clean up the old invalid segment data before creating a new entry for new load.
      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
      // add the start entry for the new load in the table status file
      if (!table.isHivePartitionTable) {
        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
          carbonLoadModel,
          isOverwriteTable)
        isUpdateTableStatusRequired = true
      }
      if (isOverwriteTable) {
        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
      }
      // Create table and metadata folders if not exist
      if (carbonLoadModel.isCarbonTransactionalTable) {
        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
          FileFactory.mkdirs(metadataDirectoryPath)
        }
      } else {
        carbonLoadModel.setSegmentId(System.nanoTime().toString)
      }
      val partitionStatus = SegmentStatus.SUCCESS
      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
      val loadParams = CarbonLoadParams(sparkSession,
        tableName,
        sizeInBytes,
        isOverwriteTable,
        carbonLoadModel,
        hadoopConf,
        logicalPartitionRelation,
        dateFormat,
        timeStampFormat,
        options,
        finalPartition,
        currPartitions,
        partitionStatus,
        None,
        None,
        None,
        operationContext)
      val (rows, loadResult) = loadData(loadParams)
      val info = CommonLoadUtils.makeAuditInfo(loadResult)
      setAuditInfo(info)
      CommonLoadUtils.firePostLoadEvents(sparkSession,
        carbonLoadModel,
        tableIndexes,
        indexOperationContext,
        table,
        operationContext)
    } catch {
      case CausedBy(ex: NoRetryException) =>
        // update the load entry in table status file for changing the status to marked for delete
        if (isUpdateTableStatusRequired) {
          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
        }
        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
      // In case of event related exception
      case preEventEx: PreEventException =>
        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
        throw new AnalysisException(preEventEx.getMessage)
      case ex: Exception =>
        LOGGER.error(ex.getMessage, ex)
        // update the load entry in table status file for changing the status to marked for delete
        if (isUpdateTableStatusRequired) {
          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
        }
        throw ex
    }
    Seq.empty
  }
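
  /**
   * Dispatches the write: Hive-partitioned tables go through
   * [[CommonLoadUtils.loadDataWithPartition]], all other tables through
   * [[CarbonDataRDDFactory.loadCarbonData]].
   */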
  def loadData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
    var rows = Seq.empty[Row]
    val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    var loadResult: LoadMetadataDetails = null
    if (table.isHivePartitionTable) {
      rows = CommonLoadUtils.loadDataWithPartition(loadParams)
    } else {
      loadResult = CarbonDataRDDFactory.loadCarbonData(loadParams.sparkSession.sqlContext,
        loadParams.carbonLoadModel,
        loadParams.partitionStatus,
        isOverwriteTable,
        loadParams.hadoopConf,
        None,
        None,
        None,
        operationContext)
    }
    (rows, loadResult)
  }
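
  /**
   * Builds a [[LogicalRelation]] over a [[HadoopFsRelation]] for the partitioned carbon table,
   * rewriting dictionary-encoded columns to Integer and date/timestamp columns to Long in the
   * metastore schema, and passing load-specific information (overwrite flag, static partitions,
   * current partitions and the current load entry) to [[SparkCarbonTableFormat]] through the
   * relation options.
   */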
  def convertToLogicalRelation(
      catalogTable: CatalogTable,
      sizeInBytes: Long,
      overWrite: Boolean,
      loadModel: CarbonLoadModel,
      sparkSession: SparkSession,
      operationContext: OperationContext,
      partition: Map[String, Option[String]],
      optionsOriginal: mutable.Map[String, String],
      currPartitions: util.List[PartitionSpec]): LogicalRelation = {
    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
    val metastoreSchema = StructType(catalogTable.schema.fields.map { f =>
      val column = table.getColumnByName(f.name)
      if (column.hasEncoding(Encoding.DICTIONARY)) {
        f.copy(dataType = IntegerType)
      } else if (f.dataType == TimestampType || f.dataType == DateType) {
        f.copy(dataType = LongType)
      } else {
        f
      }
    })
    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
    val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
    if (!lazyPruningEnabled) {
      catalog.filterPartitions(Nil) // materialize all the partitions in memory
    }
    var partitionSchema =
      StructType(table.getPartitionInfo().getColumnSchemaList.asScala.map(field =>
        metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
    val dataSchema =
      StructType(metastoreSchema
        .filterNot(field => partitionSchema.contains(field)))
    if (partition.nonEmpty) {
      partitionSchema = StructType(partitionSchema.fields.map(_.copy(dataType = StringType)))
    }
    val options = new mutable.HashMap[String, String]()
    options ++= catalogTable.storage.properties
    options += (("overwrite", overWrite.toString))
    if (partition.nonEmpty) {
      val staticPartitionStr = ObjectSerializationUtil.convertObjectToString(
        new util.HashMap[String, Boolean](
          partition.map { case (col, value) => (col.toLowerCase, value.isDefined) }.asJava))
      options += (("staticpartition", staticPartitionStr))
    }
    options ++= optionsOriginal
    if (currPartitions != null) {
      val currPartStr = ObjectSerializationUtil.convertObjectToString(currPartitions)
      options += (("currentpartition", currPartStr))
    }
    if (loadModel.getSegmentId != null) {
      val currLoadEntry =
        ObjectSerializationUtil.convertObjectToString(loadModel.getCurrentLoadMetadataDetail)
      options += (("currentloadentry", currLoadEntry))
    }
    val hdfsRelation = HadoopFsRelation(
      location = catalog,
      partitionSchema = partitionSchema,
      dataSchema = dataSchema,
      bucketSpec = catalogTable.bucketSpec,
      fileFormat = new SparkCarbonTableFormat,
      options = options.toMap)(sparkSession = sparkSession)
    CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
      hdfsRelation.schema.toAttributes,
      Some(catalogTable),
      false)
  }
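
  /**
   * Name of this operation, reported as "LOAD DATA OVERWRITE" when the load overwrites the table.
   */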
  override protected def opName: String = {
    if (isOverwriteTable) {
      "LOAD DATA OVERWRITE"
    } else {
      "LOAD DATA"
    }
  }
}