/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.spark.sql.execution.command.management

import java.text.SimpleDateFormat
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.CausedBy

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.events.OperationContext
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.CarbonScalaUtil

/**
 * INSERT INTO command that works directly on the logical plan of the query,
 * without a pre-built DataFrame.
 */
case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
tableName: String,
options: Map[String, String],
isOverwriteTable: Boolean,
var logicalPlan: LogicalPlan,
var tableInfo: TableInfo,
var internalOptions: Map[String, String] = Map.empty,
var partition: Map[String, Option[String]] = Map.empty,
var operationContext: OperationContext = new OperationContext)
extends AtomicRunnableCommand {
var table: CarbonTable = _
var logicalPartitionRelation: LogicalRelation = _
var sizeInBytes: Long = _
var currPartitions: util.List[PartitionSpec] = _
var parentTablePath: String = _
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var scanResultRdd: RDD[InternalRow] = _
var timeStampFormat: SimpleDateFormat = _
var dateFormat: SimpleDateFormat = _
var finalPartition: Map[String, Option[String]] = Map.empty
var isInsertIntoWithConverterFlow: Boolean = false
var dataFrame: DataFrame = _
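
/**
 * Resolves the insert query's logical plan and collects the metadata needed for the load
 * (table size, CarbonTable, partition relation and final partition spec). Switches to the
 * converter (DataFrame based) flow when bad record handling is enabled for insert.
 */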
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
if (tableInfo == null) {
throw new RuntimeException("table info must be present when logical relation exists")
}
// If the logical plan is unresolved, resolve it by analyzing it as a DataFrame.
dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
logicalPlan = dataFrame.queryExecution.analyzed
if (CarbonProperties.isBadRecordHandlingEnabledForInsert) {
// use old converter flow
isInsertIntoWithConverterFlow = true
return Seq.empty
}
setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
ThreadLocalSessionInfo
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
.processMetadataCommon(
sparkSession,
databaseNameOp,
tableName,
Some(tableInfo),
partition)
this.sizeInBytes = sizeInBytes
this.table = table
this.logicalPartitionRelation = logicalPartitionRelation
this.finalPartition = finalPartition
setAuditTable(dbName, tableName)
Seq.empty
}
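
/**
 * Performs the actual load: prepares the CarbonLoadModel, re-arranges the query projection
 * to the internal column order, fires the pre/post load events and writes the data through
 * insertData. Delegates to CarbonInsertIntoWithDf when the converter flow is enabled.
 */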
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (isInsertIntoWithConverterFlow) {
return CarbonInsertIntoWithDf(
databaseNameOp = databaseNameOp,
tableName = tableName,
options = options,
isOverwriteTable = isOverwriteTable,
dataFrame = dataFrame,
updateModel = None,
tableInfoOp = Some(tableInfo),
internalOptions = internalOptions,
partition = partition).process(sparkSession)
}
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
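// disable zookeeper based locking for this load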
CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
val factPath = ""
currPartitions = CommonLoadUtils.getCurrentPartitions(sparkSession, table)
CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
val optionsFinal: util.Map[String, String] = CommonLoadUtils.getFinalLoadOptions(table, options)
val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
hadoopConf = hadoopConf,
factPath = factPath,
optionsFinal = optionsFinal,
parentTablePath = parentTablePath,
table = table,
isDataFrame = true,
internalOptions = internalOptions,
partition = partition,
options = options)
val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
timeStampFormat = tf
dateFormat = df
val partitionInfo = tableInfo.getFactTable.getPartitionInfo
val partitionColumnSchema =
if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
partitionInfo.getColumnSchemaList.asScala
} else {
null
}
val convertedStaticPartition = getConvertedStaticPartitionMap(partitionColumnSchema)
val (reArrangedIndex, selectedColumnSchema) = getReArrangedIndexAndSelectedSchema(tableInfo,
partitionColumnSchema,
carbonLoadModel)
val newLogicalPlan = getReArrangedLogicalPlan(
reArrangedIndex,
selectedColumnSchema,
convertedStaticPartition)
scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
if (logicalPartitionRelation != null) {
if (selectedColumnSchema.length != logicalPartitionRelation.output.length) {
throw new RuntimeException("schema length doesn't match partition relation output length")
}
val isNotReArranged = selectedColumnSchema.zipWithIndex.exists {
case (cs, i) => !cs.getColumnName.equals(logicalPartitionRelation.output(i).name)
}
if (isNotReArranged) {
// Re-arrange the catalog table schema and output for partition relation
logicalPartitionRelation =
getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
}
}
// Delete stale segment folders that are not in table status but are physically present in
// the Fact folder
LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
var isUpdateTableStatusRequired = false
val uuid = ""
try {
val (tableIndexes, indexOperationContext) =
CommonLoadUtils.firePreLoadEvents(
sparkSession = sparkSession,
carbonLoadModel = carbonLoadModel,
uuid = uuid,
factPath = factPath,
optionsFinal = optionsFinal,
options = options.asJava,
isOverwriteTable = isOverwriteTable,
isDataFrame = true,
updateModel = None,
operationContext = operationContext)
// add the start entry for the new load in the table status file
if (!table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
carbonLoadModel,
isOverwriteTable)
isUpdateTableStatusRequired = true
}
if (isOverwriteTable) {
LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
}
// Create the metadata folder if it does not exist
if (carbonLoadModel.isCarbonTransactionalTable) {
val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
if (!FileFactory.isFileExist(metadataDirectoryPath)) {
FileFactory.mkdirs(metadataDirectoryPath)
}
} else {
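// non-transactional table: use the current nano time as a unique segment id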
carbonLoadModel.setSegmentId(System.nanoTime().toString)
}
val partitionStatus = SegmentStatus.SUCCESS
val loadParams = CarbonLoadParams(sparkSession,
tableName,
sizeInBytes,
isOverwriteTable,
carbonLoadModel,
hadoopConf,
logicalPartitionRelation,
dateFormat,
timeStampFormat,
options,
finalPartition,
currPartitions,
partitionStatus,
None,
Some(scanResultRdd),
None,
operationContext)
LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
val (rows, loadResult) = insertData(loadParams)
val info = CommonLoadUtils.makeAuditInfo(loadResult)
setAuditInfo(info)
CommonLoadUtils.firePostLoadEvents(sparkSession,
carbonLoadModel,
tableIndexes,
indexOperationContext,
table,
operationContext)
} catch {
case CausedBy(ex: NoRetryException) =>
// update the load entry in the table status file to mark it for delete
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
// In case of an event related exception
case preEventEx: PreEventException =>
LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
throw new AnalysisException(preEventEx.getMessage)
case ex: Exception =>
LOGGER.error(ex)
// update the load entry in the table status file to mark it for delete
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
throw ex
}
Seq.empty
}
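
/**
 * Returns true if the table schema has evolved, i.e. any schema evolution entry has
 * added or removed columns.
 */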
private def isAlteredSchema(tableSchema: TableSchema): Boolean = {
if (tableInfo.getFactTable.getSchemaEvolution != null) {
tableInfo
.getFactTable
.getSchemaEvolution
.getSchemaEvolutionEntryList.asScala.exists(entry =>
(entry.getAdded != null && entry.getAdded.size() > 0) ||
(entry.getRemoved != null && entry.getRemoved.size() > 0)
)
} else {
false
}
}
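
/**
 * Re-arranges the top level Project of the insert query so that its projection list follows
 * the internal column order given by `reArrangedIndex`, adding literal expressions for
 * static partition values. A Project node is added first if the plan does not have one.
 */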
def getReArrangedLogicalPlan(
reArrangedIndex: Seq[Int],
selectedColumnSchema: Seq[ColumnSchema],
convertedStaticPartition: mutable.Map[String, AnyRef]): LogicalPlan = {
var processedProject: Boolean = false
// check whether the first node is a Project
logicalPlan match {
case _: Project =>
// a Project is already present as the first node
case _ =>
// If Project is not present, add one so the projection can be re-arranged
logicalPlan = Project(logicalPlan.output, logicalPlan)
}
// Re-arrange the project as per columnSchema
val newLogicalPlan = logicalPlan.transformDown {
case p: Project if !processedProject =>
var oldProjectionList = p.projectList
if (partition.nonEmpty) {
// The partition keyword is present in the insert statement, and the partition
// projections in the select query may not follow the create-table order.
// So, bring them to the create-table order.
val dynamicPartition = partition.filterNot(entry => entry._2.isDefined)
var index = 0
val map = mutable.Map[String, Int]()
for (part <- dynamicPartition) {
map(part._1) = index
index = index + 1
}
var tempList = oldProjectionList.take(oldProjectionList.size - dynamicPartition.size)
val partitionList = oldProjectionList.takeRight(dynamicPartition.size)
val partitionSchema = table.getPartitionInfo.getColumnSchemaList.asScala
for (partitionCol <- partitionSchema) {
if (map.get(partitionCol.getColumnName).isDefined) {
tempList = tempList :+ partitionList(map(partitionCol.getColumnName))
}
}
oldProjectionList = tempList
}
if (reArrangedIndex.size != oldProjectionList.size) {
// for a non-partition table, the column counts must match
if (partition.isEmpty) {
throw new AnalysisException(
s"Cannot insert into table $tableName because the number of columns are " +
s"different: " +
s"need ${ reArrangedIndex.size } columns, " +
s"but query has ${ oldProjectionList.size } columns.")
} else {
if (reArrangedIndex.size - oldProjectionList.size !=
convertedStaticPartition.size) {
throw new AnalysisException(
s"Cannot insert into table $tableName because the number of columns are " +
s"different: need ${ reArrangedIndex.size } columns, " +
s"but query has ${ oldProjectionList.size } columns.")
} else {
// TODO: For the partition case, do the remaining projections need validation?
}
}
}
var newProjectionList: Seq[NamedExpression] = Seq.empty
var i = 0
while (i < reArrangedIndex.size) {
// The column schema is already ordered as sortColumns-dimensions-measures.
// Collect the ordinal and re-arrange the projection in the same order.
if (partition.nonEmpty &&
convertedStaticPartition.contains(selectedColumnSchema(i).getColumnName
.toLowerCase())) {
// If the column is present in the static partition map, it is a static partition
// column, so add a literal value expression to the project.
val value = convertedStaticPartition(selectedColumnSchema(i).getColumnName
.toLowerCase())
newProjectionList = newProjectionList :+
CarbonToSparkAdapter.createAliasRef(
new Literal(value,
SparkDataTypeConverterImpl.convertCarbonToSparkDataType(
selectedColumnSchema(i).getDataType)),
value.toString,
NamedExpression.newExprId).asInstanceOf[NamedExpression]
} else {
// If the column is NOT a static partition column,
// get the projection column by mapping its ordinal.
if (partition.contains(selectedColumnSchema(i).getColumnName.toLowerCase())) {
// static partition + dynamic partition case: the re-arranged ordinal of a dynamic
// partition column exceeds the projection size by the number of static partitions
newProjectionList = newProjectionList :+
oldProjectionList(
reArrangedIndex(i) - convertedStaticPartition.size)
} else {
newProjectionList = newProjectionList :+
oldProjectionList(reArrangedIndex(i))
}
}
i = i + 1
}
processedProject = true
Project(newProjectionList, p.child)
}
newLogicalPlan
}
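
/**
 * Converts the static partition values given in the insert statement to their Spark data
 * type values, keyed by the lower-cased partition column name.
 */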
private def getConvertedStaticPartitionMap(partitionColumnSchema: mutable.Buffer[ColumnSchema])
: mutable.Map[String, AnyRef] = {
val convertedStaticPartition = mutable.Map[String, AnyRef]()
// setting a converter will remove the thread local entries of previous load configurations.
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
if (partition.nonEmpty) {
for (col <- partitionColumnSchema) {
if (partition(col.getColumnName.toLowerCase).isDefined) {
convertedStaticPartition(col.getColumnName.toLowerCase) =
CarbonScalaUtil.convertStaticPartitionToValues(
partition(col.getColumnName.toLowerCase).get,
SparkDataTypeConverterImpl.convertCarbonToSparkDataType(col.getDataType),
timeStampFormat,
dateFormat)
}
}
}
convertedStaticPartition
}
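
/**
 * Returns a copy of the partition table's logical relation whose catalog table schema and
 * output attributes are re-ordered according to `reArrangedIndex`.
 */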
def getReArrangedSchemaLogicalRelation(reArrangedIndex: Seq[Int],
logicalRelation: LogicalRelation): LogicalRelation = {
// rearrange the schema in catalog table and output attributes of logical relation
if (reArrangedIndex.size != logicalRelation.schema.size) {
throw new AnalysisException(
s"Cannot insert into table $tableName because the number of columns are different: " +
s"need ${ reArrangedIndex.size } columns, " +
s"but query has ${ logicalRelation.schema.size } columns.")
}
val reArrangedFields = new Array[StructField](logicalRelation.schema.size)
val reArrangedAttributes = new Array[AttributeReference](logicalRelation.schema.size)
val fields = logicalRelation.schema.fields
val output = logicalRelation.output
var i = 0
for (index <- reArrangedIndex) {
// rearranged schema
reArrangedFields(i) = fields(index)
// rearranged output attributes of logical relation
reArrangedAttributes(i) = output(index)
i = i + 1
}
// update the new schema and output attributes
val catalogTable = logicalRelation.catalogTable
.get
.copy(schema = new StructType(reArrangedFields))
logicalRelation.copy(logicalRelation.relation,
reArrangedAttributes,
Some(catalogTable))
}
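
/**
 * Writes the prepared rows: uses the partition load path for hive partition tables,
 * otherwise loads through CarbonDataRDDFactory and returns the load result.
 */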
def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
var rows = Seq.empty[Row]
val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var loadResult : LoadMetadataDetails = null
if (table.isHivePartitionTable) {
rows = CommonLoadUtils.loadDataWithPartition(loadParams)
} else {
loadResult = CarbonDataRDDFactory.loadCarbonData(loadParams.sparkSession.sqlContext,
loadParams.carbonLoadModel,
loadParams.partitionStatus,
isOverwriteTable,
loadParams.hadoopConf,
None,
loadParams.scanResultRDD,
None,
operationContext)
}
(rows, loadResult)
}
override protected def opName: String = {
if (isOverwriteTable) {
"INSERT OVERWRITE"
} else {
"INSERT INTO"
}
}
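
/**
 * Computes, for each column in the internal order (sort columns, dimensions, measures, with
 * partition columns kept at the end), its position in the create-table order, along with the
 * corresponding column schemas.
 *
 * For example (hypothetical schema), for a table created as (b INT, a STRING) with
 * SORT_COLUMNS 'a', the internal order is (a, b) and the returned index is Seq(1, 0).
 */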
def getReArrangedIndexAndSelectedSchema(
tableInfo: TableInfo,
partitionColumnSchema: mutable.Buffer[ColumnSchema],
carbonLoadModel: CarbonLoadModel): (Seq[Int], Seq[ColumnSchema]) = {
var reArrangedIndex: Seq[Int] = Seq()
var selectedColumnSchema: Seq[ColumnSchema] = Seq()
var partitionIndex: Seq[Int] = Seq()
val properties = tableInfo.getFactTable.getTableProperties.asScala
val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
// column schemas in the internal order (dimensions followed by measures, non-flat structure)
val columnSchema = (table.getVisibleDimensions.asScala ++
table.getVisibleMeasures.asScala).map(_.getColumnSchema)
val partitionColumnNames = if (partitionColumnSchema != null) {
partitionColumnSchema.map(x => x.getColumnName).toSet
} else {
null
}
var createOrderColumns = table.getCreateOrderColumn.asScala
val createOrderMap = mutable.Map[String, Int]()
if (partitionColumnNames != null && isAlteredSchema(tableInfo.getFactTable)) {
// For alter table drop/add column scenarios, partition columns may not be at the end.
// They need to be kept at the end.
createOrderColumns = createOrderColumns.filterNot(col =>
partitionColumnNames.contains(col.getColumnSchema.getColumnName)) ++
createOrderColumns.filter(col =>
partitionColumnNames.contains(col.getColumnSchema.getColumnName))
}
createOrderColumns.zipWithIndex.foreach {
case (col, index) => createOrderMap.put(col.getColName, index)
}
columnSchema.foreach {
col =>
if (spatialProperty.isDefined &&
col.getColumnName.equalsIgnoreCase(spatialProperty.get.trim)) {
carbonLoadModel.setNonSchemaColumnsPresent(true)
}
var skipPartitionColumn = false
if (partitionColumnNames != null &&
partitionColumnNames.contains(col.getColumnName)) {
partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
skipPartitionColumn = true
} else {
reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName)
}
if (!skipPartitionColumn) {
selectedColumnSchema = selectedColumnSchema :+ col
}
}
if (partitionColumnSchema != null) {
// keep partition columns at the end
selectedColumnSchema = selectedColumnSchema ++ partitionColumnSchema
}
if (partitionIndex.nonEmpty) {
// keep partition columns at the end, in the original create order
reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
}
(reArrangedIndex, selectedColumnSchema)
}
}