/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.mutation

import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.strategy.MixedFormatHandler
import org.apache.spark.sql.types.LongType

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, IndexServerLoadEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.FailureCauses

/**
* Command for the DELETE part of the IUD (Insert/Update/Delete) framework: deletes the rows
* selected by [[plan]] by writing delete delta files and then triggering horizontal compaction.
*
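* Example of a SQL statement handled by this command (table name and predicate are
* illustrative):
* {{{
*   DELETE FROM sales WHERE country = 'US'
* }}}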
*/
private[sql] case class CarbonProjectForDeleteCommand(
plan: LogicalPlan,
databaseNameOp: Option[String],
tableName: String,
timestamp: String)
extends DataCommand {
override val output: Seq[Attribute] = {
Seq(AttributeReference("Deleted Row Count", LongType, nullable = false)())
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
setAuditTable(carbonTable)
setAuditInfo(Map("plan" -> plan.simpleString))
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non-transactional table")
}
// Block the delete operation for tables containing non-carbon format segments
if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
throw new MalformedCarbonCommandException(
s"Unsupported delete operation on table containing mixed format segments")
}
if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "loading", "data delete")
}
if (!carbonTable.canAllow(carbonTable, TableOperation.DELETE)) {
throw new MalformedCarbonCommandException(
"delete operation is not supported for index datamap")
}
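// delete is not allowed when a segment list ('carbon.input.segments') is explicitly
// set for this table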
IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
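// materialize the rows selected for deletion; the plan is expected to project the
// implicit tupleId that identifies the physical location of each row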
val dataFrame = Dataset.ofRows(sparkSession, plan)
val dataRdd = dataFrame.rdd
// trigger event for Delete from table
val operationContext = new OperationContext
val deleteFromTablePreEvent: DeleteFromTablePreEvent =
DeleteFromTablePreEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)
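// acquire metadata, compaction and update locks so that a concurrent compaction or
// update/delete cannot interleave with this delete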
val metadataLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.METADATA_LOCK)
val compactionLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.COMPACTION_LOCK)
val updateLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.UPDATE_LOCK)
var lockStatus = false
try {
lockStatus = metadataLock.lockWithRetries()
if (lockStatus) {
if (!compactionLock.lockWithRetries(3, 3)) {
throw new ConcurrentOperationException(carbonTable, "compaction", "delete")
}
if (!updateLock.lockWithRetries(3, 3)) {
throw new ConcurrentOperationException(carbonTable, "update/delete", "delete")
}
LOGGER.info("Successfully able to get the table metadata file lock")
} else {
throw new Exception("Table is locked for deletion. Please try after some time")
}
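// carries any failure cause and error message reported back from the executors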
val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
// clean up stale delta files left behind by earlier IUD operations
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
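// write delete delta files marking the deleted rows, returning the affected segments
// and the total number of rows deleted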
val (deletedSegments, deletedRowCount) = DeleteExecution.deleteDeltaExecution(
databaseNameOp,
tableName,
sparkSession,
dataRdd,
timestamp,
isUpdateOperation = false,
executorErrors)
// Check for any failures that occurred during delete delta execution
if (executorErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executorErrors.errorMsg)
}
// perform horizontal compaction to merge the delete delta files, if the configured
// threshold is reached
HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
isUpdateOperation = false)
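// truncate all non-index datamaps backed by a relation (such as MV datamaps),
// since their data is stale once rows are deleted from the parent table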
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
!dataMapSchema.isIndexDataMap).asJava
if (!allDataMapSchemas.isEmpty) {
DataMapStatusManager.truncateDataMap(allDataMapSchemas)
}
// pre-prime the distributed index server cache for the segments touched by this delete
DeleteExecution.reloadDistributedSegmentCache(carbonTable,
deletedSegments, operationContext)(sparkSession)
// trigger post event for Delete from table
val deleteFromTablePostEvent: DeleteFromTablePostEvent =
DeleteFromTablePostEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
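// report the number of deleted rows as the command output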
Seq(Row(deletedRowCount))
} catch {
case e: HorizontalCompactionException =>
LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." +
" Please check logs. " + e.getMessage)
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
Seq(Row(0L))
case e: Exception =>
LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
// in case of failure, clean up all delete delta files created with this timestamp
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
// null check is required because the message is sometimes null for executor errors
if (null != e.getMessage) {
sys.error("Delete data operation failed. " + e.getMessage)
} else {
sys.error("Delete data operation failed. Please check logs.")
}
} finally {
if (lockStatus) {
CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
}
updateLock.unlock()
compactionLock.unlock()
}
}
override protected def opName: String = "DELETE DATA"
}