| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.analysis; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.impala.authorization.Privilege; |
| import org.apache.impala.catalog.Column; |
| import org.apache.impala.catalog.FeCatalogUtils; |
| import org.apache.impala.catalog.FeFsPartition; |
| import org.apache.impala.catalog.FeFsTable; |
| import org.apache.impala.catalog.FeHBaseTable; |
| import org.apache.impala.catalog.FeTable; |
| import org.apache.impala.catalog.HdfsFileFormat; |
| import org.apache.impala.catalog.HdfsPartition.FileDescriptor; |
| import org.apache.impala.catalog.HdfsTable; |
| import org.apache.impala.catalog.PartitionStatsUtil; |
| import org.apache.impala.catalog.Type; |
| import org.apache.impala.common.AnalysisException; |
| import org.apache.impala.common.PrintUtils; |
| import org.apache.impala.common.RuntimeEnv; |
| import org.apache.impala.planner.HdfsScanNode; |
| import org.apache.impala.service.BackendConfig; |
| import org.apache.impala.service.CatalogOpExecutor; |
| import org.apache.impala.service.FrontendProfile; |
| import org.apache.impala.thrift.TComputeStatsParams; |
| import org.apache.impala.thrift.TErrorCode; |
| import org.apache.impala.thrift.TGetPartitionStatsResponse; |
| import org.apache.impala.thrift.TPartitionStats; |
| import org.apache.impala.thrift.TTableName; |
| import org.apache.impala.thrift.TUnit; |
| import org.apache.log4j.Logger; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * Represents the following statements for statistics collection. Which statistics |
| * are computed and stored depends on the statement type (incremental or not), the |
| * clauses used (sampling, partition spec), as well as whether stats extrapolation |
| * is enabled or not. |
| * Stats extrapolation can be configured: |
| * - at the impalad level with --enable_stats_extrapolation |
| * - at the table level via the HdfsTable.TBL_PROP_ENABLE_STATS_EXTRAPOLATION table property |
| * |
| * 1. COMPUTE STATS <table> [(col_list)] [TABLESAMPLE SYSTEM(<perc>) [REPEATABLE(<seed>)]] |
| * - Stats extrapolation enabled: |
| * Computes and replaces the table-level row count and total file size, as well as all |
| * table-level column statistics. Existing partition-objects and their row count are |
| * not modified at all. The TABLESAMPLE clause can be used to limit the scanned data |
| * volume to a desired percentage. When sampling, the COMPUTE STATS queries directly |
| * produce extrapolated stats which are then stored in the HMS via the CatalogServer. |
| * We store extrapolated stats in the HMS so as not to confuse other engines like |
| * Hive/SparkSQL, which may rely on the shared HMS fields representing the whole |
| * table and not a sample. See {@link CatalogOpExecutor#getExtrapolatedStatsVal}. |
| * - Stats extrapolation disabled: |
| * Computes and replaces the table-level row count and total file size, the row counts |
| * for all partitions (if applicable), as well as all table-level column statistics. |
| * The TABLESAMPLE clause is not supported to simplify implementation and testing. In |
| * particular, we want to avoid implementing and testing code for updating all HMS |
| * partitions to set the extrapolated numRows statistic. Altering many partitions is |
| * expensive and so should be avoided in favor of enabling extrapolation. |
| * |
| * By default, statistics are computed for all columns. To control which columns are |
| * analyzed, a whitelist of column names can optionally be specified. |
| * |
| * 2. COMPUTE INCREMENTAL STATS <table> [PARTITION <part_spec>] |
| * - Stats extrapolation enabled: |
| * Not supported for now to keep the logic/code simple. |
| * - Stats extrapolation disabled: |
| * Computes and replaces the table and partition-level row counts. Computes mergeable |
| * per-partition column statistics (HLL intermediate state) and stores them in the HMS. |
| * Computes and replaces the table-level column statistics by merging the |
| * partition-level column statistics. |
| * Instead of recomputing those statistics for all partitions, this command reuses |
| * existing statistics from partitions which already have incremental statistics. |
| * If a set of partitions is specified, then the incremental statistics for those |
| * partitions are recomputed (then merged into the table-level statistics). |
| * |
| * TODO: Allow a more coarse-grained scope, e.g., COMPUTE STATS on a whole db. |
| * TODO: Compute stats on complex types. |
| */ |
| public class ComputeStatsStmt extends StatementBase { |
| private static final Logger LOG = Logger.getLogger(ComputeStatsStmt.class); |
| |
| private static final String AVRO_SCHEMA_MSG_PREFIX = "Cannot COMPUTE STATS on Avro table " + |
| "'%s' because its column definitions do not match those in the Avro schema."; |
| private static final String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " + |
| "column definitions, e.g., using the result of 'SHOW CREATE TABLE'"; |
| |
| // Metrics collected when fetching incremental statistics from Catalogd. All metrics |
| // are per query. |
| private static final String STATS_FETCH_PREFIX = "StatsFetch"; |
| // Time (ms) needed to fetch all partitions stats from catalogd. |
| private static final String STATS_FETCH_TIME = STATS_FETCH_PREFIX + ".Time"; |
| // Number of compressed bytes received for all partitions. |
| private static final String STATS_FETCH_COMPRESSED_BYTES = |
| STATS_FETCH_PREFIX + ".CompressedBytes"; |
| // Number of partitions sent from Catalogd. |
| private static final String STATS_FETCH_TOTAL_PARTITIONS = |
| STATS_FETCH_PREFIX + ".TotalPartitions"; |
| // Number of partitions sent from Catalogd that include statistics. |
| private static final String STATS_FETCH_NUM_PARTITIONS_WITH_STATS = |
| STATS_FETCH_PREFIX + ".NumPartitionsWithStats"; |
| |
| // The maximum number of partitions that may be explicitly selected by filter |
| // predicates. Any query that selects more than this automatically drops back to a full |
| // incremental stats recomputation. |
| // TODO: We can probably do better than this, e.g. running several queries, each of |
| // which selects up to MAX_INCREMENTAL_PARTITIONS partitions. |
| private static final int MAX_INCREMENTAL_PARTITIONS = 1000; |
| |
| protected final TableName tableName_; |
| protected final TableSampleClause sampleParams_; |
| |
| // Set during analysis. |
| protected FeTable table_; |
| |
| // Effective sampling percent based on the total number of bytes in the files sample. |
| // Set to -1 for non-HDFS tables or if TABLESAMPLE was not specified. |
| // We run the regular COMPUTE STATS for 0.0 and 1.0 where sampling has no benefit. |
| protected double effectiveSamplePerc_ = -1; |
| |
| // Query for getting the per-partition row count and the total row count. |
| // Set during analysis. |
| protected String tableStatsQueryStr_; |
| |
| // Query for getting the per-column NDVs and number of NULLs. |
| // Set during analysis. |
| protected String columnStatsQueryStr_; |
| |
| // If true, stats will be gathered incrementally per-partition. |
| private boolean isIncremental_; |
| |
| // If true, expect the compute stats process to produce output for all partitions in the |
| // target table. In that case, 'expectedPartitions_' will be empty. The point of this |
| // flag is to optimize the case where all partitions are targeted. |
| // False for unpartitioned HDFS tables, non-HDFS tables or when stats extrapolation |
| // is enabled. |
| private boolean expectAllPartitions_; |
| |
| // The list of valid partition statistics that can be used in an incremental computation |
| // without themselves being recomputed. Populated in analyze(). |
| private final List<TPartitionStats> validPartStats_ = new ArrayList<>(); |
| |
| // For incremental computations, the list of partitions (identified by list of partition |
| // column values) that we expect to receive results for. Used to ensure that even empty |
| // partitions emit results. |
| // TODO: Consider using partition IDs (and adding them to the child queries with a |
| // PARTITION_ID() builtin) |
| private final List<List<String>> expectedPartitions_ = new ArrayList<>(); |
| |
| // If non-null, partitions that an incremental computation might apply to. Must be |
| // null if this is a non-incremental computation. |
| private PartitionSet partitionSet_; |
| |
| // If non-null, represents the user-specified list of columns for computing statistics. |
| // Not supported for incremental statistics. |
| private List<String> columnWhitelist_; |
| |
| // The set of columns to be analyzed. Each column must exist in the table schema, |
| // must have a type for which stats can be computed, and must not be a partitioning |
| // column of an HDFS table. If the set is null, no columns are restricted. |
| private Set<Column> validatedColumnWhitelist_; |
| |
| /** |
| * Should only be constructed via static creation functions. |
| */ |
| private ComputeStatsStmt(TableName tableName, TableSampleClause sampleParams, |
| boolean isIncremental, PartitionSet partitionSet, List<String> columns) { |
| Preconditions.checkState(tableName != null && !tableName.isEmpty()); |
| Preconditions.checkState(isIncremental || partitionSet == null); |
| Preconditions.checkState(!isIncremental || sampleParams == null); |
| Preconditions.checkState(!isIncremental || columns == null); |
| tableName_ = tableName; |
| sampleParams_ = sampleParams; |
| table_ = null; |
| isIncremental_ = isIncremental; |
| partitionSet_ = partitionSet; |
| columnWhitelist_ = columns; |
| if (partitionSet_ != null) { |
| partitionSet_.setTableName(tableName); |
| partitionSet_.setPrivilegeRequirement(Privilege.ALTER); |
| } |
| } |
| |
| @Override |
| public void collectTableRefs(List<TableRef> tblRefs) { |
| tblRefs.add(new TableRef(tableName_.toPath(), null)); |
| } |
| |
| /** |
| * Returns a stmt for COMPUTE STATS. If the optional 'sampleParams' is non-null, the |
| * stats are computed from a sample of the table data. |
| */ |
| public static ComputeStatsStmt createStatsStmt(TableName tableName, |
| TableSampleClause sampleParams, List<String> columns) { |
| return new ComputeStatsStmt(tableName, sampleParams, false, null, columns); |
| } |
| |
| /** |
| * Returns a stmt for COMPUTE INCREMENTAL STATS. The optional 'partitionSet' specifies a |
| * set of partitions whose stats should be computed. |
| */ |
| public static ComputeStatsStmt createIncrementalStatsStmt(TableName tableName, |
| PartitionSet partitionSet) { |
| return new ComputeStatsStmt(tableName, null, true, partitionSet, null); |
| } |
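| |
| // Illustrative construction, assuming a parser that has already produced 'tblName', |
| // 'sampleClause' and 'partSet' (names hypothetical): |
| //   ComputeStatsStmt s1 = ComputeStatsStmt.createStatsStmt(tblName, sampleClause, null); |
| //   ComputeStatsStmt s2 = ComputeStatsStmt.createIncrementalStatsStmt(tblName, partSet); |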
| |
| private List<String> getBaseColumnStatsQuerySelectList(Analyzer analyzer) { |
| List<String> columnStatsSelectList = new ArrayList<>(); |
| // For Hdfs tables, exclude partition columns from stats gathering because Hive |
| // cannot store them as part of the non-partition column stats. For HBase tables, |
| // include the single clustering column (the row key). |
| int startColIdx = (table_ instanceof FeHBaseTable) ? 0 : |
| table_.getNumClusteringCols(); |
| |
| for (int i = startColIdx; i < table_.getColumns().size(); ++i) { |
| Column c = table_.getColumns().get(i); |
| if (validatedColumnWhitelist_ != null && !validatedColumnWhitelist_.contains(c)) { |
| continue; |
| } |
| if (ignoreColumn(c)) continue; |
| |
| // NDV approximation function. Add explicit alias for later identification when |
| // updating the Metastore. |
| String colRefSql = ToSqlUtils.getIdentSql(c.getName()); |
| if (isIncremental_) { |
| columnStatsSelectList.add("NDV_NO_FINALIZE(" + colRefSql + ") AS " + colRefSql); |
| } else if (isSampling()) { |
| columnStatsSelectList.add(String.format("SAMPLED_NDV(%s, %.10f) AS %s", |
| colRefSql, effectiveSamplePerc_, colRefSql)); |
| } else { |
| // Regular (non-incremental) compute stats without sampling. |
| columnStatsSelectList.add("NDV(" + colRefSql + ") AS " + colRefSql); |
| } |
| |
| // Count the number of NULL values. |
| columnStatsSelectList.add("COUNT(CASE WHEN " + colRefSql + |
| " IS NULL THEN 1 ELSE NULL END)"); |
| |
| // For STRING columns also compute the max and avg string length. |
| Type type = c.getType(); |
| if (type.isStringType()) { |
| columnStatsSelectList.add("MAX(length(" + colRefSql + "))"); |
| columnStatsSelectList.add("AVG(length(" + colRefSql + "))"); |
| } else { |
| // For non-STRING columns we use the fixed size of the type. |
| // We store the same information for all types to avoid having to |
| // treat STRING columns specially in the BE CatalogOpExecutor. |
| int typeSize = type.getPrimitiveType().getSlotSize(); |
| columnStatsSelectList.add(String.valueOf(typeSize)); |
| columnStatsSelectList.add("CAST(" + typeSize + " as DOUBLE)"); |
| } |
| |
| if (isIncremental_) { |
| // Need the count in order to properly combine per-partition column stats |
| columnStatsSelectList.add("COUNT(" + colRefSql + ")"); |
| } |
| } |
| return columnStatsSelectList; |
| } |
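| |
| // As a concrete sketch (hypothetical STRING column 's', regular COMPUTE STATS, |
| // no sampling), the loop above emits roughly these select-list items: |
| //   NDV(`s`) AS `s`, |
| //   COUNT(CASE WHEN `s` IS NULL THEN 1 ELSE NULL END), |
| //   MAX(length(`s`)), |
| //   AVG(length(`s`)) |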
| |
| /** |
| * Constructs two SQL queries for computing the row count and column statistics, and |
| * sets them in 'tableStatsQueryStr_' and 'columnStatsQueryStr_', respectively. |
| * The queries are generated as follows. |
| * |
| * 1. Regular COMPUTE STATS (not incremental and no sampling) |
| * 1.1 Row counts: |
| * SELECT COUNT(*) FROM tbl [GROUP BY part_col1, part_col2 ...] |
| * The GROUP BY clause is added if the target is a partitioned HDFS table and |
| * stats extrapolation is disabled. Otherwise, no GROUP BY is used. |
| * |
| * 1.2 Column stats: |
| * SELECT NDV(c1), CAST(-1 as typeof(c1)), MAX(length(c1)), AVG(length(c1)), |
| * NDV(c2), CAST(-1 as typeof(c2)), MAX(length(c2)), AVG(length(c2)), |
| * ... |
| * FROM tbl |
| * |
| * 2. COMPUTE STATS with TABLESAMPLE |
| * 2.1 Row counts: |
| * SELECT ROUND(COUNT(*) / <effective_sample_perc>) |
| * FROM tbl TABLESAMPLE SYSTEM(<sample_perc>) REPEATABLE (<random_seed>) |
| * |
| * 2.2 Column stats: |
| * SELECT SAMPLED_NDV(c1, p), CAST(-1 as typeof(c1)), MAX(length(c1)), AVG(length(c1)), |
| * SAMPLED_NDV(c2, p), CAST(-1 as typeof(c2)), MAX(length(c2)), AVG(length(c2)), |
| * ... |
| * FROM tbl TABLESAMPLE SYSTEM(<sample_perc>) REPEATABLE (<random_seed>) |
| * SAMPLED_NDV() is a specialized aggregation function that estimates the NDV based on |
| * a sample. The "p" passed to the SAMPLED_NDV() is the effective sampling rate. |
| * |
| * 3. COMPUTE INCREMENTAL STATS |
| * 3.1 Row counts: |
| * SELECT COUNT(*) FROM tbl GROUP BY part_col1, part_col2 ... |
| * [WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR |
| * ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ...] |
| * The WHERE clause is constructed to select the relevant partitions. |
| * |
| * 3.2 Column stats: |
| * SELECT NDV_NO_FINALIZE(c1), <nulls, max, avg>, COUNT(c1), |
| * NDV_NO_FINALIZE(c2), <nulls, max, avg>, COUNT(c2), |
| * ... |
| * FROM tbl |
| * GROUP BY part_col1, part_col2, ... |
| * [WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR |
| * ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ...] |
| * The WHERE clause is constructed to select the relevant partitions. |
| * NDV_NO_FINALIZE() produces a non-finalized HyperLogLog intermediate byte array. |
| * |
| * 4. For all COMPUTE STATS variants: |
| * - The MAX() and AVG() for the column stats queries are only relevant for var-len |
| * columns like STRING. For fixed-len columns MAX() and AVG() are replaced with the |
| * appropriate literals. |
| * - Queries will be set to null if we can detect that no work needs to be performed. |
| */ |
| @Override |
| public void analyze(Analyzer analyzer) throws AnalysisException { |
| // Resolve and analyze this table ref so we can evaluate partition predicates. |
| TableRef tableRef = new TableRef(tableName_.toPath(), null, Privilege.ALTER); |
| tableRef = analyzer.resolveTableRef(tableRef); |
| Preconditions.checkNotNull(tableRef); |
| tableRef.analyze(analyzer); |
| if (tableRef instanceof InlineViewRef) { |
| throw new AnalysisException(String.format( |
| "COMPUTE STATS not supported for view: %s", tableName_)); |
| } |
| if (tableRef instanceof CollectionTableRef) { |
| throw new AnalysisException(String.format( |
| "COMPUTE STATS not supported for nested collection: %s", tableName_)); |
| } |
| table_ = analyzer.getTable(tableName_, Privilege.ALTER, Privilege.SELECT); |
| // Adding the check here instead of tableRef.analyze because tableRef is |
| // used at multiple places and will even disallow select. |
| analyzer.ensureTableNotFullAcid(table_); |
| |
| if (!(table_ instanceof FeFsTable)) { |
| if (partitionSet_ != null) { |
| throw new AnalysisException("COMPUTE INCREMENTAL ... PARTITION not supported " + |
| "for non-HDFS table " + tableName_); |
| } |
| isIncremental_ = false; |
| } |
| |
| if (columnWhitelist_ != null) { |
| validatedColumnWhitelist_ = new HashSet<>(); |
| for (String colName : columnWhitelist_) { |
| Column col = table_.getColumn(colName); |
| if (col == null) { |
| throw new AnalysisException(colName + " not found in table: " + |
| table_.getName()); |
| } |
| if (table_ instanceof FeFsTable && table_.isClusteringColumn(col)) { |
| throw new AnalysisException("COMPUTE STATS not supported for partitioning " + |
| "column " + col.getName() + " of HDFS table."); |
| } |
| if (ignoreColumn(col)) { |
| throw new AnalysisException("COMPUTE STATS not supported for column " + |
| col.getName() + " of complex type:" + col.getType().toSql()); |
| } |
| validatedColumnWhitelist_.add(col); |
| } |
| } |
| |
| FeFsTable hdfsTable = null; |
| if (table_ instanceof FeFsTable) { |
| hdfsTable = (FeFsTable)table_; |
| if (hdfsTable.usesAvroSchemaOverride()) checkIncompleteAvroSchema(hdfsTable); |
| if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 && |
| partitionSet_ != null) { |
| throw new AnalysisException(String.format( |
| "Can't compute PARTITION stats on an unpartitioned table: %s", |
| tableName_)); |
| } else if (partitionSet_ != null) { |
| Preconditions.checkState(tableRef instanceof BaseTableRef); |
| partitionSet_.setPartitionShouldExist(); |
| partitionSet_.analyze(analyzer); |
| } |
| |
| // For incremental stats, estimate the size of the intermediate stats and report |
| // an error if the estimate exceeds --inc_stats_size_limit_bytes. |
| if (isIncremental_) { |
| long incStatMaxSize = BackendConfig.INSTANCE.getIncStatsMaxSize(); |
| long statsSizeEstimate = hdfsTable.getColumns().size() * |
| hdfsTable.getPartitions().size() * HdfsTable.STATS_SIZE_PER_COLUMN_BYTES; |
| if (statsSizeEstimate > incStatMaxSize) { |
| LOG.error("Incremental stats size estimate for table " + hdfsTable.getName() + |
| " exceeded " + incStatMaxSize + ", estimate = " |
| + statsSizeEstimate); |
| throw new AnalysisException("Incremental stats size estimate exceeds " |
| + PrintUtils.printBytes(incStatMaxSize) |
| + ". Please try COMPUTE STATS instead."); |
| } |
| } |
| } |
| |
| // Build partition filters that only select partitions without valid statistics for |
| // incremental computation. |
| List<String> filterPreds = new ArrayList<>(); |
| if (isIncremental_) { |
| if (partitionSet_ == null) { |
| // If any column does not have stats, we recompute statistics for all partitions |
| // TODO: need a better way to invalidate stats for all partitions, so that we can |
| // use this logic to only recompute new / changed columns. |
| boolean tableIsMissingColStats = false; |
| |
| // We'll warn the user if a column is missing stats (and therefore we rescan the |
| // whole table), but if all columns are missing stats, the table just doesn't |
| // have any stats and there's no need to warn. |
| boolean allColumnsMissingStats = true; |
| String exampleColumnMissingStats = null; |
| // Partition columns always have stats, so exclude them from this search |
| for (Column col: table_.getNonClusteringColumns()) { |
| if (ignoreColumn(col)) continue; |
| if (!col.getStats().hasStats()) { |
| if (!tableIsMissingColStats) { |
| tableIsMissingColStats = true; |
| exampleColumnMissingStats = col.getName(); |
| } |
| } else { |
| allColumnsMissingStats = false; |
| } |
| } |
| |
| if (tableIsMissingColStats && !allColumnsMissingStats) { |
| analyzer.addWarning("Column " + exampleColumnMissingStats + |
| " does not have statistics, recomputing stats for the whole table"); |
| } |
| |
| // Get incremental statistics from all relevant partitions. |
| Collection<? extends FeFsPartition> allPartitions = |
| FeCatalogUtils.loadAllPartitions(hdfsTable); |
| Map<Long, TPartitionStats> partitionStats = |
| getOrFetchPartitionStats(analyzer, hdfsTable, allPartitions, |
| /* excludedPartitions= */ Collections.<Long>emptySet()); |
| for (FeFsPartition p: allPartitions) { |
| TPartitionStats partStats = partitionStats.get(p.getId()); |
| if (partStats == null || tableIsMissingColStats) { |
| if (!tableIsMissingColStats) filterPreds.add(p.getConjunctSql()); |
| // TODO(vercegovac): check what happens when "NULL" is used as a partitioning |
| // value. |
| List<String> partValues = PartitionKeyValue.getPartitionKeyValueStringList( |
| p.getPartitionValues(), "NULL"); |
| expectedPartitions_.add(partValues); |
| } else { |
| validPartStats_.add(partStats); |
| } |
| } |
| if (expectedPartitions_.size() == hdfsTable.getPartitions().size()) { |
| expectedPartitions_.clear(); |
| expectAllPartitions_ = true; |
| } |
| } else { |
| // Always compute stats on a set of partitions when told to. |
| for (FeFsPartition targetPartition: partitionSet_.getPartitions()) { |
| filterPreds.add(targetPartition.getConjunctSql()); |
| List<String> partValues = PartitionKeyValue.getPartitionKeyValueStringList( |
| targetPartition.getPartitionValues(), "NULL"); |
| expectedPartitions_.add(partValues); |
| } |
| // Create a hash set out of partitionSet_ for O(1) lookups. |
| // TODO(todd) avoid loading all partitions. |
| Set<Long> targetPartitions = |
| Sets.newHashSetWithExpectedSize(partitionSet_.getPartitions().size()); |
| for (FeFsPartition p: partitionSet_.getPartitions()) { |
| targetPartitions.add(p.getId()); |
| } |
| // Get incremental statistics for partitions that are not recomputed. |
| Collection<? extends FeFsPartition> allPartitions = |
| FeCatalogUtils.loadAllPartitions(hdfsTable); |
| Map<Long, TPartitionStats> partitionStats = getOrFetchPartitionStats( |
| analyzer, hdfsTable, allPartitions, targetPartitions); |
| validPartStats_.addAll(partitionStats.values()); |
| } |
| if (filterPreds.size() == 0 && validPartStats_.size() != 0) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("No partitions selected for incremental stats update"); |
| } |
| analyzer.addWarning("No partitions selected for incremental stats update"); |
| return; |
| } |
| } else { |
| // Not computing incremental stats. |
| expectAllPartitions_ = true; |
| if (table_ instanceof FeFsTable) { |
| expectAllPartitions_ = !FeFsTable.Utils.isStatsExtrapolationEnabled( |
| (FeFsTable) table_); |
| } |
| } |
| |
| if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) { |
| // TODO: Consider simply running for MAX_INCREMENTAL_PARTITIONS partitions, and then |
| // advising the user to iterate. |
| analyzer.addWarning( |
| "Too many partitions selected, doing full recomputation of incremental stats"); |
| filterPreds.clear(); |
| validPartStats_.clear(); |
| } |
| |
| // Tablesample clause to be used for all child queries. |
| String tableSampleSql = analyzeTableSampleClause(analyzer); |
| |
| // Query for getting the per-partition row count and the total row count. |
| StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT "); |
| String countSql = "COUNT(*)"; |
| if (isSampling()) { |
| // Extrapolate the count based on the effective sampling rate. |
| countSql = String.format("ROUND(COUNT(*) / %.10f)", effectiveSamplePerc_); |
| } |
| List<String> tableStatsSelectList = Lists.newArrayList(countSql); |
| // Add group by columns for incremental stats or with extrapolation disabled. |
| List<String> groupByCols = new ArrayList<>(); |
| if (!updateTableStatsOnly()) { |
| for (Column partCol: hdfsTable.getClusteringColumns()) { |
| groupByCols.add(ToSqlUtils.getIdentSql(partCol.getName())); |
| } |
| tableStatsSelectList.addAll(groupByCols); |
| } |
| tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList)); |
| tableStatsQueryBuilder.append(" FROM " + tableName_.toSql() + tableSampleSql); |
| |
| // Query for getting the per-column NDVs and number of NULLs. |
| List<String> columnStatsSelectList = getBaseColumnStatsQuerySelectList(analyzer); |
| |
| if (isIncremental_) columnStatsSelectList.addAll(groupByCols); |
| |
| StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT "); |
| columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList)); |
| columnStatsQueryBuilder.append(" FROM " + tableName_.toSql() + tableSampleSql); |
| |
| // Add the WHERE clause to filter out partitions that we don't want to compute |
| // incremental stats for. While this is a win in most situations, we would like to |
| // avoid this where it does no useful work (i.e. it selects all rows). This happens |
| // when there are no partitions with existing valid stats (so all partitions will |
| // have been selected) and there is no partition spec (so no single partition was |
| // explicitly selected). |
| if (filterPreds.size() > 0 && |
| (validPartStats_.size() > 0 || partitionSet_ != null)) { |
| String filterClause = " WHERE " + Joiner.on(" OR ").join(filterPreds); |
| columnStatsQueryBuilder.append(filterClause); |
| tableStatsQueryBuilder.append(filterClause); |
| } |
| |
| if (groupByCols.size() > 0) { |
| String groupBy = " GROUP BY " + Joiner.on(", ").join(groupByCols); |
| if (isIncremental_) columnStatsQueryBuilder.append(groupBy); |
| tableStatsQueryBuilder.append(groupBy); |
| } |
| |
| tableStatsQueryStr_ = tableStatsQueryBuilder.toString(); |
| if (LOG.isTraceEnabled()) LOG.trace("Table stats query: " + tableStatsQueryStr_); |
| if (columnStatsSelectList.isEmpty()) { |
| // Table doesn't have any columns that we can compute stats for. |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("No supported column types in table " + table_.getTableName() + |
| ", no column statistics will be gathered."); |
| } |
| columnStatsQueryStr_ = null; |
| return; |
| } |
| columnStatsQueryStr_ = columnStatsQueryBuilder.toString(); |
| if (LOG.isTraceEnabled()) LOG.trace("Column stats query: " + columnStatsQueryStr_); |
| } |
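| |
| // End-to-end sketch of the generated queries for a hypothetical table 'db.t' |
| // partitioned by 'p', with one INT column 'c' (slot size 4), under |
| // COMPUTE INCREMENTAL STATS (a WHERE clause over partition predicates may also |
| // be appended to restrict recomputation): |
| //   tableStatsQueryStr_:  SELECT COUNT(*), `p` FROM db.t GROUP BY `p` |
| //   columnStatsQueryStr_: SELECT NDV_NO_FINALIZE(`c`) AS `c`, |
| //       COUNT(CASE WHEN `c` IS NULL THEN 1 ELSE NULL END), 4, CAST(4 as DOUBLE), |
| //       COUNT(`c`), `p` FROM db.t GROUP BY `p` |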
| |
| /** |
| * Get partition statistics from the list of partitions, omitting those in |
| * excludedPartitions and those for which incremental statistics are not present. |
| * |
| * If incremental stats data is not already present in the local catalog, |
| * partition statistics are fetched from the catalog daemon. |
| */ |
| private static Map<Long, TPartitionStats> getOrFetchPartitionStats(Analyzer analyzer, |
| FeFsTable table, Collection<? extends FeFsPartition> partitions, |
| Set<Long> excludedPartitions) throws AnalysisException { |
| Preconditions.checkNotNull(partitions); |
| Preconditions.checkNotNull(excludedPartitions); |
| int expectedNumStats = partitions.size() - excludedPartitions.size(); |
| Preconditions.checkArgument(expectedNumStats >= 0); |
| |
| // Incremental stats are already present locally when using LocalCatalog |
| // or when testing. TODO(IMPALA-7535) fetch incremental stats separately |
| // for LocalCatalog as well. |
| if (!BackendConfig.INSTANCE.getBackendCfg().use_local_catalog |
| && !RuntimeEnv.INSTANCE.isTestEnv()) { |
| // We're configured to fetch the statistics from catalogd, so collect the relevant |
| // partition ids. |
| List<FeFsPartition> partitionsToFetch = new ArrayList<>(); |
| for (FeFsPartition p: partitions) { |
| if (excludedPartitions.contains(p.getId())) continue; |
| partitionsToFetch.add(p); |
| } |
| // Gets the partition stats from catalogd. |
| return fetchPartitionStats(analyzer, table, partitionsToFetch); |
| } |
| // Get the statistics directly from the partition, if present. |
| Map<Long, TPartitionStats> ret = Maps.newHashMapWithExpectedSize(expectedNumStats); |
| for (FeFsPartition p: partitions) { |
| if (excludedPartitions.contains(p.getId())) continue; |
| if (!p.hasIncrementalStats()) continue; |
| TPartitionStats stats = p.getPartitionStats(); |
| Preconditions.checkNotNull(stats); |
| ret.put(p.getId(), stats); |
| } |
| return ret; |
| } |
| |
| /** |
| * Fetches statistics for the partitions specified from the target table directly |
| * from catalogd. The partition statistics that are returned are the ones where: |
| * - incremental statistics are present |
| * - the partition is whitelisted in 'partitions' |
| * - the partition is present in the local impalad catalog |
| * TODO(vercegovac): Look into parallelizing the fetch while child-queries are |
| * running. Easiest would be to move this fetch to the backend. |
| */ |
| private static Map<Long, TPartitionStats> fetchPartitionStats(Analyzer analyzer, |
| FeFsTable table, List<FeFsPartition> partitions) throws AnalysisException { |
| Preconditions.checkNotNull(partitions); |
| Preconditions.checkState(!RuntimeEnv.INSTANCE.isTestEnv()); |
| if (partitions.isEmpty()) return Collections.emptyMap(); |
| Stopwatch sw = new Stopwatch().start(); |
| int numCompressedBytes = 0; |
| int totalPartitions = 0; |
| int numPartitionsWithStats = 0; |
| try { |
| TGetPartitionStatsResponse response = |
| analyzer.getCatalog().getPartitionStats(table.getTableName()); |
| if (response.status.status_code != TErrorCode.OK) { |
| throw new AnalysisException( |
| "Error fetching partition statistics: " + response.status.toString()); |
| } |
| if (!response.isSetPartition_stats()) return Collections.emptyMap(); |
| |
| // The response from catalogd is from a version of the table that may be newer |
| // than the local, impalad catalog. As a result, the response might include |
| // partitions not present locally and might not include partitions that are |
| // present locally. After stats are computed, they are sent to catalogd to update |
| // the HMS and catalog state. The catalogd already handles the case where the lists |
| // of partitions are out of sync (see CatalogOpExecutor#alterTableUpdateStats). |
| // As a result, at most those partitions in the intersection between remote and |
| // local catalogs are returned. |
| Map<Long, TPartitionStats> partitionStats = |
| Maps.newHashMapWithExpectedSize(partitions.size()); |
| totalPartitions = partitions.size(); |
| for (FeFsPartition part: partitions) { |
| ByteBuffer compressedStats = response.partition_stats.get( |
| FeCatalogUtils.getPartitionName(part)); |
| if (compressedStats != null) { |
| byte[] compressedStatsBytes = new byte[compressedStats.remaining()]; |
| numCompressedBytes += compressedStatsBytes.length; |
| compressedStats.get(compressedStatsBytes); |
| TPartitionStats remoteStats = |
| PartitionStatsUtil.partStatsFromCompressedBytes( |
| compressedStatsBytes, part); |
| if (remoteStats != null && remoteStats.isSetIntermediate_col_stats()) { |
| ++numPartitionsWithStats; |
| partitionStats.put(part.getId(), remoteStats); |
| } |
| } |
| } |
| return partitionStats; |
| } catch (Exception e) { |
| Throwables.propagateIfInstanceOf(e, AnalysisException.class); |
| throw new AnalysisException("Error fetching partition statistics", e); |
| } finally { |
| recordFetchMetrics(numCompressedBytes, totalPartitions, numPartitionsWithStats, sw); |
| } |
| } |
| |
| /** |
| * Adds metrics to the frontend profile when fetching incremental stats from catalogd. |
| */ |
| private static void recordFetchMetrics(int numCompressedBytes, |
| int totalPartitions, int numPartitionsWithStats, Stopwatch stopwatch) { |
| FrontendProfile profile = FrontendProfile.getCurrentOrNull(); |
| if (profile == null) return; |
| profile.addToCounter(STATS_FETCH_COMPRESSED_BYTES, TUnit.BYTES, numCompressedBytes); |
| profile.addToCounter(STATS_FETCH_TOTAL_PARTITIONS, TUnit.NONE, totalPartitions); |
| profile.addToCounter(STATS_FETCH_NUM_PARTITIONS_WITH_STATS, TUnit.NONE, |
| numPartitionsWithStats); |
| profile.addToCounter(STATS_FETCH_TIME, TUnit.TIME_MS, stopwatch.elapsedMillis()); |
| } |
| |
| /** |
| * Analyzes the TABLESAMPLE clause and computes the files sample to set |
| * 'effectiveSamplePerc_'. |
| * Returns the TABLESAMPLE SQL to be used for all child queries, or an empty string |
| * if not sampling, i.e., if this statement has no TABLESAMPLE clause or the |
| * effective sampling rate is 0.0 or 1.0 (see isSampling()). If sampling, the |
| * returned SQL includes a fixed random seed so all child queries generate a |
| * consistent sample, even if the user did not originally specify REPEATABLE. |
| */ |
| private String analyzeTableSampleClause(Analyzer analyzer) throws AnalysisException { |
| if (sampleParams_ == null) return ""; |
| if (!(table_ instanceof FeFsTable)) { |
| throw new AnalysisException("TABLESAMPLE is only supported on HDFS tables."); |
| } |
| FeFsTable hdfsTable = (FeFsTable) table_; |
| if (!FeFsTable.Utils.isStatsExtrapolationEnabled(hdfsTable)) { |
| throw new AnalysisException(String.format( |
| "COMPUTE STATS TABLESAMPLE requires stats extrapolation which is disabled.\n" + |
| "Stats extrapolation can be enabled service-wide with %s=true or by altering " + |
| "the table to have tblproperty %s=true", |
| "--enable_stats_extrapolation", |
| HdfsTable.TBL_PROP_ENABLE_STATS_EXTRAPOLATION)); |
| } |
| sampleParams_.analyze(analyzer); |
| long sampleSeed; |
| if (sampleParams_.hasRandomSeed()) { |
| sampleSeed = sampleParams_.getRandomSeed(); |
| } else { |
| sampleSeed = System.currentTimeMillis(); |
| } |
| |
| // Compute the sample of files and the total size of the sampled files. |
| long minSampleBytes = analyzer.getQueryOptions().compute_stats_min_sample_size; |
| long samplePerc = sampleParams_.getPercentBytes(); |
| // TODO(todd): can we avoid loading all the partitions for this? |
| Collection<? extends FeFsPartition> partitions = |
| FeCatalogUtils.loadAllPartitions(hdfsTable); |
| Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>> sample = |
| FeFsTable.Utils.getFilesSample(hdfsTable, |
| partitions, samplePerc, minSampleBytes, sampleSeed); |
| long sampleFileBytes = 0; |
| for (List<FileDescriptor> fds: sample.values()) { |
| for (FileDescriptor fd: fds) sampleFileBytes += fd.getFileLength(); |
| } |
| |
| // Compute effective sampling percent. |
| long totalFileBytes = ((FeFsTable)table_).getTotalHdfsBytes(); |
| if (totalFileBytes > 0) { |
| effectiveSamplePerc_ = (double) sampleFileBytes / (double) totalFileBytes; |
| } else { |
| effectiveSamplePerc_ = 0; |
| } |
| Preconditions.checkState(effectiveSamplePerc_ >= 0.0 && effectiveSamplePerc_ <= 1.0); |
| |
| // Warn if we will ignore TABLESAMPLE and run the regular COMPUTE STATS. |
| if (effectiveSamplePerc_ == 1.0) { |
| Preconditions.checkState(!isSampling()); |
| analyzer.addWarning(String.format( |
| "Ignoring TABLESAMPLE because the effective sampling rate is 100%%.\n" + |
| "The minimum sample size is COMPUTE_STATS_MIN_SAMPLE_SIZE=%s " + |
| "and the table size %s", |
| PrintUtils.printBytes(minSampleBytes), PrintUtils.printBytes(totalFileBytes))); |
| } |
| if (!isSampling()) return ""; |
| |
| return " " + sampleParams_.toSql(sampleSeed); |
| } |
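| |
| // Worked example (numbers hypothetical): for TABLESAMPLE SYSTEM(10) on a table |
| // with totalFileBytes = 1024 MB, the file-granular sample might cover 128 MB, |
| // giving effectiveSamplePerc_ = 128 / 1024 = 0.125. The child queries then scan |
| // ~12.5% of the data and the row count is scaled by 1 / 0.125 = 8 via |
| // ROUND(COUNT(*) / 0.125). |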
| |
| /** |
| * Checks whether the column definitions from the CREATE TABLE stmt match the columns |
| * in the Avro schema. If there is a mismatch, then COMPUTE STATS cannot update the |
| * statistics in the Metastore's backend DB due to HIVE-6308. Throws an |
| * AnalysisException for such ill-created Avro tables. Does nothing if |
| * the column definitions match the Avro schema exactly. |
| */ |
| private void checkIncompleteAvroSchema(FeFsTable table) throws AnalysisException { |
| Preconditions.checkState(table.usesAvroSchemaOverride()); |
| org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable(); |
| // The column definitions from 'CREATE TABLE (column definitions) ...' |
| Iterator<FieldSchema> colDefs = msTable.getSd().getCols().iterator(); |
| // The columns derived from the Avro schema file or literal schema. |
| // Inconsistencies between the Avro-schema columns and the column definitions |
| // are sometimes resolved in the CREATE TABLE, and sometimes not (see below). |
| Iterator<Column> avroSchemaCols = table.getColumns().iterator(); |
| // Skip partition columns from 'table' since those are not present in |
| // the msTable field schemas. |
| for (int i = 0; i < table.getNumClusteringCols(); ++i) { |
| if (avroSchemaCols.hasNext()) avroSchemaCols.next(); |
| } |
| int pos = 0; |
| while (colDefs.hasNext() || avroSchemaCols.hasNext()) { |
| if (colDefs.hasNext() && avroSchemaCols.hasNext()) { |
| FieldSchema colDef = colDefs.next(); |
| Column avroSchemaCol = avroSchemaCols.next(); |
| // Check that the column names are identical. Ignore mismatched types |
| // as those will either fail in the scan or succeed. |
| if (!colDef.getName().equalsIgnoreCase(avroSchemaCol.getName())) { |
| throw new AnalysisException( |
| String.format(AVRO_SCHEMA_MSG_PREFIX + |
| "\nDefinition of column '%s' of type '%s' does not match " + |
| "the Avro-schema column '%s' of type '%s' at position '%s'.\n" + |
| AVRO_SCHEMA_MSG_SUFFIX, |
| table.getName(), colDef.getName(), colDef.getType(), |
| avroSchemaCol.getName(), avroSchemaCol.getType(), pos)); |
| } |
| } |
| // The following two cases are typically not possible because Hive resolves |
| // inconsistencies between the column-definition list and the Avro schema if a |
| // column-definition list was given in the CREATE TABLE (having no column |
| // definitions at all results in HIVE-6308). Even so, we check these cases for |
| // extra safety. COMPUTE STATS could be made to succeed in special instances of |
| // the cases below but we chose to throw an AnalysisException to avoid confusion |
| // because this scenario "should" never arise as mentioned above. |
| if (colDefs.hasNext() && !avroSchemaCols.hasNext()) { |
| FieldSchema colDef = colDefs.next(); |
| throw new AnalysisException( |
| String.format(AVRO_SCHEMA_MSG_PREFIX + |
| "\nMissing Avro-schema column corresponding to column " + |
| "definition '%s' of type '%s' at position '%s'.\n" + |
| AVRO_SCHEMA_MSG_SUFFIX, |
| table.getName(), colDef.getName(), colDef.getType(), pos)); |
| } |
| if (!colDefs.hasNext() && avroSchemaCols.hasNext()) { |
| Column avroSchemaCol = avroSchemaCols.next(); |
| throw new AnalysisException( |
| String.format(AVRO_SCHEMA_MSG_PREFIX + |
| "\nMissing column definition corresponding to Avro-schema " + |
| "column '%s' of type '%s' at position '%s'.\n" + |
| AVRO_SCHEMA_MSG_SUFFIX, |
| table.getName(), avroSchemaCol.getName(), avroSchemaCol.getType(), pos)); |
| } |
| ++pos; |
| } |
| } |
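| |
| // Hypothetical mismatch this method guards against: the CREATE TABLE declared |
| // (id INT, name STRING) while the Avro schema declares (id INT, nm STRING). The |
| // name/nm mismatch at position 1 raises the AnalysisException above instead of |
| // letting COMPUTE STATS fail later in the Metastore backend DB (HIVE-6308). |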
| |
| /** |
| * Returns true if we are only updating statistics at the table level and not at |
| * the partition level. |
| */ |
| private boolean updateTableStatsOnly() { |
| if (!(table_ instanceof FeFsTable)) return true; |
| return !isIncremental_ && FeFsTable.Utils.isStatsExtrapolationEnabled( |
| (FeFsTable) table_); |
| } |
| |
| /** |
| * Returns true if this COMPUTE STATS statement should perform sampling. |
| * Returns false if TABLESAMPLE was not specified (effectiveSamplePerc_ == -1) |
| * or if the effective sampling percent is 0% or 100% where sampling has no benefit. |
| */ |
| private boolean isSampling() { |
| Preconditions.checkState(effectiveSamplePerc_ == -1 |
| || (effectiveSamplePerc_ >= 0.0 && effectiveSamplePerc_ <= 1.0)); |
| return effectiveSamplePerc_ > 0.0 && effectiveSamplePerc_ < 1.0; |
| } |
| |
| /** |
| * Returns true if the given column should be ignored for the purpose of computing |
| * column stats. Columns with an invalid/unsupported/complex type are ignored. |
| * For example, complex types in an HBase-backed table will appear as invalid types. |
| */ |
| private boolean ignoreColumn(Column c) { |
| Type t = c.getType(); |
| return !t.isValid() || !t.isSupported() || t.isComplexType(); |
| } |
| |
| public double getEffectiveSamplingPerc() { return effectiveSamplePerc_; } |
| |
| /** |
| * For testing. |
| */ |
| public String getTblStatsQuery() { return tableStatsQueryStr_; } |
| public String getColStatsQuery() { return columnStatsQueryStr_; } |
| public Set<Column> getValidatedColumnWhitelist() { return validatedColumnWhitelist_; } |
| |
| /** |
| * Returns true if this statement computes stats on Parquet/ORC partitions only, |
| * false otherwise. |
| */ |
| public boolean isColumnar() { |
| if (!(table_ instanceof FeFsTable)) return false; |
| Collection<? extends FeFsPartition> affectedPartitions = null; |
| if (partitionSet_ != null) { |
| affectedPartitions = partitionSet_.getPartitions(); |
| } else { |
| FeFsTable hdfsTable = (FeFsTable)table_; |
| affectedPartitions = FeCatalogUtils.loadAllPartitions(hdfsTable); |
| } |
| for (FeFsPartition partition: affectedPartitions) { |
| if (partition.getFileFormat() != HdfsFileFormat.PARQUET |
| && partition.getFileFormat() != HdfsFileFormat.ORC) |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public String toSql(ToSqlOptions options) { |
| if (!isIncremental_) { |
| StringBuilder columnList = new StringBuilder(); |
| if (columnWhitelist_ != null) { |
| columnList.append("("); |
| columnList.append(Joiner.on(", ").join(columnWhitelist_)); |
| columnList.append(")"); |
| } |
| String tblsmpl = ""; |
| if (sampleParams_ != null) tblsmpl = " " + sampleParams_.toSql(options); |
| return "COMPUTE STATS " + tableName_.toSql() + columnList.toString() + tblsmpl; |
| } else { |
| return "COMPUTE INCREMENTAL STATS " + tableName_.toSql() |
| + (partitionSet_ == null ? "" : partitionSet_.toSql(options)); |
| } |
| } |
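| |
| // Example outputs (identifiers and clause values hypothetical, spacing approximate): |
| //   COMPUTE STATS db.t(a, b) TABLESAMPLE SYSTEM(10) REPEATABLE(1234) |
| //   COMPUTE INCREMENTAL STATS db.t PARTITION (p=1) |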
| |
| public TComputeStatsParams toThrift() { |
| TComputeStatsParams params = new TComputeStatsParams(); |
| params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName())); |
| params.setTbl_stats_query(tableStatsQueryStr_); |
| if (columnStatsQueryStr_ != null) { |
| params.setCol_stats_query(columnStatsQueryStr_); |
| } else { |
| params.setCol_stats_queryIsSet(false); |
| } |
| params.setIs_incremental(isIncremental_); |
| params.setExisting_part_stats(validPartStats_); |
| params.setExpect_all_partitions(expectAllPartitions_); |
| if (!expectAllPartitions_) params.setExpected_partitions(expectedPartitions_); |
| if (isIncremental_) { |
| params.setNum_partition_cols(((FeFsTable)table_).getNumClusteringCols()); |
| } |
| if (table_ instanceof FeFsTable) { |
| params.setTotal_file_bytes(((FeFsTable)table_).getTotalHdfsBytes()); |
| } |
| return params; |
| } |
| } |