| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.spark.sql.catalyst.analysis |
| |
| import scala.collection.mutable |
| |
| import org.apache.spark.SparkException |
| import org.apache.spark.sql.AnalysisException |
| import org.apache.spark.sql.catalyst.ExtendedAnalysisException |
| import org.apache.spark.sql.catalyst.expressions._ |
| import org.apache.spark.sql.catalyst.expressions.SubExprUtils._ |
| import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Median, PercentileCont, PercentileDisc} |
| import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE} |
| import org.apache.spark.sql.catalyst.plans._ |
| import org.apache.spark.sql.catalyst.plans.logical._ |
| import org.apache.spark.sql.catalyst.trees.TreeNodeTag |
| import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, PLAN_EXPRESSION, UNRESOLVED_WINDOW_EXPRESSION} |
| import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, TypeUtils} |
| import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement} |
| import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} |
| import org.apache.spark.sql.internal.SQLConf |
| import org.apache.spark.sql.types._ |
| import org.apache.spark.sql.util.SchemaUtils |
| import org.apache.spark.util.ArrayImplicits._ |
| import org.apache.spark.util.Utils |
| |
| /** |
| * Throws user facing errors when passed invalid queries that fail to analyze. |
| */ |
| trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase { |
| |
  /** Returns true if the given multi-part identifier resolves to a view. Implemented by subclasses. */
  protected def isView(nameParts: Seq[String]): Boolean
| |
| import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ |
| |
  /**
   * Override to provide additional checks for correct analysis.
   * These rules will be evaluated after our built-in check rules.
   */
  val extendedCheckRules: Seq[LogicalPlan => Unit] = Nil

  // Tag attached to an expression just before a DATATYPE_MISMATCH error is raised for it
  // (see the type-check handling in checkAnalysis0).
  val DATA_TYPE_MISMATCH_ERROR = TreeNodeTag[Unit]("dataTypeMismatchError")
  // Tag attached to an expression just before an INVALID_FORMAT error is raised for it.
  val INVALID_FORMAT_ERROR = TreeNodeTag[Unit]("invalidFormatError")
| |
  /**
   * Aborts analysis by throwing an [[AnalysisException]] built from the given error class and
   * message parameters. Never returns (return type is `Nothing`).
   *
   * @param errorClass the error class identifying the user-facing error condition
   * @param messageParameters substitution values for the error message template
   */
  def failAnalysis(errorClass: String, messageParameters: Map[String, String]): Nothing = {
    throw new AnalysisException(
      errorClass = errorClass,
      messageParameters = messageParameters)
  }
| |
| protected def hasMapType(dt: DataType): Boolean = { |
| dt.existsRecursively(_.isInstanceOf[MapType]) |
| } |
| |
| protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { |
| case _: Intersect | _: Except | _: Distinct => |
| plan.output.find(a => hasMapType(a.dataType)) |
| case d: Deduplicate => |
| d.keys.find(a => hasMapType(a.dataType)) |
| case _ => None |
| } |
| |
  /**
   * Validates an expression used in a LIMIT / OFFSET / TAIL clause. The expression must be
   * foldable (constant), of integer type, evaluate to a non-null value, and be non-negative.
   * The checks run in that order so the most specific error is reported first.
   *
   * @param name the clause name used in error messages ("limit", "offset" or "tail")
   * @param limitExpr the clause expression to validate
   */
  private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = {
    limitExpr match {
      case e if !e.foldable => limitExpr.failAnalysis(
        errorClass = "INVALID_LIMIT_LIKE_EXPRESSION.IS_UNFOLDABLE",
        messageParameters = Map(
          "name" -> name,
          "expr" -> toSQLExpr(limitExpr)))
      case e if e.dataType != IntegerType => limitExpr.failAnalysis(
        errorClass = "INVALID_LIMIT_LIKE_EXPRESSION.DATA_TYPE",
        messageParameters = Map(
          "name" -> name,
          "expr" -> toSQLExpr(limitExpr),
          "dataType" -> toSQLType(e.dataType)))
      case e =>
        // Foldable and integer-typed: evaluate once and check the resulting value.
        e.eval() match {
          case null => limitExpr.failAnalysis(
            errorClass = "INVALID_LIMIT_LIKE_EXPRESSION.IS_NULL",
            messageParameters = Map(
              "name" -> name,
              "expr" -> toSQLExpr(limitExpr)))
          case v: Int if v < 0 => limitExpr.failAnalysis(
            errorClass = "INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE",
            messageParameters = Map(
              "name" -> name,
              "expr" -> toSQLExpr(limitExpr),
              "v" -> toSQLValue(v, IntegerType)))
          case _ => // OK
        }
    }
  }
| |
  /**
   * Check and throw an internal error when a given resolved plan still contains any
   * LateralColumnAliasReference: by the time a plan is fully resolved, all such references
   * should have been restored or resolved away.
   *
   * @param exprs the expressions of the operator to scan
   * @param plan the operator, included in the error message for debugging
   */
  private def checkNotContainingLCA(exprs: Seq[Expression], plan: LogicalPlan): Unit = {
    exprs.foreach(_.transformDownWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
      case lcaRef: LateralColumnAliasReference =>
        throw SparkException.internalError("Resolved plan should not contain any " +
          s"LateralColumnAliasReference.\nDebugging information: plan:\n$plan",
          context = lcaRef.origin.getQueryContext,
          summary = lcaRef.origin.context.summary)
    })
  }
| |
| private def isMapWithStringKey(e: Expression): Boolean = if (e.resolved) { |
| e.dataType match { |
| case m: MapType => m.keyType.isInstanceOf[StringType] |
| case _ => false |
| } |
| } else { |
| false |
| } |
| |
| private def failUnresolvedAttribute( |
| operator: LogicalPlan, |
| a: Attribute, |
| errorClass: String): Nothing = { |
| val missingCol = a.sql |
| val candidates = operator.inputSet.toSeq |
| .map(attr => attr.qualifier :+ attr.name) |
| val orderedCandidates = |
| StringUtils.orderSuggestedIdentifiersBySimilarity(missingCol, candidates) |
| throw QueryCompilationErrors.unresolvedAttributeError( |
| errorClass, missingCol, orderedCandidates, a.origin) |
| } |
| |
| private def checkUnreferencedCTERelations( |
| cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])], |
| visited: mutable.Map[Long, Boolean], |
| danglingCTERelations: mutable.ArrayBuffer[CTERelationDef], |
| cteId: Long): Unit = { |
| if (visited(cteId)) { |
| return |
| } |
| val (cteDef, _, refMap) = cteMap(cteId) |
| refMap.foreach { case (id, _) => |
| checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, id) |
| } |
| danglingCTERelations.append(cteDef) |
| visited(cteId) = true |
| } |
| |
  /**
   * Entry point of the analysis checks. Inlines all CTE relations first (re-attaching any CTE
   * definitions that are never referenced so they get validated too), runs [[checkAnalysis0]] on
   * the inlined plan, and finally marks the original plan as analyzed. Any [[AnalysisException]]
   * raised along the way is wrapped in an [[ExtendedAnalysisException]] that carries the plan
   * being checked at that point.
   */
  def checkAnalysis(plan: LogicalPlan): Unit = {
    val inlineCTE = InlineCTE(alwaysInline = true)
    // cteMap: CTE id -> (definition, reference count, map of referenced CTE id -> count).
    val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, mutable.Map[Long, Int])]
    inlineCTE.buildCTEMap(plan, cteMap)
    val danglingCTERelations = mutable.ArrayBuffer.empty[CTERelationDef]
    val visited: mutable.Map[Long, Boolean] = mutable.Map.empty.withDefaultValue(false)
    // If a CTE relation is never used, it will disappear after inline. Here we explicitly collect
    // these dangling CTE relations, and put them back in the main query, to make sure the entire
    // query plan is valid.
    cteMap.foreach { case (cteId, (_, refCount, _)) =>
      // If a CTE relation ref count is 0, the other CTE relations that reference it should also be
      // collected. This code will also guarantee the leaf relations that do not reference
      // any others are collected first.
      if (refCount == 0) {
        checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, cteId)
      }
    }
    // Inline all CTEs in the plan to help check query plan structures in subqueries.
    var inlinedPlan: LogicalPlan = plan
    try {
      inlinedPlan = inlineCTE(plan)
    } catch {
      case e: AnalysisException =>
        throw new ExtendedAnalysisException(e, plan)
    }
    if (danglingCTERelations.nonEmpty) {
      inlinedPlan = WithCTE(inlinedPlan, danglingCTERelations.toSeq)
    }
    try {
      checkAnalysis0(inlinedPlan)
    } catch {
      case e: AnalysisException =>
        throw new ExtendedAnalysisException(e, inlinedPlan)
    }
    // Mark the ORIGINAL plan (not the inlined copy) as analyzed so it is skipped next time.
    plan.setAnalyzed()
  }
| |
  /**
   * Main body of the analysis checks. First does a top-down pass to report unresolved target
   * tables of write commands (so "table not found" wins over errors in the input query), then a
   * bottom-up pass (`foreachUp`) so the first root-cause failure is reported instead of a cascade
   * of follow-up resolution failures. Throws a user-facing [[AnalysisException]] or an internal
   * error as soon as an invalid construct is found.
   */
  def checkAnalysis0(plan: LogicalPlan): Unit = {
    // The target table is not a child plan of the insert command. We should report errors for table
    // not found first, instead of errors in the input query of the insert command, by doing a
    // top-down traversal.
    plan.foreach {
      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
        u.tableNotFound(u.multipartIdentifier)

      // TODO (SPARK-27484): handle streaming write commands when we have them.
      case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
        val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
        write.table.tableNotFound(tblName)

      case _ =>
    }

    // We transform up and order the rules so as to catch the first possible failure instead
    // of the result of cascading resolution failures.
    plan.foreachUp {
      case p if p.analyzed => // Skip already analyzed sub-plans

      case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
        throw SparkException.internalError(
          "Logical plan should not have output of char/varchar type: " + leaf)

      case u: UnresolvedNamespace =>
        u.schemaNotFound(u.multipartIdentifier)

      case u: UnresolvedTable =>
        u.tableNotFound(u.multipartIdentifier)

      case u: UnresolvedView =>
        u.tableNotFound(u.multipartIdentifier)

      case u: UnresolvedTableOrView =>
        u.tableNotFound(u.multipartIdentifier)

      case u: UnresolvedRelation =>
        u.tableNotFound(u.multipartIdentifier)

      case u: UnresolvedFunctionName =>
        val catalogPath = (currentCatalog.name +: catalogManager.currentNamespace).mkString(".")
        throw QueryCompilationErrors.unresolvedRoutineError(
          u.multipartIdentifier,
          Seq("system.builtin", "system.session", catalogPath),
          u.origin)

      case u: UnresolvedHint =>
        throw SparkException.internalError(
          msg = s"Hint not found: ${toSQLId(u.name)}",
          context = u.origin.getQueryContext,
          summary = u.origin.context.summary)

      case command: V2PartitionCommand =>
        command.table match {
          case r @ ResolvedTable(_, _, table, _) => table match {
            case t: SupportsPartitionManagement =>
              if (t.partitionSchema.isEmpty) {
                r.failAnalysis(
                  errorClass = "INVALID_PARTITION_OPERATION.PARTITION_SCHEMA_IS_EMPTY",
                  messageParameters = Map("name" -> toSQLId(r.name)))
              }
            case _ =>
              r.failAnalysis(
                errorClass = "INVALID_PARTITION_OPERATION.PARTITION_MANAGEMENT_IS_UNSUPPORTED",
                messageParameters = Map("name" -> toSQLId(r.name)))
          }
          case _ =>
        }

      case o: OverwriteByExpression if o.deleteExpr.exists(_.isInstanceOf[SubqueryExpression]) =>
        o.deleteExpr.failAnalysis (
          errorClass = "UNSUPPORTED_FEATURE.OVERWRITE_BY_SUBQUERY",
          messageParameters = Map.empty)

      // All remaining operators: run expression-level checks, then operator-specific checks,
      // then general checks (missing attributes, conflicting references, determinism).
      case operator: LogicalPlan =>
        operator transformExpressionsDown {
          // Check argument data types of higher-order functions downwards first.
          // If the arguments of the higher-order functions are resolved but the type check fails,
          // the argument functions will not get resolved, but we should report the argument type
          // check failure instead of claiming the argument functions are unresolved.
          case hof: HigherOrderFunction
              if hof.argumentsResolved && hof.checkArgumentDataTypes().isFailure =>
            hof.checkArgumentDataTypes() match {
              case checkRes: TypeCheckResult.DataTypeMismatch =>
                hof.dataTypeMismatch(hof, checkRes)
              case checkRes: TypeCheckResult.InvalidFormat =>
                hof.setTagValue(INVALID_FORMAT_ERROR, ())
                hof.invalidFormat(checkRes)
            }

          // If an attribute can't be resolved as a map key of string type, either the key should be
          // surrounded with single quotes, or there is a typo in the attribute name.
          case GetMapValue(map, key: Attribute) if isMapWithStringKey(map) && !key.resolved =>
            failUnresolvedAttribute(operator, key, "UNRESOLVED_MAP_KEY")
        }

        // Fail if we still have an unresolved all in group by. This needs to run before the
        // general unresolved check below to throw a more tailored error message.
        new ResolveReferencesInAggregate(catalogManager).checkUnresolvedGroupByAll(operator)

        // Early checks for column definitions, to produce better error messages
        ColumnDefinition.checkColumnDefinitions(operator)

        // Expression-level checks, bottom-up, so the innermost failure is reported first.
        getAllExpressions(operator).foreach(_.foreachUp {
          case a: Attribute if !a.resolved =>
            failUnresolvedAttribute(operator, a, "UNRESOLVED_COLUMN")

          case s: Star =>
            withPosition(s) {
              throw QueryCompilationErrors.invalidStarUsageError(operator.nodeName, Seq(s))
            }

          case e: Expression if e.checkInputDataTypes().isFailure =>
            e.checkInputDataTypes() match {
              case checkRes: TypeCheckResult.DataTypeMismatch =>
                e.setTagValue(DATA_TYPE_MISMATCH_ERROR, ())
                e.dataTypeMismatch(e, checkRes)
              case TypeCheckResult.TypeCheckFailure(message) =>
                e.setTagValue(DATA_TYPE_MISMATCH_ERROR, ())
                val extraHint = extraHintForAnsiTypeCoercionExpression(operator)
                e.failAnalysis(
                  errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
                  messageParameters = Map(
                    "sqlExpr" -> toSQLExpr(e),
                    "msg" -> message,
                    "hint" -> extraHint))
              case checkRes: TypeCheckResult.InvalidFormat =>
                e.setTagValue(INVALID_FORMAT_ERROR, ())
                e.invalidFormat(checkRes)
            }

          case c: Cast if !c.resolved =>
            throw SparkException.internalError(
              msg = s"Found the unresolved Cast: ${c.simpleString(SQLConf.get.maxToStringFields)}",
              context = c.origin.getQueryContext,
              summary = c.origin.context.summary)
          case e: RuntimeReplaceable if !e.replacement.resolved =>
            throw SparkException.internalError(
              s"Cannot resolve the runtime replaceable expression ${toSQLExpr(e)}. " +
              s"The replacement is unresolved: ${toSQLExpr(e.replacement)}.")

          case g: Grouping =>
            g.failAnalysis(
              errorClass = "UNSUPPORTED_GROUPING_EXPRESSION", messageParameters = Map.empty)
          case g: GroupingID =>
            g.failAnalysis(
              errorClass = "UNSUPPORTED_GROUPING_EXPRESSION", messageParameters = Map.empty)

          case e: Expression if e.children.exists(_.isInstanceOf[WindowFunction]) &&
              !e.isInstanceOf[WindowExpression] && e.resolved =>
            val w = e.children.find(_.isInstanceOf[WindowFunction]).get
            e.failAnalysis(
              errorClass = "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE",
              messageParameters = Map("funcName" -> toSQLExpr(w)))

          case w @ WindowExpression(AggregateExpression(_, _, true, _, _), _) =>
            w.failAnalysis(
              errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED",
              messageParameters = Map("windowExpr" -> toSQLExpr(w)))

          case w @ WindowExpression(wf: FrameLessOffsetWindowFunction,
            WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
              if order.isEmpty || !frame.isOffset =>
            w.failAnalysis(
              errorClass = "WINDOW_FUNCTION_AND_FRAME_MISMATCH",
              messageParameters = Map(
                "funcName" -> toSQLExpr(wf),
                "windowExpr" -> toSQLExpr(w)))

          case w: WindowExpression =>
            // Only allow window functions with an aggregate expression or an offset window
            // function or a Pandas window UDF.
            w.windowFunction match {
              case agg @ AggregateExpression(
                _: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _)
                if w.windowSpec.orderSpec.nonEmpty || w.windowSpec.frameSpecification !=
                    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) =>
                agg.failAnalysis(
                  errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
                  messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)))
              case _: AggregateExpression | _: FrameLessOffsetWindowFunction |
                  _: AggregateWindowFunction => // OK
              case other =>
                other.failAnalysis(
                  errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW",
                  messageParameters = Map("sqlExpr" -> toSQLExpr(other)))
            }

          case s: SubqueryExpression =>
            checkSubqueryExpression(operator, s)

          case e: ExpressionWithRandomSeed if !e.seedExpression.foldable =>
            e.failAnalysis(
              errorClass = "SEED_EXPRESSION_IS_UNFOLDABLE",
              messageParameters = Map(
                "seedExpr" -> toSQLExpr(e.seedExpression),
                "exprWithSeed" -> toSQLExpr(e)))

          case p: Parameter =>
            p.failAnalysis(
              errorClass = "UNBOUND_SQL_PARAMETER",
              messageParameters = Map("name" -> p.name))

          case _ =>
        })

        // Operator-specific structural checks.
        operator match {
          case RelationTimeTravel(u: UnresolvedRelation, _, _) =>
            u.tableNotFound(u.multipartIdentifier)

          case etw: EventTimeWatermark =>
            // The event time column must be of timestamp type, or a window struct whose
            // "end" field is a timestamp.
            etw.eventTime.dataType match {
              case s: StructType
                if s.find(_.name == "end").map(_.dataType) == Some(TimestampType) =>
              case _: TimestampType =>
              case _ =>
                etw.failAnalysis(
                  errorClass = "EVENT_TIME_IS_NOT_ON_TIMESTAMP_TYPE",
                  messageParameters = Map(
                    "eventName" -> toSQLId(etw.eventTime.name),
                    "eventType" -> toSQLType(etw.eventTime.dataType)))
            }

          case f: Filter if f.condition.dataType != BooleanType =>
            f.failAnalysis(
              errorClass = "DATATYPE_MISMATCH.FILTER_NOT_BOOLEAN",
              messageParameters = Map(
                "sqlExpr" -> f.expressions.map(toSQLExpr).mkString(","),
                "filter" -> toSQLExpr(f.condition),
                "type" -> toSQLType(f.condition.dataType)))

          case j @ Join(_, _, _, Some(condition), _) if condition.dataType != BooleanType =>
            j.failAnalysis(
              errorClass = "JOIN_CONDITION_IS_NOT_BOOLEAN_TYPE",
              messageParameters = Map(
                "joinCondition" -> toSQLExpr(condition),
                "conditionType" -> toSQLType(condition.dataType)))

          case j @ AsOfJoin(_, _, _, Some(condition), _, _, _)
              if condition.dataType != BooleanType =>
            throw SparkException.internalError(
              msg = s"join condition '${toSQLExpr(condition)}' " +
                s"of type ${toSQLType(condition.dataType)} is not a boolean.",
              context = j.origin.getQueryContext,
              summary = j.origin.context.summary)

          case j @ AsOfJoin(_, _, _, _, _, _, Some(toleranceAssertion)) =>
            if (!toleranceAssertion.foldable) {
              j.failAnalysis(
                errorClass = "AS_OF_JOIN.TOLERANCE_IS_UNFOLDABLE",
                messageParameters = Map.empty)
            }
            if (!toleranceAssertion.eval().asInstanceOf[Boolean]) {
              j.failAnalysis(
                errorClass = "AS_OF_JOIN.TOLERANCE_IS_NON_NEGATIVE",
                messageParameters = Map.empty)
            }

          case a: Aggregate => ExprUtils.assertValidAggregation(a)

          case CollectMetrics(name, metrics, _, _) =>
            if (name == null || name.isEmpty) {
              operator.failAnalysis(
                errorClass = "INVALID_OBSERVED_METRICS.MISSING_NAME",
                messageParameters = Map("operator" -> planToString(operator)))
            }
            // Check if an expression is a valid metric. A metric must meet the following criteria:
            // - Is not a window function;
            // - Is not nested aggregate function;
            // - Is not a distinct aggregate function;
            // - Has only non-deterministic functions that are nested inside an aggregate function;
            // - Has only attributes that are nested inside an aggregate function.
            def checkMetric(s: Expression, e: Expression, seenAggregate: Boolean = false): Unit = {
              e match {
                case _: WindowExpression =>
                  e.failAnalysis(
                    "INVALID_OBSERVED_METRICS.WINDOW_EXPRESSIONS_UNSUPPORTED",
                    Map("expr" -> toSQLExpr(s)))
                case a: AggregateExpression if seenAggregate =>
                  e.failAnalysis(
                    "INVALID_OBSERVED_METRICS.NESTED_AGGREGATES_UNSUPPORTED",
                    Map("expr" -> toSQLExpr(s)))
                case a: AggregateExpression if a.isDistinct =>
                  e.failAnalysis(
                    "INVALID_OBSERVED_METRICS.AGGREGATE_EXPRESSION_WITH_DISTINCT_UNSUPPORTED",
                    Map("expr" -> toSQLExpr(s)))
                case a: AggregateExpression if a.filter.isDefined =>
                  e.failAnalysis(
                    "INVALID_OBSERVED_METRICS.AGGREGATE_EXPRESSION_WITH_FILTER_UNSUPPORTED",
                    Map("expr" -> toSQLExpr(s)))
                case _: AggregateExpression | _: AggregateFunction =>
                  e.children.foreach(checkMetric (s, _, seenAggregate = true))
                case _: Attribute if !seenAggregate =>
                  e.failAnalysis(
                    "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE",
                    Map("expr" -> toSQLExpr(s)))
                case a: Alias =>
                  checkMetric(s, a.child, seenAggregate)
                case a if !e.deterministic && !seenAggregate =>
                  e.failAnalysis(
                    "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC",
                    Map("expr" -> toSQLExpr(s)))
                case _ =>
                  e.children.foreach(checkMetric (s, _, seenAggregate))
              }
            }
            metrics.foreach(m => checkMetric(m, m))

          // see Analyzer.ResolveUnpivot
          // given ids must be AttributeReference when no values given
          case up @Unpivot(Some(ids), None, _, _, _, _)
              if up.childrenResolved && ids.forall(_.resolved) &&
                ids.exists(! _.isInstanceOf[AttributeReference]) =>
            throw QueryCompilationErrors.unpivotRequiresAttributes("id", "value", up.ids.get)
          // given values must be AttributeReference when no ids given
          case up @Unpivot(None, Some(values), _, _, _, _)
              if up.childrenResolved && values.forall(_.forall(_.resolved)) &&
                values.exists(_.exists(! _.isInstanceOf[AttributeReference])) =>
            throw QueryCompilationErrors.unpivotRequiresAttributes("value", "id", values.flatten)
          // given values must not be empty seq
          case up @Unpivot(Some(ids), Some(Seq()), _, _, _, _)
              if up.childrenResolved && ids.forall(_.resolved) =>
            throw QueryCompilationErrors.unpivotRequiresValueColumns()
          // all values must have same length as there are value column names
          case up @Unpivot(Some(ids), Some(values), _, _, _, _)
              if up.childrenResolved && ids.forall(_.resolved) &&
                values.exists(_.length != up.valueColumnNames.length) =>
            throw QueryCompilationErrors.unpivotValueSizeMismatchError(up.valueColumnNames.length)
          // see TypeCoercionBase.UnpivotCoercion
          case up: Unpivot if up.canBeCoercioned && !up.valuesTypeCoercioned =>
            throw QueryCompilationErrors.unpivotValueDataTypeMismatchError(up.values.get)

          case Sort(orders, _, _) =>
            orders.foreach { order =>
              if (!RowOrdering.isOrderable(order.dataType)) {
                order.failAnalysis(
                  errorClass = "EXPRESSION_TYPE_IS_NOT_ORDERABLE",
                  messageParameters = Map("exprType" -> toSQLType(order.dataType)))
              }
            }

          case Window(_, partitionSpec, _, _) =>
            // Both `partitionSpec` and `orderSpec` must be orderable. We only need an extra check
            // for `partitionSpec` here because `orderSpec` has the type check itself.
            partitionSpec.foreach { p =>
              if (!RowOrdering.isOrderable(p.dataType)) {
                p.failAnalysis(
                  errorClass = "EXPRESSION_TYPE_IS_NOT_ORDERABLE",
                  messageParameters = Map(
                    "expr" -> toSQLExpr(p),
                    "exprType" -> toSQLType(p.dataType)))
              }
            }

          case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)

          case LocalLimit(limitExpr, child) =>
            checkLimitLikeClause("limit", limitExpr)
            child match {
              case Offset(offsetExpr, _) =>
                // Both are constants by now (checkLimitLikeClause); guard against int overflow
                // of limit + offset.
                val limit = limitExpr.eval().asInstanceOf[Int]
                val offset = offsetExpr.eval().asInstanceOf[Int]
                if (Int.MaxValue - limit < offset) {
                  child.failAnalysis(
                    errorClass = "SUM_OF_LIMIT_AND_OFFSET_EXCEEDS_MAX_INT",
                    messageParameters = Map(
                      "limit" -> limit.toString,
                      "offset" -> offset.toString))
                }
              case _ =>
            }

          case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

          case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

          case e @ (_: Union | _: SetOperation) if operator.children.length > 1 =>
            def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)

            val ref = dataTypes(operator.children.head)
            operator.children.tail.zipWithIndex.foreach { case (child, ti) =>
              // Check the number of columns
              if (child.output.length != ref.length) {
                e.failAnalysis(
                  errorClass = "NUM_COLUMNS_MISMATCH",
                  messageParameters = Map(
                    "operator" -> toSQLStmt(operator.nodeName),
                    "firstNumColumns" -> ref.length.toString,
                    "invalidOrdinalNum" -> ordinalNumber(ti + 1),
                    "invalidNumColumns" -> child.output.length.toString))
              }

              val dataTypesAreCompatibleFn = getDataTypesAreCompatibleFn(operator)
              // Check if the data types match.
              dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
                // SPARK-18058: we shall not care about the nullability of columns
                if (!dataTypesAreCompatibleFn(dt1, dt2)) {
                  e.failAnalysis(
                    errorClass = "INCOMPATIBLE_COLUMN_TYPE",
                    messageParameters = Map(
                      "operator" -> toSQLStmt(operator.nodeName),
                      "columnOrdinalNumber" -> ordinalNumber(ci),
                      "tableOrdinalNumber" -> ordinalNumber(ti + 1),
                      "dataType1" -> toSQLType(dt1),
                      "dataType2" -> toSQLType(dt2),
                      "hint" -> extraHintForAnsiTypeCoercionPlan(operator)))
                }
              }
            }

          case create: V2CreateTablePlan =>
            // Every partition column reference must exist (possibly nested) in the table schema.
            val references = create.partitioning.flatMap(_.references).toSet
            val badReferences = references.map(_.fieldNames).flatMap { column =>
              create.tableSchema.findNestedField(column.toImmutableArraySeq) match {
                case Some(_) =>
                  None
                case _ =>
                  Some(column.quoted)
              }
            }

            if (badReferences.nonEmpty) {
              create.failAnalysis(
                errorClass = "UNSUPPORTED_FEATURE.PARTITION_WITH_NESTED_COLUMN_IS_UNSUPPORTED",
                messageParameters = Map(
                  "cols" -> badReferences.map(r => toSQLId(r)).mkString(", ")))
            }

            create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))

          case write: V2WriteCommand if write.resolved =>
            write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))

          case alter: AlterTableCommand =>
            checkAlterTableCommand(alter)

          case c: CreateVariable
              if c.resolved && c.defaultExpr.child.containsPattern(PLAN_EXPRESSION) =>
            val ident = c.name.asInstanceOf[ResolvedIdentifier]
            val varName = toSQLId(
              (ident.catalog.name +: ident.identifier.namespace :+ ident.identifier.name)
                .toImmutableArraySeq)
            throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
              "DECLARE VARIABLE",
              varName,
              c.defaultExpr.originalSQL)

          case _ => // Falls back to the following checks
        }

        // General checks that apply to every remaining operator.
        operator match {
          case o if o.children.nonEmpty && o.missingInput.nonEmpty =>
            val missingAttributes = o.missingInput.map(attr => toSQLExpr(attr)).mkString(", ")
            val input = o.inputSet.map(attr => toSQLExpr(attr)).mkString(", ")

            // Distinguish attributes that merely differ by expression id from truly absent ones,
            // to produce a more helpful error.
            val resolver = plan.conf.resolver
            val attrsWithSameName = o.missingInput.filter { missing =>
              o.inputSet.exists(input => resolver(missing.name, input.name))
            }

            if (attrsWithSameName.nonEmpty) {
              val sameNames = attrsWithSameName.map(attr => toSQLExpr(attr)).mkString(", ")
              o.failAnalysis(
                errorClass = "MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION",
                messageParameters = Map(
                  "missingAttributes" -> missingAttributes,
                  "input" -> input,
                  "operator" -> operator.simpleString(SQLConf.get.maxToStringFields),
                  "operation" -> sameNames
                ))
            } else {
              o.failAnalysis(
                errorClass = "MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT",
                messageParameters = Map(
                  "missingAttributes" -> missingAttributes,
                  "input" -> input,
                  "operator" -> operator.simpleString(SQLConf.get.maxToStringFields)
                ))
            }

          case p @ Project(projectList, _) =>
            projectList.foreach(_.transformDownWithPruning(
              _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION)) {
              case UnresolvedWindowExpression(_, windowSpec) =>
                throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
            })

          case j: Join if !j.duplicateResolved =>
            val conflictingAttributes =
              j.left.outputSet.intersect(j.right.outputSet).map(toSQLExpr(_)).mkString(", ")
            throw SparkException.internalError(
              msg = s"""
                |Failure when resolving conflicting references in ${j.nodeName}:
                |${planToString(plan)}
                |Conflicting attributes: $conflictingAttributes.""".stripMargin,
              context = j.origin.getQueryContext,
              summary = j.origin.context.summary)

          case i: Intersect if !i.duplicateResolved =>
            val conflictingAttributes =
              i.left.outputSet.intersect(i.right.outputSet).map(toSQLExpr(_)).mkString(", ")
            throw SparkException.internalError(
              msg = s"""
                |Failure when resolving conflicting references in ${i.nodeName}:
                |${planToString(plan)}
                |Conflicting attributes: $conflictingAttributes.""".stripMargin,
              context = i.origin.getQueryContext,
              summary = i.origin.context.summary)

          case e: Except if !e.duplicateResolved =>
            val conflictingAttributes =
              e.left.outputSet.intersect(e.right.outputSet).map(toSQLExpr(_)).mkString(", ")
            throw SparkException.internalError(
              msg = s"""
                |Failure when resolving conflicting references in ${e.nodeName}:
                |${planToString(plan)}
                |Conflicting attributes: $conflictingAttributes.""".stripMargin,
              context = e.origin.getQueryContext,
              summary = e.origin.context.summary)

          case j: AsOfJoin if !j.duplicateResolved =>
            val conflictingAttributes =
              j.left.outputSet.intersect(j.right.outputSet).map(toSQLExpr(_)).mkString(", ")
            throw SparkException.internalError(
              msg = s"""
                |Failure when resolving conflicting references in ${j.nodeName}:
                |${planToString(plan)}
                |Conflicting attributes: $conflictingAttributes.""".stripMargin,
              context = j.origin.getQueryContext,
              summary = j.origin.context.summary)

          // TODO: although map type is not orderable, technically map type should be able to be
          // used in equality comparison, remove this type check once we support it.
          case o if mapColumnInSetOperation(o).isDefined =>
            val mapCol = mapColumnInSetOperation(o).get
            o.failAnalysis(
              errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
              messageParameters = Map(
                "colName" -> toSQLId(mapCol.name),
                "dataType" -> toSQLType(mapCol.dataType)))

          case o if o.expressions.exists(!_.deterministic) &&
            !o.isInstanceOf[Project] &&
            // non-deterministic expressions inside CollectMetrics have been
            // already validated inside checkMetric function
            !o.isInstanceOf[CollectMetrics] &&
            !o.isInstanceOf[Filter] &&
            !o.isInstanceOf[Aggregate] &&
            !o.isInstanceOf[Window] &&
            !o.isInstanceOf[Expand] &&
            !o.isInstanceOf[Generate] &&
            !o.isInstanceOf[CreateVariable] &&
            !o.isInstanceOf[MapInPandas] &&
            !o.isInstanceOf[MapInArrow] &&
            // Lateral join is checked in checkSubqueryExpression.
            !o.isInstanceOf[LateralJoin] =>
            // The rule above is used to check Aggregate operator.
            o.failAnalysis(
              errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS",
              messageParameters = Map("sqlExprs" -> o.expressions.map(toSQLExpr(_)).mkString(", "))
            )

          case _: UnresolvedHint => throw SparkException.internalError(
            "Logical hint operator should be removed during analysis.")

          case f @ Filter(condition, _)
              if PlanHelper.specialExpressionsInUnsupportedOperator(f).nonEmpty =>
            val invalidExprSqls = PlanHelper.specialExpressionsInUnsupportedOperator(f).map(_.sql)
            f.failAnalysis(
              errorClass = "INVALID_WHERE_CONDITION",
              messageParameters = Map(
                "condition" -> toSQLExpr(condition),
                "expressionList" -> invalidExprSqls.mkString(", ")))

          case other if PlanHelper.specialExpressionsInUnsupportedOperator(other).nonEmpty =>
            val invalidExprSqls =
              PlanHelper.specialExpressionsInUnsupportedOperator(other).map(toSQLExpr)
            other.failAnalysis(
              errorClass = "UNSUPPORTED_EXPR_FOR_OPERATOR",
              messageParameters = Map(
                "invalidExprSqls" -> invalidExprSqls.mkString(", ")))

          case _ => // Analysis successful!
        }
    }
    checkCollectedMetrics(plan)
    extendedCheckRules.foreach(_(plan))
    // Final safety net: everything must be resolved and free of lateral column alias references.
    plan.foreachUp {
      case o if !o.resolved =>
        throw SparkException.internalError(
          msg = s"Found the unresolved operator: ${o.simpleString(SQLConf.get.maxToStringFields)}",
          context = o.origin.getQueryContext,
          summary = o.origin.context.summary)
      // If the plan is resolved, all lateral column alias references should have been either
      // restored or resolved. Add check for extra safe.
      case o if o.expressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
        checkNotContainingLCA(o.expressions, o)
      case _ =>
    }
  }
| |
| private def getAllExpressions(plan: LogicalPlan): Seq[Expression] = { |
| plan match { |
| // We only resolve `groupingExpressions` if `aggregateExpressions` is resolved first (See |
| // `ResolveReferencesInAggregate`). We should check errors in `aggregateExpressions` first. |
| case a: Aggregate => a.aggregateExpressions ++ a.groupingExpressions |
| case _ => plan.expressions |
| } |
| } |
| |
| private def getDataTypesAreCompatibleFn(plan: LogicalPlan): (DataType, DataType) => Boolean = { |
| val isUnion = plan.isInstanceOf[Union] |
| if (isUnion) { |
| (dt1: DataType, dt2: DataType) => |
| DataType.equalsStructurally(dt1, dt2, true) |
| } else { |
| // SPARK-18058: we shall not care about the nullability of columns |
| (dt1: DataType, dt2: DataType) => |
| TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).nonEmpty |
| } |
| } |
| |
| private def getDefaultTypeCoercionPlan(plan: LogicalPlan): LogicalPlan = |
| TypeCoercion.typeCoercionRules.foldLeft(plan) { case (p, rule) => rule(p) } |
| |
| private def extraHintMessage(issueFixedIfAnsiOff: Boolean): String = { |
| if (issueFixedIfAnsiOff) { |
| "\nTo fix the error, you might need to add explicit type casts. If necessary set " + |
| s"${SQLConf.ANSI_ENABLED.key} to false to bypass this error." |
| } else { |
| "" |
| } |
| } |
| |
| private[analysis] def extraHintForAnsiTypeCoercionExpression(plan: LogicalPlan): String = { |
| if (!SQLConf.get.ansiEnabled) { |
| "" |
| } else { |
| val nonAnsiPlan = getDefaultTypeCoercionPlan(plan) |
| var issueFixedIfAnsiOff = true |
| getAllExpressions(nonAnsiPlan).foreach(_.foreachUp { |
| case e: Expression if e.getTagValue(DATA_TYPE_MISMATCH_ERROR).isDefined && |
| e.checkInputDataTypes().isFailure => |
| e.checkInputDataTypes() match { |
| case TypeCheckResult.TypeCheckFailure(_) | _: TypeCheckResult.DataTypeMismatch => |
| issueFixedIfAnsiOff = false |
| } |
| |
| case _ => |
| }) |
| extraHintMessage(issueFixedIfAnsiOff) |
| } |
| } |
| |
| private def extraHintForAnsiTypeCoercionPlan(plan: LogicalPlan): String = { |
| if (!SQLConf.get.ansiEnabled) { |
| "" |
| } else { |
| val nonAnsiPlan = getDefaultTypeCoercionPlan(plan) |
| var issueFixedIfAnsiOff = true |
| nonAnsiPlan match { |
| case _: Union | _: SetOperation if nonAnsiPlan.children.length > 1 => |
| def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType) |
| |
| val ref = dataTypes(nonAnsiPlan.children.head) |
| val dataTypesAreCompatibleFn = getDataTypesAreCompatibleFn(nonAnsiPlan) |
| nonAnsiPlan.children.tail.zipWithIndex.foreach { case (child, ti) => |
| // Check if the data types match. |
| dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) => |
| if (!dataTypesAreCompatibleFn(dt1, dt2)) { |
| issueFixedIfAnsiOff = false |
| } |
| } |
| } |
| |
| case _ => |
| } |
| extraHintMessage(issueFixedIfAnsiOff) |
| } |
| } |
| |
| private def scrubOutIds(string: String): String = |
| string.replaceAll("#\\d+", "#x") |
| .replaceAll("operator id = \\d+", "operator id = #x") |
| .replaceAll("rand\\(-?\\d+\\)", "rand(number)") |
| |
| private def planToString(plan: LogicalPlan): String = { |
| if (Utils.isTesting) scrubOutIds(plan.toString) else plan.toString |
| } |
| |
| private def exprsToString(exprs: Seq[Expression]): String = { |
| val result = exprs.map(_.toString).mkString("\n") |
| if (Utils.isTesting) scrubOutIds(result) else result |
| } |
| |
| /** |
| * Validates subquery expressions in the plan. Upon failure, returns an user facing error. |
| */ |
| def checkSubqueryExpression(plan: LogicalPlan, expr: SubqueryExpression): Unit = { |
| def checkAggregateInScalarSubquery( |
| conditions: Seq[Expression], |
| query: LogicalPlan, agg: Aggregate): Unit = { |
| // Make sure correlated scalar subqueries contain one row for every outer row by |
| // enforcing that they are aggregates containing exactly one aggregate expression. |
| val aggregates = agg.expressions.flatMap(_.collect { |
| case a: AggregateExpression => a |
| }) |
| if (aggregates.isEmpty) { |
| expr.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY", |
| messageParameters = Map.empty) |
| } |
| |
| // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns |
| // are not part of the correlated columns. |
| val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references)) |
| // Collect the local references from the correlated predicate in the subquery. |
| val subqueryColumns = getCorrelatedPredicates(query).flatMap(_.references) |
| .filterNot(conditions.flatMap(_.references).contains) |
| val correlatedCols = AttributeSet(subqueryColumns) |
| val invalidCols = groupByCols -- correlatedCols |
| // GROUP BY columns must be a subset of columns in the predicates |
| if (invalidCols.nonEmpty) { |
| expr.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "NON_CORRELATED_COLUMNS_IN_GROUP_BY", |
| messageParameters = Map("value" -> invalidCols.map(_.name).mkString(","))) |
| } |
| } |
| |
| // Skip subquery aliases added by the Analyzer as well as hints. |
| // For projects, do the necessary mapping and skip to its child. |
| @scala.annotation.tailrec |
| def cleanQueryInScalarSubquery(p: LogicalPlan): LogicalPlan = p match { |
| case s: SubqueryAlias => cleanQueryInScalarSubquery(s.child) |
| case p: Project => cleanQueryInScalarSubquery(p.child) |
| case h: ResolvedHint => cleanQueryInScalarSubquery(h.child) |
| case child => child |
| } |
| |
| // Check whether the given expressions contains the subquery expression. |
| def containsExpr(expressions: Seq[Expression]): Boolean = { |
| expressions.exists(_.exists(_.semanticEquals(expr))) |
| } |
| |
| def checkOuterReference(p: LogicalPlan, expr: SubqueryExpression): Unit = p match { |
| case f: Filter => |
| if (hasOuterReferences(expr.plan)) { |
| expr.plan.expressions.foreach(_.foreachUp { |
| case o: OuterReference => |
| p.children.foreach(e => |
| if (!e.output.exists(_.exprId == o.exprId)) { |
| o.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "CORRELATED_COLUMN_NOT_FOUND", |
| messageParameters = Map("value" -> o.name)) |
| }) |
| case _ => |
| }) |
| } |
| case _ => |
| } |
| |
| // Validate the subquery plan. |
| checkAnalysis0(expr.plan) |
| |
| // Check if there is outer attribute that cannot be found from the plan. |
| checkOuterReference(plan, expr) |
| |
| expr match { |
| case ScalarSubquery(query, outerAttrs, _, _, _, _) => |
| // Scalar subquery must return one column as output. |
| if (query.output.size != 1) { |
| throw QueryCompilationErrors.subqueryReturnMoreThanOneColumn(query.output.size, |
| expr.origin) |
| } |
| |
| if (outerAttrs.nonEmpty) { |
| cleanQueryInScalarSubquery(query) match { |
| case a: Aggregate => checkAggregateInScalarSubquery(outerAttrs, query, a) |
| case Filter(_, a: Aggregate) => checkAggregateInScalarSubquery(outerAttrs, query, a) |
| case p: LogicalPlan if p.maxRows.exists(_ <= 1) => // Ok |
| case other => |
| expr.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY", |
| messageParameters = Map.empty) |
| } |
| |
| // Only certain operators are allowed to host subquery expression containing |
| // outer references. |
| plan match { |
| case _: Filter | _: Project | _: SupportsSubquery => // Ok |
| case a: Aggregate => |
| // If the correlated scalar subquery is in the grouping expressions of an Aggregate, |
| // it must also be in the aggregate expressions to be rewritten in the optimization |
| // phase. |
| if (containsExpr(a.groupingExpressions) && !containsExpr(a.aggregateExpressions)) { |
| a.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY", |
| messageParameters = Map.empty) |
| } |
| case other => |
| other.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY", |
| messageParameters = Map("treeNode" -> planToString(other))) |
| } |
| } |
| // Validate to make sure the correlations appearing in the query are valid and |
| // allowed by spark. |
| checkCorrelationsInSubquery(expr.plan, isScalar = true) |
| |
| case _: LateralSubquery => |
| assert(plan.isInstanceOf[LateralJoin]) |
| val join = plan.asInstanceOf[LateralJoin] |
| // A lateral join with a multi-row outer query and a non-deterministic lateral subquery |
| // cannot be decorrelated. Otherwise it may produce incorrect results. |
| if (!expr.deterministic && !join.left.maxRows.exists(_ <= 1)) { |
| cleanQueryInScalarSubquery(join.right.plan) match { |
| // Python UDTFs are by default non-deterministic. They are constructed as a |
| // OneRowRelation subquery and can be rewritten by the optimizer without |
| // any decorrelation. |
| case Generate(_: PythonUDTF, _, _, _, _, _: OneRowRelation) |
| if SQLConf.get.getConf(SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY) => // Ok |
| case _ => |
| expr.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "NON_DETERMINISTIC_LATERAL_SUBQUERIES", |
| messageParameters = Map("treeNode" -> planToString(plan))) |
| } |
| } |
| // Check if the lateral join's join condition is deterministic. |
| if (join.condition.exists(!_.deterministic)) { |
| join.condition.get.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC", |
| messageParameters = Map("condition" -> join.condition.get.sql)) |
| } |
| // Validate to make sure the correlations appearing in the query are valid and |
| // allowed by spark. |
| checkCorrelationsInSubquery(expr.plan, isLateral = true) |
| |
| case _: FunctionTableSubqueryArgumentExpression => |
| expr.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_TABLE_ARGUMENT", |
| messageParameters = Map("treeNode" -> planToString(plan))) |
| |
| case inSubqueryOrExistsSubquery => |
| plan match { |
| case _: Filter | _: SupportsSubquery | _: Join | |
| _: Project | _: Aggregate | _: Window => // Ok |
| case _ => |
| expr.failAnalysis( |
| errorClass = |
| "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_IN_EXISTS_SUBQUERY", |
| messageParameters = Map("treeNode" -> planToString(plan))) |
| } |
| // Validate to make sure the correlations appearing in the query are valid and |
| // allowed by spark. |
| checkCorrelationsInSubquery(expr.plan) |
| } |
| } |
| |
| /** |
| * Validate that collected metrics names are unique. The same name cannot be used for metrics |
| * with different results. However multiple instances of metrics with with same result and name |
| * are allowed (e.g. self-joins). |
| */ |
| private def checkCollectedMetrics(plan: LogicalPlan): Unit = { |
| val metricsMap = mutable.Map.empty[String, CollectMetrics] |
| def check(plan: LogicalPlan): Unit = plan.foreach { node => |
| node match { |
| case metrics @ CollectMetrics(name, _, _, dataframeId) => |
| metricsMap.get(name) match { |
| case Some(other) => |
| // Exact duplicates are allowed. They can be the result |
| // of a CTE that is used multiple times or a self join. |
| if (dataframeId != other.dataframeId) { |
| failAnalysis( |
| errorClass = "DUPLICATED_METRICS_NAME", |
| messageParameters = Map("metricName" -> name)) |
| } |
| case None => |
| metricsMap.put(name, metrics) |
| } |
| case _ => |
| } |
| node.expressions.foreach(_.foreach { |
| case subquery: SubqueryExpression => |
| check(subquery.plan) |
| case _ => |
| }) |
| } |
| check(plan) |
| } |
| |
| /** |
| * Validates to make sure the outer references appearing inside the subquery |
| * are allowed. |
| */ |
| private def checkCorrelationsInSubquery( |
| sub: LogicalPlan, |
| isScalar: Boolean = false, |
| isLateral: Boolean = false): Unit = { |
| // Some query shapes are only supported with the DecorrelateInnerQuery framework. |
| // Support for Exists and IN subqueries is subject to a separate config flag |
| // 'decorrelateInnerQueryEnabledForExistsIn'. |
| val usingDecorrelateInnerQueryFramework = |
| (SQLConf.get.decorrelateInnerQueryEnabledForExistsIn || isScalar || isLateral) && |
| SQLConf.get.decorrelateInnerQueryEnabled |
| |
| // Validate that correlated aggregate expression do not contain a mixture |
| // of outer and local references. |
| def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = { |
| expr.foreach { |
| case a: AggregateExpression if containsOuter(a) => |
| if (a.references.nonEmpty) { |
| a.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES", |
| messageParameters = Map("function" -> a.sql)) |
| } |
| case _ => |
| } |
| } |
| |
| // Make sure expressions of a plan do not contain outer references. |
| def failOnOuterReferenceInPlan(p: LogicalPlan): Unit = { |
| if (p.expressions.exists(containsOuter)) { |
| p.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", |
| messageParameters = Map("treeNode" -> planToString(p))) |
| } |
| } |
| |
| // Check whether the logical plan node can host outer references. |
| // A `Project` can host outer references if it is inside a scalar or a lateral subquery and |
| // DecorrelateInnerQuery is enabled. Otherwise, only Filter can only outer references. |
| def canHostOuter(plan: LogicalPlan): Boolean = plan match { |
| case _: Filter => true |
| case _: Project => usingDecorrelateInnerQueryFramework |
| case _: Join => usingDecorrelateInnerQueryFramework |
| case _ => false |
| } |
| |
| // Make sure a plan's expressions do not contain : |
| // 1. Aggregate expressions that have mixture of outer and local references. |
| // 2. Expressions containing outer references on plan nodes other than allowed operators. |
| def failOnInvalidOuterReference(p: LogicalPlan): Unit = { |
| p.expressions.foreach(checkMixedReferencesInsideAggregateExpr) |
| val exprs = stripOuterReferences(p.expressions.filter(expr => containsOuter(expr))) |
| if (!canHostOuter(p) && !exprs.isEmpty) { |
| p.failAnalysis( |
| errorClass = |
| "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE", |
| messageParameters = Map("sqlExprs" -> exprs.map(toSQLExpr).mkString(","))) |
| } |
| } |
| |
| // SPARK-17348: A potential incorrect result case. |
| // When a correlated predicate is a non-equality predicate, |
| // certain operators are not permitted from the operator |
| // hosting the correlated predicate up to the operator on the outer table. |
| // Otherwise, the pull up of the correlated predicate |
| // will generate a plan with a different semantics |
| // which could return incorrect result. |
| // Currently we check for Aggregate and Window operators |
| // |
| // Below shows an example of a Logical Plan during Analyzer phase that |
| // show this problem. Pulling the correlated predicate [outer(c2#77) >= ..] |
| // through the Aggregate (or Window) operator could alter the result of |
| // the Aggregate. |
| // |
| // Project [c1#76] |
| // +- Project [c1#87, c2#88] |
| // : (Aggregate or Window operator) |
| // : +- Filter [outer(c2#77) >= c2#88)] |
| // : +- SubqueryAlias t2, `t2` |
| // : +- Project [_1#84 AS c1#87, _2#85 AS c2#88] |
| // : +- LocalRelation [_1#84, _2#85] |
| // +- SubqueryAlias t1, `t1` |
| // +- Project [_1#73 AS c1#76, _2#74 AS c2#77] |
| // +- LocalRelation [_1#73, _2#74] |
| // SPARK-35080: The same issue can happen to correlated equality predicates when |
| // they do not guarantee one-to-one mapping between inner and outer attributes. |
| // For example: |
| // Table: |
| // t1(a, b): [(0, 6), (1, 5), (2, 4)] |
| // t2(c): [(6)] |
| // |
| // Query: |
| // SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2 |
| // |
| // Original subquery plan: |
| // Aggregate [count(1)] |
| // +- Filter ((a + b) = outer(c)) |
| // +- LocalRelation [a, b] |
| // |
| // Plan after pulling up correlated predicates: |
| // Aggregate [a, b] [count(1), a, b] |
| // +- LocalRelation [a, b] |
| // |
| // Plan after rewrite: |
| // Project [c1, count(1)] |
| // +- Join LeftOuter ((a + b) = c) |
| // :- LocalRelation [c] |
| // +- Aggregate [a, b] [count(1), a, b] |
| // +- LocalRelation [a, b] |
| // |
| // The right hand side of the join transformed from the subquery will output |
| // count(1) | a | b |
| // 1 | 0 | 6 |
| // 1 | 1 | 5 |
| // 1 | 2 | 4 |
| // and the plan after rewrite will give the original query incorrect results. |
| def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = { |
| // Correlated non-equality predicates are only supported with the decorrelate |
| // inner query framework. Currently we only use this new framework for scalar |
| // and lateral subqueries. |
| val allowNonEqualityPredicates = usingDecorrelateInnerQueryFramework |
| if (!allowNonEqualityPredicates && predicates.nonEmpty) { |
| // Report a non-supported case as an exception |
| p.failAnalysis( |
| errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + |
| "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE", |
| messageParameters = |
| Map("treeNode" -> s"${exprsToString(predicates)}\n${planToString(p)}")) |
| } |
| } |
| |
| // Recursively check invalid outer references in the plan. |
| def checkPlan( |
| plan: LogicalPlan, |
| aggregated: Boolean = false, |
| canContainOuter: Boolean = true): Unit = { |
| |
| if (!canContainOuter) { |
| failOnOuterReferenceInPlan(plan) |
| } |
| |
| // Approve operators allowed in a correlated subquery |
| // There are 4 categories: |
| // 1. Operators that are allowed anywhere in a correlated subquery, and, |
| // by definition of the operators, they either do not contain |
| // any columns or cannot host outer references. |
| // 2. Operators that are allowed anywhere in a correlated subquery |
| // so long as they do not host outer references. |
| // 3. Operators that need special handling. These operators are |
| // Filter, Join, Aggregate, and Generate. |
| // |
| // Any operators that are not in the above list are allowed |
| // in a correlated subquery only if they are not on a correlation path. |
| // In other word, these operators are allowed only under a correlation point. |
| // |
| // A correlation path is defined as the sub-tree of all the operators that |
| // are on the path from the operator hosting the correlated expressions |
| // up to the operator producing the correlated values. |
| plan match { |
| // Category 1: |
| // ResolvedHint, LeafNode, Repartition, and SubqueryAlias |
| case p @ (_: ResolvedHint | _: LeafNode | _: Repartition | _: SubqueryAlias) => |
| p.children.foreach(child => checkPlan(child, aggregated, canContainOuter)) |
| |
| case p @ (_ : Union | _: SetOperation) => |
| // Set operations (e.g. UNION) containing correlated values are only supported |
| // with DecorrelateInnerQuery framework. |
| val childCanContainOuter = (canContainOuter |
| && usingDecorrelateInnerQueryFramework |
| && SQLConf.get.getConf(SQLConf.DECORRELATE_SET_OPS_ENABLED)) |
| p.children.foreach(child => checkPlan(child, aggregated, childCanContainOuter)) |
| |
| // Category 2: |
| // These operators can be anywhere in a correlated subquery. |
| // so long as they do not host outer references in the operators. |
| case p: Project => |
| failOnInvalidOuterReference(p) |
| checkPlan(p.child, aggregated, canContainOuter) |
| |
| case s: Sort => |
| failOnInvalidOuterReference(s) |
| checkPlan(s.child, aggregated, canContainOuter) |
| |
| case r: RepartitionByExpression => |
| failOnInvalidOuterReference(r) |
| checkPlan(r.child, aggregated, canContainOuter) |
| |
| case l: LateralJoin => |
| failOnInvalidOuterReference(l) |
| checkPlan(l.child, aggregated, canContainOuter) |
| |
| // Category 3: |
| // Filter is one of the two operators allowed to host correlated expressions. |
| // The other operator is Join. Filter can be anywhere in a correlated subquery. |
| case f: Filter => |
| failOnInvalidOuterReference(f) |
| val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter) |
| val unsupportedPredicates = correlated.filterNot(DecorrelateInnerQuery.canPullUpOverAgg) |
| if (aggregated) { |
| failOnUnsupportedCorrelatedPredicate(unsupportedPredicates, f) |
| } |
| checkPlan(f.child, aggregated, canContainOuter) |
| |
| // Aggregate cannot host any correlated expressions |
| // It can be on a correlation path if the correlation contains |
| // only supported correlated equality predicates. |
| // It cannot be on a correlation path if the correlation has |
| // non-equality correlated predicates. |
| case a: Aggregate => |
| failOnInvalidOuterReference(a) |
| checkPlan(a.child, aggregated = true, canContainOuter) |
| |
| // Same as Aggregate above. |
| case w: Window => |
| failOnInvalidOuterReference(w) |
| checkPlan(w.child, aggregated = true, canContainOuter) |
| |
| // Distinct does not host any correlated expressions, but during the optimization phase |
| // it will be rewritten as Aggregate, which can only be on a correlation path if the |
| // correlation contains only the supported correlated equality predicates. |
| // Only block it for lateral subqueries because scalar subqueries must be aggregated |
| // and it does not impact the results for IN/EXISTS subqueries. |
| case d: Distinct => |
| checkPlan(d.child, aggregated = isLateral, canContainOuter) |
| |
| // Join can host correlated expressions. |
| case j @ Join(left, right, joinType, _, _) => |
| failOnInvalidOuterReference(j) |
| joinType match { |
| // Inner join, like Filter, can be anywhere. |
| case _: InnerLike => |
| j.children.foreach(child => checkPlan(child, aggregated, canContainOuter)) |
| |
| // Left outer join's right operand cannot be on a correlation path. |
| // LeftAnti and ExistenceJoin are special cases of LeftOuter. |
| // Note that ExistenceJoin cannot be expressed externally in both SQL and DataFrame |
| // so it should not show up here in Analysis phase. This is just a safety net. |
| // |
| // LeftSemi does not allow output from the right operand. |
| // Any correlated references in the subplan |
| // of the right operand cannot be pulled up. |
| case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => |
| checkPlan(left, aggregated, canContainOuter) |
| checkPlan(right, aggregated, canContainOuter = false) |
| |
| // Likewise, Right outer join's left operand cannot be on a correlation path. |
| case RightOuter => |
| checkPlan(left, aggregated, canContainOuter = false) |
| checkPlan(right, aggregated, canContainOuter) |
| |
| // Any other join types not explicitly listed above, |
| // including Full outer join, are treated as Category 4. |
| case _ => |
| j.children.foreach(child => checkPlan(child, aggregated, canContainOuter = false)) |
| } |
| |
| // Generator with join=true, i.e., expressed with |
| // LATERAL VIEW [OUTER], similar to inner join, |
| // allows to have correlation under it |
| // but must not host any outer references. |
| // Note: |
| // Generator with requiredChildOutput.isEmpty is treated as Category 4. |
| case g: Generate if g.requiredChildOutput.nonEmpty => |
| failOnInvalidOuterReference(g) |
| checkPlan(g.child, aggregated, canContainOuter) |
| |
| // Correlated subquery can have a LIMIT clause |
| case l @ Limit(_, input) => |
| failOnInvalidOuterReference(l) |
| checkPlan( |
| input, |
| aggregated, |
| canContainOuter && SQLConf.get.getConf(SQLConf.DECORRELATE_LIMIT_ENABLED)) |
| |
| case o @ Offset(_, input) => |
| failOnInvalidOuterReference(o) |
| checkPlan( |
| input, |
| aggregated, |
| canContainOuter && SQLConf.get.getConf(SQLConf.DECORRELATE_OFFSET_ENABLED)) |
| |
| // Category 4: Any other operators not in the above 3 categories |
| // cannot be on a correlation path, that is they are allowed only |
| // under a correlation point but they and their descendant operators |
| // are not allowed to have any correlated expressions. |
| case p => |
| p.children.foreach(p => checkPlan(p, aggregated, canContainOuter = false)) |
| } |
| } |
| |
| // Simplify the predicates before validating any unsupported correlation patterns in the plan. |
| AnalysisHelper.allowInvokingTransformsInAnalyzer { |
| checkPlan(BooleanSimplification(sub)) |
| } |
| } |
| |
| /** |
| * Validates the options used for alter table commands after table and columns are resolved. |
| */ |
| private def checkAlterTableCommand(alter: AlterTableCommand): Unit = { |
| def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = { |
| if (struct.findNestedField( |
| fieldNames, includeCollections = true, alter.conf.resolver).isDefined) { |
| alter.failAnalysis( |
| errorClass = "FIELD_ALREADY_EXISTS", |
| messageParameters = Map( |
| "op" -> op, |
| "fieldNames" -> toSQLId(fieldNames), |
| "struct" -> toSQLType(struct))) |
| } |
| } |
| |
| def checkColumnNameDuplication(colsToAdd: Seq[QualifiedColType]): Unit = { |
| SchemaUtils.checkColumnNameDuplication( |
| colsToAdd.map(_.name.quoted), |
| alter.conf.resolver) |
| } |
| |
| alter match { |
| case AddColumns(table: ResolvedTable, colsToAdd) => |
| colsToAdd.foreach { colToAdd => |
| checkColumnNotExists("add", colToAdd.name, table.schema) |
| } |
| checkColumnNameDuplication(colsToAdd) |
| |
| case ReplaceColumns(_: ResolvedTable, colsToAdd) => |
| checkColumnNameDuplication(colsToAdd) |
| |
| case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) => |
| checkColumnNotExists("rename", col.path :+ newName, table.schema) |
| |
| case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _, _) => |
| val fieldName = col.name.quoted |
| if (a.dataType.isDefined) { |
| val field = CharVarcharUtils.getRawType(col.field.metadata) |
| .map(dt => col.field.copy(dataType = dt)) |
| .getOrElse(col.field) |
| val newDataType = a.dataType.get |
| newDataType match { |
| case _: StructType => alter.failAnalysis( |
| "CANNOT_UPDATE_FIELD.STRUCT_TYPE", |
| Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName))) |
| case _: MapType => alter.failAnalysis( |
| "CANNOT_UPDATE_FIELD.MAP_TYPE", |
| Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName))) |
| case _: ArrayType => alter.failAnalysis( |
| "CANNOT_UPDATE_FIELD.ARRAY_TYPE", |
| Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName))) |
| case u: UserDefinedType[_] => alter.failAnalysis( |
| "CANNOT_UPDATE_FIELD.USER_DEFINED_TYPE", |
| Map( |
| "table" -> toSQLId(table.name), |
| "fieldName" -> toSQLId(fieldName), |
| "udtSql" -> toSQLType(u))) |
| case _: CalendarIntervalType | _: AnsiIntervalType => alter.failAnalysis( |
| "CANNOT_UPDATE_FIELD.INTERVAL_TYPE", |
| Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName))) |
| case _ => // update is okay |
| } |
| |
| // We don't need to handle nested types here which shall fail before. |
| def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match { |
| case (CharType(l1), CharType(l2)) => l1 == l2 |
| case (CharType(l1), VarcharType(l2)) => l1 <= l2 |
| case (VarcharType(l1), VarcharType(l2)) => l1 <= l2 |
| case _ => Cast.canUpCast(from, to) |
| } |
| |
| if (!canAlterColumnType(field.dataType, newDataType)) { |
| alter.failAnalysis( |
| errorClass = "NOT_SUPPORTED_CHANGE_COLUMN", |
| messageParameters = Map( |
| "table" -> toSQLId(table.name), |
| "originName" -> toSQLId(fieldName), |
| "originType" -> toSQLType(field.dataType), |
| "newName" -> toSQLId(fieldName), |
| "newType" -> toSQLType(newDataType))) |
| } |
| } |
| if (a.nullable.isDefined) { |
| if (!a.nullable.get && col.field.nullable) { |
| alter.failAnalysis( |
| errorClass = "_LEGACY_ERROR_TEMP_2330", |
| messageParameters = Map("fieldName" -> fieldName)) |
| } |
| } |
| case _ => |
| } |
| } |
| } |