/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.mutation

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.CarbonInsertIntoWithDf
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.strategy.MixedFormatHandler
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, LongType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.AlterTableUtil

import scala.collection.JavaConverters._

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.index.Segment
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.view.MVManagerInSpark
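
/**
 * Runs an UPDATE statement on a carbon table. The rows matched by the update
 * plan are first removed through delete-delta files, and the updated rows are
 * then re-inserted, all while holding the metadata, compaction and update
 * locks. Returns the number of updated rows.
 */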
private[sql] case class CarbonProjectForUpdateCommand(
plan: LogicalPlan,
databaseNameOp: Option[String],
tableName: String,
columns: List[String])
extends DataCommand {
override val output: Seq[Attribute] = {
Seq(AttributeReference("Updated Row Count", LongType, nullable = false)())
  }

override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var updatedRowCount = 0L
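    // Reject the update if a segment list has been explicitly set for this table,
    // since IUD operations must operate on all segments.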
IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
val res = plan find {
case relation: LogicalRelation if relation.relation
.isInstanceOf[CarbonDatasourceHadoopRelation] =>
true
case _ => false
}
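    // If the plan does not reference a carbon relation, there is nothing to update.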
if (res.isEmpty) {
      return Seq(Row(updatedRowCount))
}
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
setAuditTable(carbonTable)
setAuditInfo(Map("plan" -> plan.simpleString))
// Do not allow spatial index and its source columns to be updated.
AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable, columns)
columns.foreach { col =>
val dataType = carbonTable.getColumnByName(col).getColumnSchema.getDataType
if (dataType.isComplexType) {
throw new UnsupportedOperationException("Unsupported operation on Complex data type")
}
}
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "loading", "data update")
}
if (!carbonTable.canAllow(carbonTable, TableOperation.UPDATE)) {
throw new MalformedCarbonCommandException(
"update operation is not supported for index")
}
// Block the update operation for non carbon formats
if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
throw new MalformedCarbonCommandException(
s"Unsupported update operation on table containing mixed format segments")
}
// trigger event for Update table
val operationContext = new OperationContext
val updateTablePreEvent: UpdateTablePreEvent =
UpdateTablePreEvent(sparkSession, carbonTable)
operationContext.setProperty("isLoadOrCompaction", false)
OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
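    // Create the lock handles: the metadata lock guards the whole operation, and
    // the update and compaction locks are taken later with bounded retries so a
    // concurrent update or compaction fails fast.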
val metadataLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.METADATA_LOCK)
val compactionLock = CarbonLockFactory.getCarbonLockObj(carbonTable
.getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.UPDATE_LOCK)
var lockStatus = false
// get the current time stamp which should be same for delete and update.
val currentTime = CarbonUpdateUtil.readCurrentTime
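    // Optionally persist the update dataset so the plan is evaluated only once
    // and reused by both the delete and the insert phases.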
    var dataSet: DataFrame = null
val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
try {
lockStatus = metadataLock.lockWithRetries()
      if (lockStatus) {
        logInfo("Successfully able to get the table metadata file lock")
      } else {
        throw new Exception("Table is locked for update. Please try after some time")
      }
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
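      // Take the update lock and then the compaction lock; if either is held by
      // a concurrent operation, the update fails with a ConcurrentOperationException.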
if (updateLock.lockWithRetries(3, 3)) {
if (compactionLock.lockWithRetries(3, 3)) {
// Get RDD.
          dataSet = if (isPersistEnabled) {
            Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
              CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
          } else {
            Dataset.ofRows(sparkSession, plan)
          }
if (CarbonProperties.isUniqueValueCheckEnabled) {
          // If more than one value is present for the update key, fail the update.
val ds = dataSet.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
.groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
.count()
.select("count")
.filter(col("count") > lit(1))
.limit(1)
.collect()
          // tupleId identifies the source rows that will be replaced. If the same
          // tupleId appears more than once, the update key maps to more than one
          // value, which is undefined behavior.
          if (ds.length > 0 && ds(0).getLong(0) > 1) {
            throw new UnsupportedOperationException(
              "Update cannot be supported for 1-to-N mapping, as more than one value " +
              "is present for the update key")
          }
}
// handle the clean up of IUD.
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
// do delete operation.
val (segmentsToBeDeleted, updatedRowCountTemp) = DeleteExecution.deleteDeltaExecution(
databaseNameOp,
tableName,
sparkSession,
dataSet.rdd,
currentTime + "",
isUpdateOperation = true,
executionErrors)
if (executionErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executionErrors.errorMsg)
}
updatedRowCount = updatedRowCountTemp
// do update operation.
performUpdate(dataSet,
databaseNameOp,
tableName,
plan,
sparkSession,
currentTime,
executionErrors,
segmentsToBeDeleted)
// pre-priming for update command
DeleteExecution.reloadDistributedSegmentCache(carbonTable,
segmentsToBeDeleted, operationContext)(sparkSession)
} else {
throw new ConcurrentOperationException(carbonTable, "compaction", "update")
}
} else {
throw new ConcurrentOperationException(carbonTable, "update/delete", "update")
}
if (executionErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executionErrors.errorMsg)
}
// Do IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(
sparkSession, carbonTable, isUpdateOperation = true)
// Truncate materialized views on the current table.
val viewManager = MVManagerInSpark.get(sparkSession)
val viewSchemas = viewManager.getSchemasOnTable(carbonTable)
if (!viewSchemas.isEmpty) {
viewManager.onTruncate(viewSchemas)
}
// trigger event for Update table
val updateTablePostEvent: UpdateTablePostEvent =
UpdateTablePostEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(updateTablePostEvent, operationContext)
} catch {
case e: HorizontalCompactionException =>
        LOGGER.error(
          "Update operation succeeded, but horizontal compaction failed. Please check logs.", e)
        // In case of failure, clean up all related delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
case e: Exception =>
LOGGER.error("Exception in update operation", e)
        // start clean up: in case of failure, remove all related delete delta files
        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime.toString)
        // end clean up
        if (null != e.getMessage) {
          sys.error("Update operation failed. " + e.getMessage)
        }
        if (null != e.getCause && null != e.getCause.getMessage) {
          sys.error("Update operation failed. " + e.getCause.getMessage)
        }
        sys.error("Update operation failed. Please check logs.")
} finally {
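      // Always release the locks and unpersist the cached dataset, whether the
      // update succeeded or failed.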
if (null != dataSet && isPersistEnabled) {
dataSet.unpersist()
}
updateLock.unlock()
compactionLock.unlock()
if (lockStatus) {
CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
}
}
Seq(Row(updatedRowCount))
}
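
  /**
   * Performs the "insert" half of the update: routes the updated rows through
   * CarbonInsertIntoWithDf into the destination carbon relation, then copies
   * any executor errors back to the caller.
   */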
private def performUpdate(
dataFrame: Dataset[Row],
databaseNameOp: Option[String],
tableName: String,
plan: LogicalPlan,
sparkSession: SparkSession,
currentTime: Long,
executorErrors: ExecutionErrors,
deletedSegments: Seq[Segment]): Unit = {
    // The destination is the carbon relation whose database and table name match
    // the table being updated.
    def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
      val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
      dbName == relation.identifier.getCarbonTableIdentifier.getDatabaseName &&
      tableName == relation.identifier.getCarbonTableIdentifier.getTableName
    }
// from the dataFrame schema iterate through all the column to be updated and
// check for the data type, if the data type is complex then throw exception
def checkForUnsupportedDataType(dataFrame: DataFrame): Unit = {
dataFrame.schema.foreach(col => {
        // columns being updated are suffixed with "-updatedColumn" in the dataframe schema
if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) &&
col.dataType.isInstanceOf[ArrayType]) {
throw new UnsupportedOperationException("Unsupported data type: Array")
}
})
}
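    // Builds the comma-separated "fileheader" option for the insert step from the
    // projection list: the implicit tupleId column is dropped and the
    // "-updatedColumn" suffix is stripped so the names match the table schema.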
    def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = {
      plan match {
        case Project(pList, _) =>
          pList
            .filter(field => !field.name
              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
            .map { col =>
              if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
                col.name
                  .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
              } else {
                col.name
              }
            }.mkString(",")
        case _ => ""
      }
    }
// check for the data type of the new value to be updated
checkForUnsupportedDataType(dataFrame)
    val analyzedPlan = dataFrame.queryExecution.analyzed
    val res = analyzedPlan find {
case relation: LogicalRelation
if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
isDestinationRelation(relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) =>
true
case _ => false
}
val carbonRelation: CarbonDatasourceHadoopRelation = res match {
case Some(relation: LogicalRelation) =>
relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
      case _ => sys.error("Destination carbon table relation not found in the update plan")
}
val updateTableModel = UpdateTableModel(true, currentTime, executorErrors, deletedSegments)
val header = getHeader(carbonRelation, plan)
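    // Re-insert the updated rows through the dataframe insert flow; the update
    // model carries the update timestamp and the deleted segments so the load
    // shares the delete step's timestamp.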
CarbonInsertIntoWithDf(
databaseNameOp = Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
tableName = carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
      options = Map("fileheader" -> header),
isOverwriteTable = false,
dataFrame = dataFrame,
updateModel = Some(updateTableModel)).process(sparkSession)
executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
}
override protected def opName: String = "UPDATE DATA"
}