IMPALA-11431: Avoid getting stats for synthetic column row__id from HMS

Before this change Impala always tried to fetch stats for row__id in
full ACID tables, which does not exist in the metastore. Sometimes this
led to an exception from HMS, see HIVE-28498 for details.

This caused flakyness in test_compute_stats_with_structs. The test also
had side effects (it computed stats for a shared table) so it was
modified to use unique_database.

Testing:
- could reproduce the issue by starting HMS with
  hive.metastore.try.direct.sql=true and verified that the change
  fixes it

Change-Id: I759f57c99aa16e4ab5fd82aa5f6b756446291f03
Reviewed-on: http://gerrit.cloudera.org:8080/21742
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/catalog/CtasTargetTable.java b/fe/src/main/java/org/apache/impala/catalog/CtasTargetTable.java
index 7682833..189242f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CtasTargetTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CtasTargetTable.java
@@ -106,12 +106,7 @@
   @Override
   public List<Column> getColumnsInHiveOrder() {
     List<Column> columns = Lists.newArrayList(getNonClusteringColumns());
-    if (getMetaStoreTable() != null &&
-        AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
-      // Remove synthetic "row__id" column.
-      Preconditions.checkState(columns.get(0).getName().equals("row__id"));
-      columns.remove(0);
-    }
+    columns = filterColumnsNotStoredInHms(columns);
     columns.addAll(getClusteringColumns());
     return Collections.unmodifiableList(columns);
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeTable.java b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
index e50af06..3453e49 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
@@ -20,6 +20,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -28,6 +29,7 @@
 import org.apache.impala.thrift.TImpalaTableType;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.AcidUtils;
 
 /**
  * Frontend interface for interacting with a table.
@@ -115,7 +117,7 @@
   /**
    * @return an unmodifiable list of all columns, but with partition columns at the end of
    * the list rather than the beginning. This is equivalent to the order in
-   * which Hive enumerates columns.
+   * which Hive enumerates columns. Removes columns that are not in the HMS schema.
    */
   List<Column> getColumnsInHiveOrder();
 
@@ -139,6 +141,19 @@
    */
   List<Column> getNonClusteringColumns();
 
+  /**
+   * Filter columns not stored in HMS (currently row__id in full ACID tables).
+   */
+  default List<Column> filterColumnsNotStoredInHms(List<Column> columns) {
+    Table tbl = getMetaStoreTable();
+    boolean isFullAcid = tbl != null && AcidUtils.isFullAcidTable(tbl.getParameters());
+    if (!isFullAcid) return columns;
+    // Filter out row__id as it doesn't exist in HMS.
+    return columns.stream()
+        .filter(c -> !c.getName().equals("row__id"))
+        .collect(Collectors.toList());
+  }
+
   int getNumClusteringCols();
 
   boolean isClusteringColumn(Column c);
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 734276f..9e6a204 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2071,9 +2071,11 @@
 
   @Override
   protected List<String> getColumnNamesWithHmsStats() {
+    List<Column> columns = Lists.newArrayList(getNonClusteringColumns());
+    columns = filterColumnsNotStoredInHms(columns);
     List<String> ret = new ArrayList<>();
     // Only non-partition columns have column stats in the HMS.
-    for (Column column: getColumns().subList(numClusteringCols_, getColumns().size())) {
+    for (Column column: columns) {
       ret.add(column.getName().toLowerCase());
     }
     return ret;
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index e141dae..a1aec18 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -180,7 +180,9 @@
    * Returns the columns in the order they have been created
    */
   @Override
-  public List<Column> getColumnsInHiveOrder() { return getColumns(); }
+  public List<Column> getColumnsInHiveOrder() {
+    return filterColumnsNotStoredInHms(getColumns());
+  }
 
   public static boolean isKuduStorageHandler(String handler) {
     return handler != null && (handler.equals(KUDU_LEGACY_STORAGE_HANDLER) ||
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index ac7befc..90f3065 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -479,9 +480,10 @@
   // stats. This method allows each table type to volunteer the set of columns we should
   // ask the metastore for in loadAllColumnStats().
   protected List<String> getColumnNamesWithHmsStats() {
-    List<String> ret = new ArrayList<>();
-    for (String name: colsByName_.keySet()) ret.add(name);
-    return ret;
+    List<Column> columns = filterColumnsNotStoredInHms(getColumns());
+    return columns.stream()
+        .map(col->col.getName().toLowerCase())
+        .collect(Collectors.toList());
   }
 
   /**
@@ -498,15 +500,18 @@
       List<ColumnStatisticsObj> colStats;
 
       // We need to only query those columns which may have stats; asking HMS for other
-      // columns causes loadAllColumnStats() to return nothing.
-      // TODO(todd): this no longer seems to be true - asking for a non-existent column
-      // is just ignored, and the columns that do exist are returned.
+      // columns can cause get_table_statistics_req to throw an exception.
+      // The exact HMS behavior depends on hive.metastore.try.direct.sql, see HIVE-28498.
       List<String> colNames = getColumnNamesWithHmsStats();
 
       try {
         colStats = MetastoreShim.getTableColumnStatistics(client, db_.getName(), name_,
             colNames);
       } catch (Exception e) {
+        // TODO: Catching all exceptions and ignoring it makes it harder to detect errors.
+        //       At least during testing it would be good to treat this as error, but
+        //       it is hard to report this well if the function is called during
+        //       HMS event processing.
         LOG.warn("Could not load column statistics for: " + getFullName(), e);
         return;
       }
@@ -904,12 +909,7 @@
   @Override // FeTable
   public List<Column> getColumnsInHiveOrder() {
     List<Column> columns = Lists.newArrayList(getNonClusteringColumns());
-    if (getMetaStoreTable() != null &&
-        AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
-      // Remove synthetic "row__id" column.
-      Preconditions.checkState(columns.get(0).getName().equals("row__id"));
-      columns.remove(0);
-    }
+    columns = filterColumnsNotStoredInHms(columns);
     columns.addAll(getClusteringColumns());
     return Collections.unmodifiableList(columns);
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/VirtualTable.java b/fe/src/main/java/org/apache/impala/catalog/VirtualTable.java
index 1df7632..32ae98e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/VirtualTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/VirtualTable.java
@@ -110,12 +110,7 @@
   @Override
   public List<Column> getColumnsInHiveOrder() {
     List<Column> columns = Lists.newArrayList(getNonClusteringColumns());
-    if (getMetaStoreTable() != null &&
-        AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
-      // Remove synthetic "row__id" column.
-      Preconditions.checkState(columns.get(0).getName().equals("row__id"));
-      columns.remove(0);
-    }
+    columns = filterColumnsNotStoredInHms(columns);
     columns.addAll(getClusteringColumns());
     return Collections.unmodifiableList(columns);
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index a469a4a..91bfaa4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -286,8 +286,9 @@
   @Override
   public List<Column> getColumnsInHiveOrder() {
     List<Column> columns = Lists.newArrayList(getNonClusteringColumns());
+    columns = filterColumnsNotStoredInHms(columns);
     columns.addAll(getClusteringColumns());
-    return columns;
+    return Collections.unmodifiableList(columns);
   }
 
   @Override
diff --git a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-with-structs.test b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-with-structs.test
index 1a5390e..461e4ac 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-with-structs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-with-structs.test
@@ -1,5 +1,6 @@
 ====
 ---- QUERY
+INVALIDATE METADATA complextypes_structs;
 COMPUTE STATS complextypes_structs
 ---- RESULTS
 'Updated 1 partition(s) and 2 column(s).'
@@ -19,6 +20,7 @@
 STRING,STRING,BIGINT,BIGINT,BIGINT,DOUBLE,BIGINT,BIGINT
 ====
 ---- QUERY
+INVALIDATE METADATA complextypes_nested_structs;
 COMPUTE STATS complextypes_nested_structs
 ---- RESULTS
 'Updated 1 partition(s) and 1 column(s).'
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 16361d5..0d4f5ba 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -232,7 +232,7 @@
         v.get_value('table_format').file_format in ['parquet', 'orc'])
     cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
-  def test_mixed_complex_types_in_select_list(self, vector, unique_database):
+  def test_mixed_complex_types_in_select_list(self, vector):
     """Queries where structs and collections are embedded into one another."""
     self.run_test_case('QueryTest/mixed-collections-and-structs', vector)
 
@@ -252,9 +252,22 @@
         v.get_value('table_format').file_format in ['parquet', 'orc'])
     cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
-  def test_compute_stats_with_structs(self, vector):
+  @SkipIfFS.hive
+  def test_compute_stats_with_structs(self, vector, unique_database):
     """COMPUTE STATS and SHOW COLUMN STATS for tables with structs"""
-    self.run_test_case('QueryTest/compute-stats-with-structs', vector)
+    file_format = vector.get_value('table_format').file_format
+    src_db = ImpalaTestSuite.get_db_name_from_format(vector.get_value('table_format'))
+    tbl_spec = "STORED AS " + file_format
+    if file_format == "orc":
+      # Create a full ACID copy for the ORC table to cover IMPALA-11431.
+      tbl_spec += " TBLPROPERTIES('transactional'='true')"
+    ctas = "CREATE TABLE {0}.{1} {2} AS SELECT * FROM {3}.{1}"
+    ctas1 = ctas.format(unique_database, "complextypes_structs", tbl_spec, src_db)
+    ctas2 = ctas.format(unique_database, "complextypes_nested_structs", tbl_spec, src_db)
+    self.run_stmt_in_hive(ctas1)
+    self.run_stmt_in_hive(ctas2)
+    self.run_test_case('QueryTest/compute-stats-with-structs', vector,
+                       use_db=unique_database)
 
 
 class TestZippingUnnest(ImpalaTestSuite):