// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.View;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.planner.DataSink;
import org.apache.impala.planner.TableSink;
import org.apache.impala.rewrite.ExprRewriter;
import org.apache.impala.thrift.TSortingOrder;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
/**
* Representation of a single insert or upsert statement, including the select statement
* whose results are to be inserted.
*/
public class InsertStmt extends StatementBase {
// Determines the location of optional hints. The "Start" option is motivated by
// Oracle's hint placement at the start of the statement and the "End" option places
// the hint right before the query (if specified).
//
// Examples:
// Start: INSERT /* +my hint */ <tablename> ... SELECT ...
// End: INSERT <tablename> /* +my hint */ SELECT ...
public enum HintLocation { Start, End };
// Target table name as seen by the parser
private final TableName originalTableName_;
// Differentiates between INSERT INTO and INSERT OVERWRITE.
private final boolean overwrite_;
// List of column:value elements from the PARTITION (...) clause.
// Set to null if no partition was given.
private final List<PartitionKeyValue> partitionKeyValues_;
// User-supplied hints to control hash partitioning before the table sink in the plan.
private List<PlanHint> planHints_ = new ArrayList<>();
// The location of given hints.
private HintLocation hintLoc_;
// False if the original insert statement had a query statement, true if we need to
// auto-generate one (for insert into tbl()) during analysis.
private final boolean needsGeneratedQueryStatement_;
// The column permutation is specified by writing:
// (INSERT|UPSERT) INTO tbl(col3, col1, col2...)
//
// It is a mapping from select-list expr index to (non-partition) output column. If
// null, will be set to the default permutation of all non-partition columns in Hive
// order or all columns for Kudu tables.
//
// A column is said to be 'mentioned' if it occurs either in the column permutation, or
// the PARTITION clause. If columnPermutation is null, all non-partition columns are
// considered mentioned.
//
// Between them, the columnPermutation and the set of partitionKeyValues must mention
// every partition column in the target table exactly once. Other columns, if not
// explicitly mentioned, will be assigned NULL values for INSERTs and left unassigned
// for UPSERTs. Partition columns are deliberately not defaulted to NULL, as that
// would silently create NULL-valued partition slots.
//
// Dynamic partition keys may occur in either the permutation or the PARTITION
// clause. Partition columns with static values may only be mentioned in the PARTITION
// clause, where the static value is specified.
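//
// As an illustrative example (hypothetical table t partitioned by p1, p2):
//   INSERT INTO t (c2, c1) PARTITION (p1=1, p2) SELECT ...
// mentions c2 and c1 via the permutation, p1 as a static partition key and p2 as a
// dynamic one; the select list must supply exprs for c2, c1 and then p2, and any
// remaining non-partition columns of t default to NULL.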
private final List<String> columnPermutation_;
/////////////////////////////////////////
// BEGIN: Members that need to be reset()
// List of inline views that may be referenced in queryStmt.
private final WithClause withClause_;
// Target table into which to insert. May be qualified by analyze()
private TableName targetTableName_;
// Select or union whose results are to be inserted. If null, a query statement
// returning the appropriate NULLs is generated during analysis.
private QueryStmt queryStmt_;
// Set in analyze(). Contains metadata of target table to determine type of sink.
private FeTable table_;
// Set in analyze(). Exprs correspond to the partitionKeyValues, if specified, or to
// the partition columns for Kudu tables.
private List<Expr> partitionKeyExprs_ = new ArrayList<>();
// Set in analyze(). Maps exprs in partitionKeyExprs_ to their column's position in the
// table, e.g. partitionKeyExprs_[i] corresponds to table_.columns(partitionColPos_[i]).
// For Kudu tables, the primary keys are a leading subset of the cols, and the partition
// cols can be any subset of the primary keys, meaning that this list will be in
// ascending order from '0' to '# primary key cols - 1' but may leave out some numbers.
private final List<Integer> partitionColPos_ = new ArrayList<>();
// Indicates whether this insert stmt has a 'shuffle' or 'noshuffle' plan hint. At most
// one of the two flags may be true; both may be false. 'shuffle' forces data
// repartitioning before the data sink, and 'noshuffle' prevents it. Set in analyze()
// based on planHints_.
private boolean hasShuffleHint_ = false;
private boolean hasNoShuffleHint_ = false;
// Indicates whether this insert stmt has a 'clustered' or 'noclustered' hint. At most
// one of the two flags may be true. If clustering is requested, we add a clustering
// phase before the data sink, so that partitions can be written sequentially. The
// default behavior is to not perform an additional clustering step. Two separate flags
// (rather than one) are needed to detect conflicting hints.
private boolean hasClusteredHint_ = false;
private boolean hasNoClusteredHint_ = false;
// For every column of the target table that is referenced in the optional
// 'sort.columns' table property, this list will contain the corresponding result expr
// from 'resultExprs_'. Before insertion, all rows will be sorted by these exprs. If
// the list is empty, no additional sorting by non-partitioning columns will be
// performed. The column list must not contain partition columns and must be empty for
// non-Hdfs tables.
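//
// For example, a table created or altered with SORT BY (a, b) carries the table
// property 'sort.columns'='a,b' (column names here are illustrative), and rows are
// sorted by the result exprs for a and b before being written.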
private List<Expr> sortExprs_ = new ArrayList<>();
// Stores the indices into the list of non-clustering columns of the target table that
// are mentioned in the 'sort.columns' table property. This is sent to the backend to
// populate the RowGroup::sorting_columns list in parquet files.
private List<Integer> sortColumns_ = new ArrayList<>();
// The order used in SORT BY queries.
private TSortingOrder sortingOrder_ = TSortingOrder.LEXICAL;
// Output expressions that produce the final results to write to the target table. May
// include casts. Set in prepareExpressions().
// If this is an INSERT on a non-Kudu table, it will contain one Expr for all
// non-partition columns of the target table with NullLiterals where an output
// column isn't explicitly mentioned. The i'th expr produces the i'th column of
// the target table.
//
// For Kudu tables (INSERT and UPSERT operations), it will contain one Expr per column
// mentioned in the query and mentionedColumns_ is used to map between the Exprs
// and columns in the target table.
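//
// As an illustrative (hypothetical) example, for a Kudu table t(k, a, b),
// UPSERT INTO t (k, b) SELECT ... yields two entries in resultExprs_ with
// mentionedColumns_ = [0, 2], leaving column 'a' untouched for existing rows.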
private List<Expr> resultExprs_ = new ArrayList<>();
// Position mapping of exprs in resultExprs_ to columns in the target table -
// resultExprs_[i] produces the mentionedColumns_[i] column of the target table.
// Only used for Kudu tables, set in prepareExpressions().
private final List<Integer> mentionedColumns_ = new ArrayList<>();
// Set in analyze(). Exprs corresponding to key columns of Kudu tables. Empty for
// non-Kudu tables.
private List<Expr> primaryKeyExprs_ = new ArrayList<>();
// Set by the Frontend if the target table is transactional.
private long writeId_ = -1;
// END: Members that need to be reset()
/////////////////////////////////////////
// True iff this is an UPSERT operation. Only supported for Kudu tables.
private final boolean isUpsert_;
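// The factory methods below are the entry points used when building the statement,
// e.g. from the parser. As an illustrative mapping, INSERT [OVERWRITE] ...
// [PARTITION (...)] SELECT ... corresponds to createInsert(), while UPSERT INTO ...
// SELECT ... corresponds to createUpsert(), which permits neither OVERWRITE nor a
// PARTITION clause (see the Preconditions check in the constructor).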
public static InsertStmt createInsert(WithClause withClause, TableName targetTable,
boolean overwrite, List<PartitionKeyValue> partitionKeyValues,
List<PlanHint> planHints, HintLocation hintLoc, QueryStmt queryStmt,
List<String> columnPermutation) {
return new InsertStmt(withClause, targetTable, overwrite, partitionKeyValues,
planHints, hintLoc, queryStmt, columnPermutation, false);
}
public static InsertStmt createUpsert(WithClause withClause, TableName targetTable,
List<PlanHint> planHints, HintLocation hintLoc, QueryStmt queryStmt,
List<String> columnPermutation) {
return new InsertStmt(withClause, targetTable, false, null, planHints, hintLoc,
queryStmt, columnPermutation, true);
}
protected InsertStmt(WithClause withClause, TableName targetTable, boolean overwrite,
List<PartitionKeyValue> partitionKeyValues, List<PlanHint> planHints,
HintLocation hintLoc, QueryStmt queryStmt, List<String> columnPermutation,
boolean isUpsert) {
Preconditions.checkState(!isUpsert || (!overwrite && partitionKeyValues == null));
withClause_ = withClause;
targetTableName_ = targetTable;
originalTableName_ = targetTableName_;
overwrite_ = overwrite;
partitionKeyValues_ = partitionKeyValues;
planHints_ = (planHints != null) ? planHints : new ArrayList<>();
hintLoc_ = (hintLoc != null) ? hintLoc : HintLocation.End;
queryStmt_ = queryStmt;
needsGeneratedQueryStatement_ = (queryStmt == null);
columnPermutation_ = columnPermutation;
table_ = null;
isUpsert_ = isUpsert;
}
/**
* C'tor used in clone().
*/
private InsertStmt(InsertStmt other) {
super(other);
withClause_ = other.withClause_ != null ? other.withClause_.clone() : null;
targetTableName_ = other.targetTableName_;
originalTableName_ = other.originalTableName_;
overwrite_ = other.overwrite_;
partitionKeyValues_ = other.partitionKeyValues_;
planHints_ = other.planHints_;
hintLoc_ = other.hintLoc_;
queryStmt_ = other.queryStmt_ != null ? other.queryStmt_.clone() : null;
needsGeneratedQueryStatement_ = other.needsGeneratedQueryStatement_;
columnPermutation_ = other.columnPermutation_;
table_ = other.table_;
isUpsert_ = other.isUpsert_;
writeId_ = other.writeId_;
}
@Override
public void reset() {
super.reset();
if (withClause_ != null) withClause_.reset();
targetTableName_ = originalTableName_;
queryStmt_.reset();
table_ = null;
partitionKeyExprs_.clear();
partitionColPos_.clear();
hasShuffleHint_ = false;
hasNoShuffleHint_ = false;
hasClusteredHint_ = false;
hasNoClusteredHint_ = false;
sortExprs_.clear();
sortColumns_.clear();
sortingOrder_ = TSortingOrder.LEXICAL;
resultExprs_.clear();
mentionedColumns_.clear();
primaryKeyExprs_.clear();
writeId_ = -1;
}
@Override
public InsertStmt clone() { return new InsertStmt(this); }
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
if (isAnalyzed()) return;
super.analyze(analyzer);
if (withClause_ != null) withClause_.analyze(analyzer);
List<Expr> selectListExprs = null;
if (!needsGeneratedQueryStatement_) {
// Use a child analyzer for the query stmt to properly scope WITH-clause
// views and to ignore irrelevant ORDER BYs.
Analyzer queryStmtAnalyzer = new Analyzer(analyzer);
queryStmt_.analyze(queryStmtAnalyzer);
// Use getResultExprs() and not getBaseTblResultExprs() here because the final
// substitution with TupleIsNullPredicate() wrapping happens in planning.
selectListExprs = Expr.cloneList(queryStmt_.getResultExprs());
} else {
selectListExprs = new ArrayList<>();
}
// Set the target table and perform table-type specific analysis and auth checking.
// This also fails if the target table does not exist.
analyzeTargetTable(analyzer);
boolean isHBaseTable = (table_ instanceof FeHBaseTable);
int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
// Analysis of the INSERT statement from this point is basically the act of matching
// the set of output columns (which come from a column permutation, perhaps
// implicitly, and the PARTITION clause) to the set of input columns (which come from
// the select-list and any statically-valued columns in the PARTITION clause).
//
// First, we compute the set of mentioned columns, and reject statements that refer to
// non-existent columns, or duplicates (we must check both the column permutation, and
// the set of partition keys). Next, we check that all partition columns are
// mentioned. During this process we build the map from select-list expr index to
// column in the targeted table.
//
// Then we check that the select-list contains exactly the right number of expressions
// for all mentioned columns which are not statically-valued partition columns (which
// get their expressions from partitionKeyValues).
//
// Finally, prepareExpressions analyzes the expressions themselves, and confirms that
// they are type-compatible with the target columns. Where columns are not mentioned
// (and by this point, we know that missing columns are not partition columns),
// prepareExpressions assigns them NULL literal expressions, unless the target is
// a Kudu table, in which case we don't want to overwrite unmentioned columns with
// NULL.
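//
// As a concrete (hypothetical) walk-through: for a table t(c1, c2) partitioned by
// p, INSERT INTO t (c2) PARTITION (p=1) SELECT 5 mentions c2 and p, maps
// select-list expr 0 to column c2, takes p's expr from the static PARTITION value,
// and leaves c1 to be filled with a NULL literal by prepareExpressions.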
// A null permutation clause is the same as listing all non-partition columns in
// order.
List<String> analysisColumnPermutation = columnPermutation_;
if (analysisColumnPermutation == null) {
analysisColumnPermutation = new ArrayList<>();
List<Column> tableColumns = table_.getColumns();
for (int i = numClusteringCols; i < tableColumns.size(); ++i) {
analysisColumnPermutation.add(tableColumns.get(i).getName());
}
}
// selectExprTargetColumns maps from select expression index to a column in the target
// table. It will eventually include all mentioned columns that aren't static-valued
// partition columns.
List<Column> selectExprTargetColumns = new ArrayList<>();
// Tracks the names of all columns encountered in either the permutation clause or the
// partition clause to detect duplicates.
Set<String> mentionedColumnNames = new HashSet<>();
for (String columnName: analysisColumnPermutation) {
Column column = table_.getColumn(columnName);
if (column == null) {
throw new AnalysisException(
"Unknown column '" + columnName + "' in column permutation");
}
if (!mentionedColumnNames.add(column.getName())) {
throw new AnalysisException(
"Duplicate column '" + columnName + "' in column permutation");
}
selectExprTargetColumns.add(column);
}
int numStaticPartitionExprs = 0;
if (partitionKeyValues_ != null) {
for (PartitionKeyValue pkv: partitionKeyValues_) {
Column column = table_.getColumn(pkv.getColName());
if (column == null) {
throw new AnalysisException("Unknown column '" + pkv.getColName() +
"' in partition clause");
}
if (column.getPosition() >= numClusteringCols) {
throw new AnalysisException(
"Column '" + pkv.getColName() + "' is not a partition column");
}
if (!mentionedColumnNames.add(pkv.getColName())) {
throw new AnalysisException(
"Duplicate column '" + pkv.getColName() + "' in partition clause");
}
if (!pkv.isDynamic()) {
numStaticPartitionExprs++;
} else {
selectExprTargetColumns.add(column);
}
}
}
// Check that every required column in the target table is assigned exactly one expr.
checkColumnCoverage(selectExprTargetColumns, mentionedColumnNames,
selectListExprs.size(), numStaticPartitionExprs);
// Make sure static partition key values only contain const exprs.
if (partitionKeyValues_ != null) {
for (PartitionKeyValue kv: partitionKeyValues_) {
kv.analyze(analyzer);
}
}
// Check that we can write to the target table/partition. This must be
// done after the partition expression has been analyzed above.
analyzeWriteAccess();
// Populate partitionKeyExprs_ from partitionKeyValues_ and selectExprTargetColumns.
prepareExpressions(selectExprTargetColumns, selectListExprs, table_, analyzer);
// Analyze 'sort.columns' and 'sort.order' table properties and populate
// sortColumns_, sortExprs_, and sortingOrder_.
analyzeSortColumns();
// Analyze plan hints at the end to prefer reporting other error messages first
// (e.g., the PARTITION clause is not applicable to unpartitioned and HBase tables).
analyzePlanHints(analyzer);
if (hasNoClusteredHint_ && !sortExprs_.isEmpty()) {
analyzer.addWarning(String.format("Insert statement has 'noclustered' hint, but " +
"table has '%s' property. The 'noclustered' hint will be ignored.",
AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS));
}
}
/**
* Sets table_ based on targetTableName_ and performs table-type specific analysis:
* - Cannot (in|up)sert into a view
* - Cannot (in|up)sert into a table with unsupported column types
* - Analysis specific to insert and upsert operations
* Adds table_ to the analyzer's descriptor table if analysis succeeds.
*/
private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
// Fine-grained privileges for UPSERT do not exist yet, so they require ALL for now.
Privilege privilegeRequired = isUpsert_ ? Privilege.ALL : Privilege.INSERT;
// If the table has not yet been set, load it from the Catalog. This allows for
// callers to set a table to analyze that may not actually be created in the Catalog.
// One example use case is CREATE TABLE AS SELECT which must run analysis on the
// INSERT before the table has actually been created.
if (table_ == null) {
if (!targetTableName_.isFullyQualified()) {
targetTableName_ =
new TableName(analyzer.getDefaultDb(), targetTableName_.getTbl());
}
table_ = analyzer.getTable(targetTableName_, privilegeRequired);
} else {
targetTableName_ = new TableName(table_.getDb().getName(), table_.getName());
analyzer.registerPrivReq(builder ->
builder.onTable(table_).allOf(privilegeRequired).build());
}
// We do not support (in|up)serting into views.
if (table_ instanceof FeView) {
throw new AnalysisException(
String.format("Impala does not support %sing into views: %s", getOpName(),
table_.getFullName()));
}
analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
// We do not support (in|up)serting into tables with unsupported column types.
for (Column c: table_.getColumns()) {
if (!c.getType().isSupported()) {
throw new AnalysisException(String.format("Unable to %s into target table " +
"(%s) because the column '%s' has an unsupported type '%s'.",
getOpName(), targetTableName_, c.getName(), c.getType().toSql()));
}
if (c.getType().isComplexType()) {
throw new AnalysisException(String.format("Unable to %s into target table " +
"(%s) because the column '%s' has a complex type '%s' and Impala doesn't " +
"support inserting into tables containing complex type columns",
getOpName(), targetTableName_, c.getName(), c.getType().toSql()));
}
}
// Perform operation-specific analysis.
if (isUpsert_) {
if (!(table_ instanceof FeKuduTable)) {
throw new AnalysisException("UPSERT is only supported for Kudu tables");
}
} else {
analyzeTableForInsert(analyzer);
}
// Add target table to descriptor table.
analyzer.getDescTbl().setTargetTable(table_);
}
/**
* Performs INSERT-specific table analysis:
* - Partition clause is invalid for unpartitioned or HBase tables
* - Check INSERT privileges as well as write access to Hdfs paths
* - Overwrite is invalid for HBase and Kudu tables
*/
private void analyzeTableForInsert(Analyzer analyzer) throws AnalysisException {
boolean isHBaseTable = (table_ instanceof FeHBaseTable);
int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
if (partitionKeyValues_ != null && numClusteringCols == 0) {
if (isHBaseTable) {
throw new AnalysisException("PARTITION clause is not valid for INSERT into " +
"HBase tables. '" + targetTableName_ + "' is an HBase table");
} else {
// Unpartitioned table, but INSERT has PARTITION clause
throw new AnalysisException("PARTITION clause is only valid for INSERT into " +
"partitioned table. '" + targetTableName_ + "' is not partitioned");
}
}
if (table_ instanceof FeFsTable) {
FeFsTable fsTable = (FeFsTable) table_;
StringBuilder error = new StringBuilder();
fsTable.parseSkipHeaderLineCount(error);
if (error.length() > 0) throw new AnalysisException(error.toString());
try {
if (!FileSystemUtil.isImpalaWritableFilesystem(fsTable.getLocation())) {
throw new AnalysisException(String.format("Unable to INSERT into target " +
"table (%s) because %s is not a supported filesystem.", targetTableName_,
fsTable.getLocation()));
}
} catch (IOException e) {
throw new AnalysisException(String.format("Unable to INSERT into target " +
"table (%s): %s.", targetTableName_, e.getMessage()), e);
}
for (int colIdx = 0; colIdx < numClusteringCols; ++colIdx) {
Column col = fsTable.getColumns().get(colIdx);
// Hive 1.x has a number of issues handling BOOLEAN partition columns (see HIVE-6590).
// Instead of working around the Hive bugs, INSERT is disabled for BOOLEAN
// partitions in Impala when built against Hive 1. HIVE-6590 is currently resolved,
// but not in Hive 2.3.2, the latest release as of 3/17/2018.
if (col.getType() == Type.BOOLEAN) {
throw new AnalysisException(String.format("INSERT into table with BOOLEAN " +
"partition column (%s) is not supported: %s", col.getName(),
targetTableName_));
}
}
}
if (table_ instanceof FeKuduTable) {
if (overwrite_) {
throw new AnalysisException("INSERT OVERWRITE not supported for Kudu tables.");
}
if (partitionKeyValues_ != null && !partitionKeyValues_.isEmpty()) {
throw new AnalysisException(
"Partition specifications are not supported for Kudu tables.");
}
}
if (isHBaseTable && overwrite_) {
throw new AnalysisException("HBase doesn't have a way to perform INSERT OVERWRITE");
}
}
private void analyzeWriteAccess() throws AnalysisException {
if (!(table_ instanceof FeFsTable)) return;
FeFsTable fsTable = (FeFsTable) table_;
FeFsTable.Utils.checkWriteAccess(fsTable,
hasStaticPartitionTarget() ? partitionKeyValues_ : null, "INSERT");
}
private boolean hasStaticPartitionTarget() {
if (partitionKeyValues_ == null) return false;
// If the partition target is fully static, then check for write access against
// the specific partition. Otherwise, check the whole table.
for (PartitionKeyValue pkv : partitionKeyValues_) {
if (pkv.isDynamic()) return false;
}
return true;
}
/**
* Checks that the column permutation + select list + static partition exprs + dynamic
* partition exprs collectively cover exactly all required columns in the target table,
* depending on the table type.
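*
* For example (hypothetical schemas): for a Kudu table with primary key (id), an
* INSERT that omits 'id' fails in checkRequiredKuduColumns(); for an HDFS table
* partitioned by 'p', omitting 'p' fails in checkRequiredPartitionedColumns().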
*/
private void checkColumnCoverage(List<Column> selectExprTargetColumns,
Set<String> mentionedColumnNames, int numSelectListExprs,
int numStaticPartitionExprs) throws AnalysisException {
// Check that all required cols are mentioned by the permutation and partition clauses
if (selectExprTargetColumns.size() + numStaticPartitionExprs !=
table_.getColumns().size()) {
// We've already ruled out too many columns in the permutation and partition
// clauses by checking that there are no duplicates and that every column mentioned
// actually exists. So the only remaining possibility is that not all columns are
// mentioned in the query.
if (table_ instanceof FeKuduTable) {
checkRequiredKuduColumns(mentionedColumnNames);
} else if (table_ instanceof FeHBaseTable) {
checkRequiredHBaseColumns(mentionedColumnNames);
} else if (table_.getNumClusteringCols() > 0) {
checkRequiredPartitionedColumns(mentionedColumnNames);
}
}
// Expect the select list to have exactly one expr for every target column.
if (selectExprTargetColumns.size() != numSelectListExprs) {
String comparator =
(selectExprTargetColumns.size() < numSelectListExprs) ? "fewer" : "more";
String partitionClause =
(partitionKeyValues_ == null) ? "returns" : "and PARTITION clause return";
// If there was no column permutation provided, the error is that the select-list
// has the wrong number of expressions compared to the number of columns in the
// table. If there was a column permutation, then the mismatch is between the
// select-list and the permutation itself.
if (columnPermutation_ == null) {
int totalColumnsMentioned = numSelectListExprs + numStaticPartitionExprs;
throw new AnalysisException(String.format(
"Target table '%s' has %s columns (%s) than the SELECT / VALUES clause %s" +
" (%s)", table_.getFullName(), comparator,
table_.getColumns().size(), partitionClause, totalColumnsMentioned));
} else {
String partitionPrefix =
(partitionKeyValues_ == null) ? "mentions" : "and PARTITION clause mention";
throw new AnalysisException(String.format(
"Column permutation %s %s columns (%s) than " +
"the SELECT / VALUES clause %s (%s)", partitionPrefix, comparator,
selectExprTargetColumns.size(), partitionClause, numSelectListExprs));
}
}
}
/**
* For a Kudu table, checks that all key columns are mentioned.
*/
private void checkRequiredKuduColumns(Set<String> mentionedColumnNames)
throws AnalysisException {
Preconditions.checkState(table_ instanceof FeKuduTable);
List<String> keyColumns = ((FeKuduTable) table_).getPrimaryKeyColumnNames();
List<String> missingKeyColumnNames = new ArrayList<>();
for (Column column : table_.getColumns()) {
if (!mentionedColumnNames.contains(column.getName())
&& keyColumns.contains(column.getName())) {
missingKeyColumnNames.add(column.getName());
}
}
if (!missingKeyColumnNames.isEmpty()) {
throw new AnalysisException(String.format(
"All primary key columns must be specified for %sing into Kudu tables. " +
"Missing columns are: %s", getOpName(),
Joiner.on(", ").join(missingKeyColumnNames)));
}
}
/**
* For an HBase table, checks that the row-key column is mentioned.
* HBase tables have a single row-key column which is always in position 0. It
* must be mentioned, since it is invalid to set it to NULL (which would
* otherwise happen by default).
*/
private void checkRequiredHBaseColumns(Set<String> mentionedColumnNames)
throws AnalysisException {
Preconditions.checkState(table_ instanceof FeHBaseTable);
Column column = table_.getColumns().get(0);
if (!mentionedColumnNames.contains(column.getName())) {
throw new AnalysisException("Row-key column '" + column.getName() +
"' must be explicitly mentioned in column permutation.");
}
}
/**
* For partitioned tables, checks that all partition columns are mentioned.
*/
private void checkRequiredPartitionedColumns(Set<String> mentionedColumnNames)
throws AnalysisException {
int numClusteringCols = table_.getNumClusteringCols();
List<String> missingPartitionColumnNames = new ArrayList<>();
for (Column column : table_.getColumns()) {
if (!mentionedColumnNames.contains(column.getName())
&& column.getPosition() < numClusteringCols) {
missingPartitionColumnNames.add(column.getName());
}
}
if (!missingPartitionColumnNames.isEmpty()) {
throw new AnalysisException(
"Not enough partition columns mentioned in query. Missing columns are: " +
Joiner.on(", ").join(missingPartitionColumnNames));
}
}
/**
* Performs four final parts of the analysis:
* 1. Checks type compatibility between all expressions and their targets
*
* 2. Populates partitionKeyExprs_ with type-compatible expressions, in Hive
* partition-column order, for all partition columns
*
* 3. Populates resultExprs_ with type-compatible expressions, in Hive column order,
* for all expressions in the select-list. Unmentioned columns are assigned NULL literal
* expressions, unless the target is a Kudu table.
*
* 4. Result exprs for key columns of Kudu tables are stored in primaryKeyExprs_.
*
* If necessary, adds casts to the expressions to make them compatible with the type of
* the corresponding column.
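*
* As an illustrative (hypothetical) example, inserting a SMALLINT select-list expr
* into a BIGINT target column leaves a CAST(expr AS BIGINT) in resultExprs_.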
*
* @throws AnalysisException
* If an expression is not compatible with its target column
*/
private void prepareExpressions(List<Column> selectExprTargetColumns,
List<Expr> selectListExprs, FeTable tbl, Analyzer analyzer)
throws AnalysisException {
// Temporary lists of partition key exprs and names in an arbitrary order.
List<Expr> tmpPartitionKeyExprs = new ArrayList<>();
List<String> tmpPartitionKeyNames = new ArrayList<>();
int numClusteringCols = (tbl instanceof FeHBaseTable) ? 0
: tbl.getNumClusteringCols();
boolean isKuduTable = table_ instanceof FeKuduTable;
Set<String> kuduPartitionColumnNames = null;
if (isKuduTable) {
kuduPartitionColumnNames = getKuduPartitionColumnNames((FeKuduTable) table_);
}
UnionStmt unionStmt =
(queryStmt_ instanceof UnionStmt) ? (UnionStmt) queryStmt_ : null;
List<Expr> widestTypeExprList = null;
if (unionStmt != null && unionStmt.getWidestExprs() != null
&& unionStmt.getWidestExprs().size() > 0) {
widestTypeExprList = unionStmt.getWidestExprs();
}
// Check dynamic partition columns for type compatibility.
for (int i = 0; i < selectListExprs.size(); ++i) {
Column targetColumn = selectExprTargetColumns.get(i);
// widestTypeExpr is the widest-type expression for column i.
Expr widestTypeExpr =
(widestTypeExprList != null) ? widestTypeExprList.get(i) : null;
Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
targetColumn, selectListExprs.get(i), analyzer.getQueryOptions().isDecimal_v2(),
widestTypeExpr);
if (targetColumn.getPosition() < numClusteringCols) {
// This is a dynamic clustering column
tmpPartitionKeyExprs.add(compatibleExpr);
tmpPartitionKeyNames.add(targetColumn.getName());
} else if (isKuduTable) {
if (kuduPartitionColumnNames.contains(targetColumn.getName())) {
tmpPartitionKeyExprs.add(compatibleExpr);
tmpPartitionKeyNames.add(targetColumn.getName());
}
}
selectListExprs.set(i, compatibleExpr);
}
// Check static partition columns; dynamic entries in partitionKeyValues_ will already
// be in selectExprTargetColumns and are therefore ignored in this loop.
if (partitionKeyValues_ != null) {
for (PartitionKeyValue pkv: partitionKeyValues_) {
if (pkv.isStatic()) {
// tableColumn is guaranteed to exist after the earlier analysis checks
Column tableColumn = table_.getColumn(pkv.getColName());
Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
tableColumn, pkv.getLiteralValue(), analyzer.isDecimalV2(), null);
tmpPartitionKeyExprs.add(compatibleExpr);
tmpPartitionKeyNames.add(pkv.getColName());
}
}
}
// Reorder the partition key exprs and names to be consistent with the target table
// declaration, and store their column positions. We need the exprs in the original
// column order to create the corresponding Hdfs folder structure correctly, and the
// positions to construct rows to pass to the Kudu partitioning API.
for (int i = 0; i < table_.getColumns().size(); ++i) {
Column c = table_.getColumns().get(i);
for (int j = 0; j < tmpPartitionKeyNames.size(); ++j) {
if (c.getName().equals(tmpPartitionKeyNames.get(j))) {
partitionKeyExprs_.add(tmpPartitionKeyExprs.get(j));
partitionColPos_.add(i);
break;
}
}
}
Preconditions.checkState(
(isKuduTable && partitionKeyExprs_.size() == kuduPartitionColumnNames.size())
|| partitionKeyExprs_.size() == numClusteringCols);
// Make sure we have stats for partitionKeyExprs_.
for (Expr expr: partitionKeyExprs_) {
expr.analyze(analyzer);
}
// Finally, 'undo' the permutation so that the selectListExprs are in Hive column
// order, and add NULL expressions to all missing columns, unless this is an UPSERT.
List<Column> columns = table_.getColumnsInHiveOrder();
for (int col = 0; col < columns.size(); ++col) {
Column tblColumn = columns.get(col);
boolean matchFound = false;
for (int i = 0; i < selectListExprs.size(); ++i) {
if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
resultExprs_.add(selectListExprs.get(i));
if (isKuduTable) mentionedColumns_.add(col);
matchFound = true;
break;
}
}
// If no match is found, either the column is a clustering column with a static
// value, or it was unmentioned and therefore should have a NULL select-list
// expression if this is an INSERT and the target is not a Kudu table.
if (!matchFound) {
if (tblColumn.getPosition() >= numClusteringCols) {
if (isKuduTable) {
Preconditions.checkState(tblColumn instanceof KuduColumn);
KuduColumn kuduCol = (KuduColumn) tblColumn;
if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()) {
throw new AnalysisException("Missing values for column that is not " +
"nullable and has no default value " + kuduCol.getName());
}
} else {
// Unmentioned non-clustering columns get NULL literals with the appropriate
// target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
resultExprs_.add(NullLiteral.create(tblColumn.getType()));
}
}
}
// Store exprs for Kudu key columns.
if (matchFound && isKuduTable) {
FeKuduTable kuduTable = (FeKuduTable) table_;
if (kuduTable.getPrimaryKeyColumnNames().contains(tblColumn.getName())) {
primaryKeyExprs_.add(Iterables.getLast(resultExprs_));
}
}
}
if (table_ instanceof FeKuduTable) {
Preconditions.checkState(!primaryKeyExprs_.isEmpty());
}
// TODO: Check that HBase row-key columns are not NULL? See IMPALA-406
if (needsGeneratedQueryStatement_) {
// Build a query statement that returns NULL for every column
List<SelectListItem> selectListItems = new ArrayList<>();
for (Expr e: resultExprs_) {
selectListItems.add(new SelectListItem(e, null));
}
SelectList selectList = new SelectList(selectListItems);
queryStmt_ = new SelectStmt(selectList, null, null, null, null, null, null);
queryStmt_.analyze(analyzer);
}
}
private static Set<String> getKuduPartitionColumnNames(FeKuduTable table) {
Set<String> ret = new HashSet<>();
for (KuduPartitionParam partitionParam : table.getPartitionBy()) {
ret.addAll(partitionParam.getColumnNames());
}
return ret;
}
/**
* Analyzes the 'sort.columns' table property if it is set, and populates
* sortColumns_ and sortExprs_. If there are errors during the analysis, this will throw
* an AnalysisException.
*/
private void analyzeSortColumns() throws AnalysisException {
if (!(table_ instanceof FeFsTable)) return;
Pair<List<Integer>, TSortingOrder> sortProperties =
AlterTableSetTblProperties.analyzeSortColumns(table_,
table_.getMetaStoreTable().getParameters());
sortColumns_ = sortProperties.first;
sortingOrder_ = sortProperties.second;
// Assign sortExprs_ based on sortColumns_.
for (Integer colIdx: sortColumns_) sortExprs_.add(resultExprs_.get(colIdx));
}
private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
// If there are no hints then early exit.
if (planHints_.isEmpty() &&
!(analyzer.getQueryOptions().isSetDefault_hints_insert_statement())) return;
if (table_ instanceof FeHBaseTable) {
if (!planHints_.isEmpty()) {
throw new AnalysisException(String.format("INSERT hints are only supported for " +
"inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
}
// Insert hints are not supported for HBase tables, so ignore any default hints.
return;
}
// Set up the plan hints from the query option DEFAULT_HINTS_INSERT_STATEMENT.
// Default hints are ignored if the original statement had hints.
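// For example, with the illustrative setting
// DEFAULT_HINTS_INSERT_STATEMENT="clustered:shuffle", both the 'clustered' and
// 'shuffle' hints are applied, since the option value is split on ':'.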
if (planHints_.isEmpty() &&
analyzer.getQueryOptions().isSetDefault_hints_insert_statement()) {
String defaultHints =
analyzer.getQueryOptions().getDefault_hints_insert_statement();
for (String hint: defaultHints.trim().split(":")) {
planHints_.add(new PlanHint(hint.trim()));
}
}
for (PlanHint hint: planHints_) {
if (hint.is("SHUFFLE")) {
hasShuffleHint_ = true;
analyzer.setHasPlanHints();
} else if (hint.is("NOSHUFFLE")) {
hasNoShuffleHint_ = true;
analyzer.setHasPlanHints();
} else if (hint.is("CLUSTERED")) {
hasClusteredHint_ = true;
analyzer.setHasPlanHints();
} else if (hint.is("NOCLUSTERED")) {
hasNoClusteredHint_ = true;
analyzer.setHasPlanHints();
} else {
analyzer.addWarning("INSERT hint not recognized: " + hint);
}
}
// At most one hint of each conflicting pair may be specified.
if (hasShuffleHint_ && hasNoShuffleHint_) {
throw new AnalysisException("Conflicting INSERT hints: shuffle and noshuffle");
}
if (hasClusteredHint_ && hasNoClusteredHint_) {
throw new AnalysisException("Conflicting INSERT hints: clustered and noclustered");
}
}
@Override
public List<Expr> getResultExprs() { return resultExprs_; }
@Override
public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
Preconditions.checkState(isAnalyzed());
queryStmt_.rewriteExprs(rewriter);
}
private String getOpName() { return isUpsert_ ? "UPSERT" : "INSERT"; }
public List<PlanHint> getPlanHints() { return planHints_; }
public TableName getTargetTableName() { return targetTableName_; }
public FeTable getTargetTable() { return table_; }
public void setTargetTable(FeTable table) { this.table_ = table; }
public void setWriteId(long writeId) { this.writeId_ = writeId; }
public boolean isOverwrite() { return overwrite_; }
public TSortingOrder getSortingOrder() { return sortingOrder_; }
/**
* Only valid after analysis
*/
public QueryStmt getQueryStmt() { return queryStmt_; }
public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
public List<Integer> getPartitionColPos() { return partitionColPos_; }
public boolean hasShuffleHint() { return hasShuffleHint_; }
public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
public boolean hasClusteredHint() { return hasClusteredHint_; }
public boolean hasNoClusteredHint() { return hasNoClusteredHint_; }
public List<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; }
public List<Expr> getSortExprs() { return sortExprs_; }
public long getWriteId() { return writeId_; }
// Clustering is enabled by default. If the table has a 'sort.columns' property and the
// query has a 'noclustered' hint, we issue a warning during analysis and ignore the
// 'noclustered' hint.
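// For example, an INSERT with a 'noclustered' hint into a table without sort columns
// returns false here; with sort columns set, it returns true and the hint is
// effectively ignored (a warning was added in analyze()).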
public boolean requiresClustering() {
return !hasNoClusteredHint_ || !sortExprs_.isEmpty();
}
public List<String> getMentionedColumns() {
List<String> result = new ArrayList<>();
List<Column> columns = table_.getColumns();
for (Integer i: mentionedColumns_) result.add(columns.get(i).getName());
return result;
}
public DataSink createDataSink() {
// analyze() must have been called before.
Preconditions.checkState(table_ != null);
return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
partitionKeyExprs_, resultExprs_, mentionedColumns_, overwrite_,
requiresClustering(), new Pair<>(sortColumns_, sortingOrder_), writeId_);
}
/**
* Substitutes the result expressions, the partition key expressions, and the primary
* key expressions with smap. Preserves the original types of those expressions during
* the substitution.
*/
public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
primaryKeyExprs_ = Expr.substituteList(primaryKeyExprs_, smap, analyzer, true);
sortExprs_ = Expr.substituteList(sortExprs_, smap, analyzer, true);
}
@Override
public String toSql(ToSqlOptions options) {
StringBuilder strBuilder = new StringBuilder();
if (withClause_ != null) strBuilder.append(withClause_.toSql(options) + " ");
strBuilder.append(getOpName());
if (!planHints_.isEmpty() && hintLoc_ == HintLocation.Start) {
strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(options, getPlanHints()));
}
if (overwrite_) {
strBuilder.append(" OVERWRITE");
} else {
strBuilder.append(" INTO");
}
strBuilder.append(" TABLE " + originalTableName_);
if (columnPermutation_ != null) {
strBuilder.append("(");
String sep = "";
for (String col : columnPermutation_) {
strBuilder.append(sep);
strBuilder.append(ToSqlUtils.getIdentSql(col));
sep = ", ";
}
strBuilder.append(")");
}
if (partitionKeyValues_ != null) {
List<String> values = new ArrayList<>();
for (PartitionKeyValue pkv: partitionKeyValues_) {
values.add(ToSqlUtils.getIdentSql(pkv.getColName())
+ (pkv.getValue() != null ? ("=" + pkv.getValue().toSql(options)) : ""));
}
strBuilder.append(" PARTITION (" + Joiner.on(", ").join(values) + ")");
}
if (!planHints_.isEmpty() && hintLoc_ == HintLocation.End) {
strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(options, getPlanHints()));
}
if (!needsGeneratedQueryStatement_) {
strBuilder.append(" " + queryStmt_.toSql(options));
}
return strBuilder.toString();
}
@Override
public void collectTableRefs(List<TableRef> tblRefs) {
if (withClause_ != null) {
for (View v: withClause_.getViews()) {
v.getQueryStmt().collectTableRefs(tblRefs);
}
}
tblRefs.add(new TableRef(targetTableName_.toPath(), null));
if (queryStmt_ != null) queryStmt_.collectTableRefs(tblRefs);
}
}