/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive

import scala.collection.JavaConverters._

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, SparkUtil}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.util.CarbonUtil
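
/**
 * Analysis rule for UPDATE and DELETE operations on carbon tables. It rewrites the
 * parsed UpdateTable / DeleteRecords plans into Carbon-specific plans, projecting an
 * implicit tupleId column (via the getTupleId UDF) that identifies the affected rows.
 */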
case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private lazy val parser = sparkSession.sessionState.sqlParser
private lazy val optimizer = sparkSession.sessionState.optimizer
private lazy val analyzer = sparkSession.sessionState.analyzer
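
  /**
   * Builds the logical plan for an UPDATE statement, e.g.
   * UPDATE t SET (column1) = (column1 + 1) WHERE column2 = 'x':
   * the source select is joined with the target table (projected with its tupleId),
   * wrapped in ProjectForUpdate, then analyzed and optimized.
   */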
private def processUpdateQuery(
table: UnresolvedRelation,
columns: List[String],
selectStmt: String,
alias: Option[String],
filter: String): LogicalPlan = {
var includedDestColumns = false
var includedDestRelation = false
var addedTupleId = false
    def prepareTargetRelation(relation: UnresolvedRelation): SubqueryAlias = {
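      // project all columns of the target relation plus an implicit tupleId column
      // (produced by the getTupleId UDF) that identifies each row to be updated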
val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
Seq.empty, isDistinct = false), "tupleId")())
val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId)
val carbonTable = CarbonEnv.getCarbonTable(table.tableIdentifier)(sparkSession)
if (carbonTable != null) {
val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
if (DataMapUtil.hasMVDataMap(carbonTable)) {
          val allDataMapSchemas = DataMapStoreManager.getInstance
            .getDataMapSchemasOfTable(carbonTable).asScala
            .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
                                     !dataMapSchema.isIndexDataMap)
          allDataMapSchemas.foreach { dataMapSchema =>
            DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
          }
} else if (!indexSchemas.isEmpty) {
throw new UnsupportedOperationException(
"Update operation is not supported for table which has index datamaps")
}
if (carbonTable.isChildTableForMV) {
throw new UnsupportedOperationException(
"Update operation is not supported for mv datamap table")
}
}
val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
relation
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
alias match {
case Some(_) =>
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
alias,
relation,
Some(table.tableIdentifier))
case _ => relation
}
} else {
throw new UnsupportedOperationException("Unsupported Spark version.")
}
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
alias,
Project(projList, tableRelation),
Some(table.tableIdentifier))
}
// get the un-analyzed logical plan
    val targetTable = prepareTargetRelation(table)
val selectPlan = parser.parsePlan(selectStmt) transform {
case Project(projectList, child) if !includedDestColumns =>
includedDestColumns = true
if (projectList.size != columns.size) {
CarbonException.analysisException(
"The number of columns in source table and destination table columns mismatch")
}
val renamedProjectList = projectList.zip(columns).map { case (attr, col) =>
attr match {
          case UnresolvedAlias(aliasChild, _) =>
            UnresolvedAlias(Alias(aliasChild, col + CarbonCommonConstants.UPDATED_COL_EXTENSION)())
case UnresolvedAttribute(_) =>
UnresolvedAlias(Alias(attr, col + CarbonCommonConstants.UPDATED_COL_EXTENSION)())
case _ => attr
}
}
val tableName: Option[Seq[String]] = alias match {
case Some(_) => Some(alias.toSeq)
          case _ => Some(Seq(child.asInstanceOf[UnresolvedRelation].tableIdentifier.table))
}
val list = Seq(
UnresolvedAlias(UnresolvedStar(tableName))) ++ renamedProjectList
Project(list, child)
case Filter(cond, child) if !includedDestRelation =>
includedDestRelation = true
Filter(cond, Join(child, targetTable, Inner, None))
case r@CarbonUnresolvedRelation(t) if !includedDestRelation && t != table.tableIdentifier =>
includedDestRelation = true
Join(r, targetTable, Inner, None)
}
val updatedSelectPlan: LogicalPlan = if (!includedDestRelation) {
// special case to handle self join queries
      // e.g. UPDATE tableName SET (column1) = (column1 + 1)
selectPlan transform {
case relation: UnresolvedRelation
if table.tableIdentifier == relation.tableIdentifier && !addedTupleId =>
addedTupleId = true
targetTable
}
} else {
selectPlan
}
    val finalPlan = if (filter.nonEmpty) {
var transformed: Boolean = false
      // Create a dummy projection to include filter conditions
      val newPlan: LogicalPlan = if (table.tableIdentifier.database.isDefined) {
        parser.parsePlan("select * from " +
                         table.tableIdentifier.database.getOrElse("") + "." +
                         table.tableIdentifier.table + " " + alias.getOrElse("") + " " + filter)
      } else {
        parser.parsePlan("select * from " +
                         table.tableIdentifier.table + " " + alias.getOrElse("") + " " + filter)
      }
newPlan transform {
case CarbonUnresolvedRelation(t)
if !transformed && t == table.tableIdentifier =>
transformed = true
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
alias,
updatedSelectPlan,
Some(table.tableIdentifier))
}
} else {
updatedSelectPlan
}
val destinationTable = CarbonReflectionUtils.getUnresolvedRelation(table.tableIdentifier, alias)
    // Spark 2.1 and 2.2 use the Analyzer.execute method to transform the LogicalPlan,
    // whereas Spark 2.3 uses the Analyzer.executeAndCheck method
val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute(
analyzer, ProjectForUpdate(destinationTable, columns, Seq(finalPlan)))
    // Commands execute eagerly and are transformed to the logical plan 'LocalRelation'
    // in the analyze phase (see the code in 'Dataset.logicalPlan'), so the logical plan
    // 'CarbonProjectForUpdateCommand' must be returned here instead of 'ProjectForUpdate'
optimizer.execute(analyzedPlan)
}
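
  /**
   * Builds the logical plan for a DELETE statement, e.g. DELETE FROM t WHERE column1 = 1:
   * the matching rows are projected with their implicit tupleId and the result is wrapped
   * in CarbonProjectForDeleteCommand, which performs the actual delete when executed.
   */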
def processDeleteRecordsQuery(selectStmt: String,
alias: Option[String],
table: UnresolvedRelation): LogicalPlan = {
var addedTupleId = false
val parsePlan = parser.parsePlan(selectStmt)
val selectPlan = parsePlan transform {
case relation: UnresolvedRelation
if table.tableIdentifier == relation.tableIdentifier && !addedTupleId =>
addedTupleId = true
val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
Seq.empty, isDistinct = false), "tupleId")())
val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId)
val carbonTable = CarbonEnv.getCarbonTable(table.tableIdentifier)(sparkSession)
if (carbonTable != null) {
if (carbonTable.isChildTableForMV) {
throw new UnsupportedOperationException(
"Delete operation is not supported for datamap table")
}
val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
if (DataMapUtil.hasMVDataMap(carbonTable)) {
            val allDataMapSchemas = DataMapStoreManager.getInstance
              .getDataMapSchemasOfTable(carbonTable).asScala
              .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
                                       !dataMapSchema.isIndexDataMap)
            allDataMapSchemas.foreach { dataMapSchema =>
              DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
            }
} else if (!indexSchemas.isEmpty) {
throw new UnsupportedOperationException(
"Delete operation is not supported for table which has index datamaps")
}
}
// include tuple id in subquery
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
Project(projList, relation)
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
alias match {
case Some(_) =>
val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
alias,
relation,
Some(table.tableIdentifier))
Project(projList, subqueryAlias)
case _ => Project(projList, relation)
}
} else {
throw new UnsupportedOperationException("Unsupported Spark version.")
}
}
CarbonProjectForDeleteCommand(
selectPlan,
table.tableIdentifier.database,
table.tableIdentifier.table,
System.currentTimeMillis().toString)
}
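
  // Entry point: rewrite parsed UPDATE/DELETE plans using the handlers above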
  override def apply(logicalPlan: LogicalPlan): LogicalPlan = {
    logicalPlan transform {
case UpdateTable(t, cols, sel, alias, where) => processUpdateQuery(t, cols, sel, alias, where)
      case DeleteRecords(statement, alias, table) =>
        processDeleteRecordsQuery(statement, alias, table)
}
}
}

/**
* Insert into carbon table from other source
*/
case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
plan.transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
castChildOutput(p, relation, child)
}
}
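
  /**
   * Aligns the child plan's output with the carbon table schema, mirroring Spark's
   * PreprocessTableInsertion rule: columns whose name, type or metadata differ from
   * the expected output are wrapped in a Cast/Alias before InsertIntoCarbonTable is built.
   */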
def castChildOutput(p: InsertIntoTable,
relation: LogicalRelation,
child: LogicalPlan): LogicalPlan = {
val carbonDSRelation = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
if (carbonDSRelation.carbonRelation.output.size > CarbonCommonConstants
.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
CarbonException.analysisException(
s"Maximum number of columns supported: " +
s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}")
}
    // In Spark, the PreprocessTableInsertion rule applies the cast logic below.
    // It was missed in Carbon when the insert-into rules were implemented.
var newChildOutput = if (child.output.size >= carbonDSRelation.carbonRelation.output.size) {
val expectedOutput = carbonDSRelation.carbonRelation.output
child.output.zip(expectedOutput).map {
case (actual, expected) =>
if (expected.dataType.sameType(actual.dataType) &&
expected.name == actual.name &&
expected.metadata == actual.metadata) {
actual
} else {
            // Renaming is needed to handle the following cases:
            // 1) Column names/types do not match, e.g. INSERT INTO TABLE tab1 SELECT 1, 2
            // 2) The target table has column metadata
Alias(Cast(actual, expected.dataType), expected.name)(
explicitMetadata = Option(expected.metadata))
}
}
} else {
child.output
}
if (newChildOutput.size >= carbonDSRelation.carbonRelation.output.size ||
carbonDSRelation.carbonTable.isHivePartitionTable) {
      newChildOutput = newChildOutput.zipWithIndex.map {
        case (attr: Attribute, index) =>
          Alias(attr, s"col$index")(NamedExpression.newExprId)
        case (expr, _) => expr
      }
val newChild: LogicalPlan = if (newChildOutput == child.output) {
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
} else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
} else {
throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
}
} else {
Project(newChildOutput, child)
}
val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, overwrite, true)
} else {
CarbonException.analysisException(
"Cannot insert into target table because number of columns mismatch")
}
}
}