/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.management

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.execution.command.UpdateTableModel
import org.apache.spark.util.CausedBy

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.OperationContext
import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
/**
 * Insert-into command that takes its input from a DataFrame directly,
 * without building a logical plan.
 */
case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
tableName: String,
options: Map[String, String],
isOverwriteTable: Boolean,
var dataFrame: DataFrame,
var updateModel: Option[UpdateTableModel] = None,
var tableInfoOp: Option[TableInfo] = None,
var internalOptions: Map[String, String] = Map.empty,
var partition: Map[String, Option[String]] = Map.empty,
var operationContext: OperationContext = new OperationContext) {
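/**
 * Resolves the table metadata, prepares the CarbonLoadModel, fires the
 * pre- and post-load events and performs the actual data load.
 */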
def process(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
ThreadLocalSessionInfo
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
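// resolve the table, database name, partition relation and final partition spec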
val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
.processMetadataCommon(
sparkSession,
databaseNameOp,
tableName,
tableInfoOp,
partition)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
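// disable zookeeper based locking for this load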
CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
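// the fact path is empty because the data comes from the DataFrame, not from files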
val factPath = ""
val currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
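// merge the user given options with the default load options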
val optionsFinal: util.Map[String, String] =
CommonLoadUtils.getFinalLoadOptions(table, options)
val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
hadoopConf,
factPath,
optionsFinal,
parentTablePath = null,
table = table,
isDataFrame = true,
internalOptions = internalOptions,
partition = finalPartition,
options = options)
val (timeStampFormat, dateFormat) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
carbonLoadModel)
// Delete stale segment folders that are not in the table status but are
// physically present in the Fact folder
LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
var isUpdateTableStatusRequired = false
val uuid = ""
try {
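// fire the pre-load events so that datamap/index listeners can prepare for this load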
val (tableDataMaps, dataMapOperationContext) =
CommonLoadUtils.firePreLoadEvents(
sparkSession = sparkSession,
carbonLoadModel = carbonLoadModel,
uuid = uuid,
factPath = factPath,
optionsFinal = optionsFinal,
options = options.asJava,
isOverwriteTable = isOverwriteTable,
isDataFrame = true,
updateModel = updateModel,
operationContext = operationContext)
// Clean up old invalid segment data before creating a new entry for the new load.
SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
// add the start entry for the new load in the table status file
if (updateModel.isEmpty && !table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
carbonLoadModel,
isOverwriteTable)
isUpdateTableStatusRequired = true
}
if (isOverwriteTable) {
LOGGER.info(s"Overwrite of carbon table $dbName.$tableName is in progress")
}
// Create the metadata folder if it does not exist (transactional tables only)
if (carbonLoadModel.isCarbonTransactionalTable) {
val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
if (!FileFactory.isFileExist(metadataDirectoryPath)) {
FileFactory.mkdirs(metadataDirectoryPath)
}
} else {
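// non-transactional table: there is no table status file, so derive the
// segment id from System.nanoTime()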
carbonLoadModel.setSegmentId(System.nanoTime().toString)
}
val partitionStatus = SegmentStatus.SUCCESS
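// bundle everything needed by insertData into a single CarbonLoadParams object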
val loadParams = CarbonLoadParams(sparkSession,
tableName,
sizeInBytes,
isOverwriteTable,
carbonLoadModel,
hadoopConf,
logicalPartitionRelation,
dateFormat,
timeStampFormat,
options,
finalPartition,
currPartitions,
partitionStatus,
Some(dataFrame),
None,
updateModel,
operationContext)
LOGGER.info(s"Sort Scope: ${carbonLoadModel.getSortScope}")
val (rows, loadResult) = insertData(loadParams)
val info = CommonLoadUtils.makeAuditInfo(loadResult)
CommonLoadUtils.firePostLoadEvents(sparkSession,
carbonLoadModel,
tableDataMaps,
dataMapOperationContext,
table,
operationContext)
} catch {
case CausedBy(ex: NoRetryException) =>
// change the load entry status in the table status file to 'marked for delete'
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
// In case of an exception raised by a pre-load event
case preEventEx: PreEventException =>
LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
throw new AnalysisException(preEventEx.getMessage)
case ex: Exception =>
LOGGER.error(ex)
// change the load entry status in the table status file to 'marked for delete'
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
throw ex
}
Seq.empty
}
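/**
 * Loads the DataFrame through the partition flow for hive partitioned tables,
 * or through CarbonDataRDDFactory for all other tables.
 */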
def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
var rows = Seq.empty[Row]
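// for the update flow, use a DataFrame that carries the implicit tuple id column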
val loadDataFrame = if (updateModel.isDefined) {
Some(CommonLoadUtils.getDataFrameWithTupleID(Some(dataFrame)))
} else {
Some(dataFrame)
}
val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var loadResult: LoadMetadataDetails = null
loadParams.dataFrame = loadDataFrame
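// the partition flow returns the loaded rows; the RDD flow returns the load status details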
if (table.isHivePartitionTable) {
rows = CommonLoadUtils.loadDataWithPartition(loadParams)
} else {
loadResult = CarbonDataRDDFactory.loadCarbonData(loadParams.sparkSession.sqlContext,
loadParams.carbonLoadModel,
loadParams.partitionStatus,
isOverwriteTable,
loadParams.hadoopConf,
loadDataFrame,
None,
updateModel,
operationContext)
}
(rows, loadResult)
}
}