blob: 983e6b406c7b4b1e27a625599dc3e170747b30e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.util.Auditor
import org.apache.carbondata.spark.exception.ProcessMetaDataException
object Checker {
def validateTableExists(
dbName: Option[String],
tableName: String,
session: SparkSession): Unit = {
val database = dbName.getOrElse(session.catalog.currentDatabase)
val identifier = TableIdentifier(tableName, dbName)
if (!CarbonEnv.getInstance(session).carbonMetaStore.tableExists(identifier)(session)) {
val err = s"table $dbName.$tableName not found"
LogServiceFactory.getLogService(this.getClass.getName).error(err)
throw new NoSuchTableException(database, tableName)
}
}
}
/**
* Operation that modifies metadata(schema, table_status, etc)
*/
trait MetadataProcessOperation {
def processMetadata(sparkSession: SparkSession): Seq[Row]
// call this to throw exception when processMetadata failed
def throwMetadataException(dbName: String, tableName: String, msg: String): Unit = {
throw new ProcessMetaDataException(dbName, tableName, msg)
}
}
/**
* Operation that process data
*/
trait DataProcessOperation {
def processData(sparkSession: SparkSession): Seq[Row]
}
/**
* An utility that run the command with audit log
*/
trait Auditable {
// operation id that will be written in audit log
private val operationId: String = String.valueOf(System.nanoTime())
// extra info to be written in audit log, set by subclass of AtomicRunnableCommand
var auditInfo: Map[String, String] = _
// holds the dbName and tableName for which this command is executed for
// used for audit log, set by subclass of AtomicRunnableCommand
private var table: String = _
// implement by subclass, return the operation name that record in audit log
protected def opName: String
protected def opTime(startTime: Long) = s"${System.currentTimeMillis() - startTime} ms"
protected def setAuditTable(dbName: String, tableName: String): Unit =
table = s"$dbName.$tableName"
protected def setAuditTable(carbonTable: CarbonTable): Unit =
table = s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}"
protected def setAuditInfo(map: Map[String, String]): Unit = auditInfo = map
/**
* Run the passed command and record the audit log.
* Two audit log will be output, one for operation start another for operation success/failure
* @param runCmd command to run
* @param spark session
* @return command result
*/
protected def runWithAudit(runCmd: (SparkSession => Seq[Row]), spark: SparkSession): Seq[Row] = {
val start = System.currentTimeMillis()
Auditor.logOperationStart(opName, operationId)
val rows = try {
runCmd(spark)
} catch {
case e: Throwable =>
val map = Map("Exception" -> e.getClass.getName, "Message" -> e.getMessage)
Auditor.logOperationEnd(opName, operationId, false, table, opTime(start), map.asJava)
throw e
}
Auditor.logOperationEnd(opName, operationId, true, table, opTime(start),
if (auditInfo != null) auditInfo.asJava else new java.util.HashMap[String, String]())
rows
}
}
/**
* Command that modifies metadata(schema, table_status, etc) only without processing data
*/
abstract class MetadataCommand
extends RunnableCommand with MetadataProcessOperation with Auditable {
override def run(sparkSession: SparkSession): Seq[Row] = {
runWithAudit(processMetadata, sparkSession)
}
}
/**
* Command that process data only without modifying metadata
*/
abstract class DataCommand extends RunnableCommand with DataProcessOperation with Auditable {
override def run(sparkSession: SparkSession): Seq[Row] = {
runWithAudit(processData, sparkSession)
}
}
/**
* Subclass of this command is executed in an atomic fashion, either all or nothing.
* Subclass need to process both metadata and data, processMetadata should be undoable
* if process data failed.
*/
abstract class AtomicRunnableCommand
extends RunnableCommand with MetadataProcessOperation with DataProcessOperation with Auditable {
override def run(sparkSession: SparkSession): Seq[Row] = {
runWithAudit(spark => {
processMetadata(spark)
try {
processData(spark)
} catch {
case e: Exception =>
undoMetadata(spark, e)
throw e
}
}, sparkSession)
}
/**
* Developer should override this function to undo the changes in processMetadata.
* @param sparkSession spark session
* @param exception exception raised when processing data
* @return rows to return to spark
*/
def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
val msg = s"Got exception $exception when processing data. " +
s"But this command does not support undo yet, skipping the undo part."
LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
Seq.empty
}
}