blob: 06ec415c78c1a4d5c4fdc802415891de9097bbd2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import static java.lang.String.format;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.Pair;
import org.apache.impala.planner.DataSink;
import org.apache.impala.rewrite.ExprRewriter;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
/**
* Abstract super class for statements that modify existing data like
* UPDATE and DELETE.
*
* The ModifyStmt has four major parts:
* - targetTablePath (not null)
* - fromClause (not null)
* - assignmentExprs (not null, can be empty)
* - wherePredicate (nullable)
*
* In the analysis phase, a SelectStmt is created with the result expressions set to
* match the right-hand side of the assignments in addition to projecting the key columns
* of the underlying table. During query execution, the plan that
* is generated from this SelectStmt produces all rows that need to be modified.
*
* Currently, only Kudu tables can be modified.
*/
public abstract class ModifyStmt extends StatementBase {
// List of explicitly mentioned assignment expressions in the UPDATE's SET clause
protected final List<Pair<SlotRef, Expr>> assignments_;
// Optional WHERE clause of the statement
protected final Expr wherePredicate_;
// Path identifying the target table.
protected final List<String> targetTablePath_;
// TableRef identifying the target table, set during analysis.
protected TableRef targetTableRef_;
protected FromClause fromClause_;
// Result of the analysis of the internal SelectStmt that produces the rows that
// will be modified.
protected SelectStmt sourceStmt_;
// Target Kudu table. Since currently only Kudu tables are supported, we use a
// concrete table class. Result of analysis.
protected FeKuduTable table_;
// END: Members that need to be reset()
/////////////////////////////////////////
// Position mapping of output expressions of the sourceStmt_ to column indices in the
// target table. The i'th position in this list maps to the referencedColumns_[i]'th
// position in the target table. Set in createSourceStmt() during analysis.
protected List<Integer> referencedColumns_;
// SQL string of the ModifyStmt. Set in analyze().
protected String sqlString_;
public ModifyStmt(List<String> targetTablePath, FromClause fromClause,
List<Pair<SlotRef, Expr>> assignmentExprs, Expr wherePredicate) {
targetTablePath_ = Preconditions.checkNotNull(targetTablePath);
fromClause_ = Preconditions.checkNotNull(fromClause);
assignments_ = Preconditions.checkNotNull(assignmentExprs);
wherePredicate_ = wherePredicate;
}
@Override
public void collectTableRefs(List<TableRef> tblRefs) {
tblRefs.add(new TableRef(targetTablePath_, null));
fromClause_.collectTableRefs(tblRefs);
if (wherePredicate_ != null) {
// Collect TableRefs in WHERE-clause subqueries.
List<Subquery> subqueries = new ArrayList<>();
wherePredicate_.collect(Subquery.class, subqueries);
for (Subquery sq : subqueries) {
sq.getStatement().collectTableRefs(tblRefs);
}
}
}
/**
* The analysis of the ModifyStmt proceeds as follows: First, the FROM clause is
* analyzed and the targetTablePath is verified to be a valid alias into the FROM
* clause. When the target table is identified, the assignment expressions are
* validated and as a last step the internal SelectStmt is produced and analyzed.
* Potential query rewrites for the select statement are implemented here and are not
* triggered externally by the statement rewriter.
*/
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
fromClause_.analyze(analyzer);
List<Path> candidates = analyzer.getTupleDescPaths(targetTablePath_);
if (candidates.isEmpty()) {
throw new AnalysisException(format("'%s' is not a valid table alias or reference.",
Joiner.on(".").join(targetTablePath_)));
}
Preconditions.checkState(candidates.size() == 1);
Path path = candidates.get(0);
path.resolve();
if (path.destTupleDesc() == null) {
throw new AnalysisException(format(
"'%s' is not a table alias. Using the FROM clause requires the target table " +
"to be a table alias.",
Joiner.on(".").join(targetTablePath_)));
}
targetTableRef_ = analyzer.getTableRef(path.getRootDesc().getId());
if (targetTableRef_ instanceof InlineViewRef) {
throw new AnalysisException(format("Cannot modify view: '%s'",
targetTableRef_.toSql()));
}
Preconditions.checkNotNull(targetTableRef_);
FeTable dstTbl = targetTableRef_.getTable();
// Only Kudu tables can be updated
if (!(dstTbl instanceof FeKuduTable)) {
throw new AnalysisException(
format("Impala does not support modifying a non-Kudu table: %s",
dstTbl.getFullName()));
}
table_ = (FeKuduTable) dstTbl;
// Make sure that the user is allowed to modify the target table. Use ALL because no
// UPDATE / DELETE privilege exists yet (IMPALA-3840).
analyzer.registerAuthAndAuditEvent(dstTbl, Privilege.ALL);
// Validates the assignments_ and creates the sourceStmt_.
if (sourceStmt_ == null) createSourceStmt(analyzer);
sourceStmt_.analyze(analyzer);
// Add target table to descriptor table.
analyzer.getDescTbl().setTargetTable(table_);
sqlString_ = toSql();
}
@Override
public void reset() {
super.reset();
fromClause_.reset();
if (sourceStmt_ != null) sourceStmt_.reset();
table_ = null;
}
/**
* Builds and validates the sourceStmt_. The select list of the sourceStmt_ contains
* first the SlotRefs for the key Columns, followed by the expressions representing the
* assignments. This method sets the member variables for the sourceStmt_ and the
* referencedColumns_.
*
* This is only run once, on the first analysis. Following analysis will reset() and
* reuse previously created statements.
*/
private void createSourceStmt(Analyzer analyzer)
throws AnalysisException {
// Builds the select list and column position mapping for the target table.
ArrayList<SelectListItem> selectList = new ArrayList<>();
referencedColumns_ = new ArrayList<>();
buildAndValidateAssignmentExprs(analyzer, selectList, referencedColumns_);
// Analyze the generated select statement.
sourceStmt_ = new SelectStmt(new SelectList(selectList), fromClause_, wherePredicate_,
null, null, null, null);
// cast result expressions to the correct type of the referenced slot of the
// target table
int keyColumnsOffset = table_.getPrimaryKeyColumnNames().size();
for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
assignments_.get(i - keyColumnsOffset).first.getType()));
}
}
/**
* Validates the list of value assignments that should be used to modify the target
* table. It verifies that only those columns are referenced that belong to the target
* table, no key columns are modified, and that a single column is not modified multiple
* times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list of
* SelectListItems to the out parameter selectList that is used to build the select list
* for sourceStmt_. A list of integers indicating the column position of an entry in the
* select list in the target table is written to the out parameter referencedColumns.
*
* In addition to the expressions that are generated for each assignment, the
* expression list contains an expression for each key column. The key columns
* are always prepended to the list of expression representing the assignments.
*/
private void buildAndValidateAssignmentExprs(Analyzer analyzer,
List<SelectListItem> selectList, List<Integer> referencedColumns)
throws AnalysisException {
// The order of the referenced columns equals the order of the result expressions
Set<SlotId> uniqueSlots = new HashSet<>();
Set<SlotId> keySlots = new HashSet<>();
// Mapping from column name to index
List<Column> cols = table_.getColumnsInHiveOrder();
Map<String, Integer> colIndexMap = new HashMap<>();
for (int i = 0; i < cols.size(); i++) {
colIndexMap.put(cols.get(i).getName(), i);
}
// Add the key columns as slot refs
for (String k : table_.getPrimaryKeyColumnNames()) {
List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), k);
SlotRef ref = new SlotRef(path);
ref.analyze(analyzer);
selectList.add(new SelectListItem(ref, null));
uniqueSlots.add(ref.getSlotId());
keySlots.add(ref.getSlotId());
referencedColumns.add(colIndexMap.get(k));
}
// Assignments are only used in the context of updates.
for (Pair<SlotRef, Expr> valueAssignment : assignments_) {
SlotRef lhsSlotRef = valueAssignment.first;
lhsSlotRef.analyze(analyzer);
Expr rhsExpr = valueAssignment.second;
// No subqueries for rhs expression
if (rhsExpr.contains(Subquery.class)) {
throw new AnalysisException(
format("Subqueries are not supported as update expressions for column '%s'",
lhsSlotRef.toSql()));
}
rhsExpr.analyze(analyzer);
// Correct target table
if (!lhsSlotRef.isBoundByTupleIds(targetTableRef_.getId().asList())) {
throw new AnalysisException(
format("Left-hand side column '%s' in assignment expression '%s=%s' does not "
+ "belong to target table '%s'", lhsSlotRef.toSql(), lhsSlotRef.toSql(),
rhsExpr.toSql(), targetTableRef_.getDesc().getTable().getFullName()));
}
Column c = lhsSlotRef.getResolvedPath().destColumn();
// TODO(Kudu) Add test for this code-path when Kudu supports nested types
if (c == null) {
throw new AnalysisException(
format("Left-hand side in assignment expression '%s=%s' must be a column " +
"reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
}
if (keySlots.contains(lhsSlotRef.getSlotId())) {
throw new AnalysisException(format("Key column '%s' cannot be updated.",
lhsSlotRef.toSql()));
}
if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
throw new AnalysisException(
format("Duplicate value assignment to column: '%s'", lhsSlotRef.toSql()));
}
rhsExpr = checkTypeCompatibility(targetTableRef_.getDesc().getTable().getFullName(),
c, rhsExpr, analyzer.isDecimalV2(), null /*widestTypeSrcExpr*/);
uniqueSlots.add(lhsSlotRef.getSlotId());
selectList.add(new SelectListItem(rhsExpr, null));
referencedColumns.add(colIndexMap.get(c.getName()));
}
}
@Override
public List<Expr> getResultExprs() { return sourceStmt_.getResultExprs(); }
@Override
public void castResultExprs(List<Type> types) throws AnalysisException {
sourceStmt_.castResultExprs(types);
}
@Override
public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
Preconditions.checkState(isAnalyzed());
for (Pair<SlotRef, Expr> assignment: assignments_) {
assignment.second = rewriter.rewrite(assignment.second, analyzer_);
}
sourceStmt_.rewriteExprs(rewriter);
}
public QueryStmt getQueryStmt() { return sourceStmt_; }
public abstract DataSink createDataSink(List<Expr> resultExprs);
@Override
public abstract String toSql(ToSqlOptions options);
}