DRILL-7643: Fix issues with using columns with the same name as a reserved keyword

closes #2028
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeColumnUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeColumnUtils.java
index feeeab7..1f97399 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeColumnUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeColumnUtils.java
@@ -51,7 +51,7 @@
    * Returns actual column name obtained form intermediate name which includes statistics kind and other analyze-specific info.
    * <p>
    * Example: column which corresponds to max statistics value for {@code `o_shippriority`} column is {@code column$maxValue$`o_shippriority`}.
-   * This method will return actual column name: {@code `o_shippriority`}.
+   * This method will return escaped actual column name: {@code `o_shippriority`}.
    *
    * @param fullName the source of actual column name
    * @return actual column name
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
index d9765d6..cc156f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
@@ -128,6 +128,7 @@
     List<SchemaPath> metastoreInterestingColumns =
         Optional.ofNullable(basicRequests.interestingColumnsAndPartitionKeys(tableInfo).interestingColumns())
             .map(metastoreInterestingColumnNames -> metastoreInterestingColumnNames.stream()
+                // interesting column names are escaped, so SchemaPath.parseFromString() should be used here
                 .map(SchemaPath::parseFromString)
                 .collect(Collectors.toList()))
         .orElse(null);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index 7b6a19a..a2607a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -148,7 +148,11 @@
     if (columnList != null) {
       for (SqlNode column : columnList.getList()) {
         // Add only the root segment. Collect metadata for all the columns under that root segment
-        columnSet.add(SchemaPath.getSimplePath(SchemaPath.parseFromString(column.toString()).getRootSegmentPath()));
+        columnSet.add(
+            SchemaPath.getSimplePath(
+                SchemaPath.parseFromString(
+                        column.toSqlString(null, true).getSql())
+                    .getRootSegmentPath()));
       }
     }
     return columnSet;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlMetastoreAnalyzeTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlMetastoreAnalyzeTable.java
index a2bf8a9..a9ab526 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlMetastoreAnalyzeTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlMetastoreAnalyzeTable.java
@@ -127,8 +127,7 @@
     }
 
     return fieldList.getList().stream()
-        .map(SqlNode::toString)
-        .map(SchemaPath::parseFromString)
+        .map(sqlNode -> SchemaPath.parseFromString(sqlNode.toSqlString(null, true).getSql()))
         .collect(Collectors.toList());
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/StatisticsCollectorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/StatisticsCollectorImpl.java
index f2dcac9..96ef51b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/StatisticsCollectorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/StatisticsCollectorImpl.java
@@ -240,6 +240,7 @@
       }
       switch (nextField) {
         case Statistic.COLNAME:
+          // column name is escaped, so SchemaPath.parseFromString() should be used here
           ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).setName(SchemaPath.parseFromString(reader.readText().toString()));
           break;
         case Statistic.COLTYPE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
index a672d19..da4f474 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
@@ -23,6 +23,7 @@
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
@@ -318,7 +319,7 @@
      * @param schemaPath schema name
      * @param table table instance
      * @param schema table or column schema
-     * @param parentColumnName parent column name if any
+     * @param parentColumnNames list of parent column names if any
      * @param columnIndex column index if any
      * @param isNested indicates if column is nested
      * @return list of column records
@@ -326,28 +327,31 @@
     private List<Records.Column> columns(String schemaPath,
                                          BaseTableMetadata table,
                                          TupleMetadata schema,
-                                         String parentColumnName,
+                                         List<String> parentColumnNames,
                                          int columnIndex,
                                          boolean isNested) {
       List<Records.Column> records = new ArrayList<>();
       schema.toMetadataList().forEach(
         column -> {
-          // concat parent column name to use full column name, i.e. struct_col.nested_col
-          String columnName = parentColumnName == null ? column.name() : parentColumnName + "." + column.name();
+          List<String> columnNames = CollectionUtils.isEmpty(parentColumnNames) ? new ArrayList<>() : new ArrayList<>(parentColumnNames);
+          columnNames.add(column.name());
           // nested columns have the same index as their parent
           int currentIndex = columnIndex == UNDEFINED_INDEX ? schema.index(column.name()) : columnIndex;
           // if column is a map / struct, recursively scan nested columns
           if (column.isMap()) {
             List<Records.Column> mapRecords =
-              columns(schemaPath, table, column.tupleSchema(), columnName, currentIndex, true);
+              columns(schemaPath, table, column.tupleSchema(), columnNames, currentIndex, true);
             records.addAll(mapRecords);
           }
 
           String tableName = table.getTableInfo().name();
-          if (filterEvaluator.shouldVisitColumn(schemaPath, tableName, columnName)) {
+
+          // concat parent column names to use full column name, i.e. struct_col.nested_col
+          String columnPath = String.join(".", columnNames);
+          if (filterEvaluator.shouldVisitColumn(schemaPath, tableName, columnPath)) {
             ColumnStatistics<?> columnStatistics =
-              table.getColumnStatistics(SchemaPath.parseFromString(columnName));
-            records.add(new Records.Column(IS_CATALOG_NAME, schemaPath, tableName, columnName,
+              table.getColumnStatistics(SchemaPath.getCompoundPath(columnNames.toArray(new String[0])));
+            records.add(new Records.Column(IS_CATALOG_NAME, schemaPath, tableName, columnPath,
               column, columnStatistics, currentIndex, isNested));
           }
         });
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index 9c63da5..92472cf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -3227,21 +3227,21 @@
   public void testAnalyzeEmptyRequiredParquetTable() throws Exception {
     String tableName = "analyze_empty_simple_required";
 
-    run("create table dfs.tmp.%s as select 1 as id, 'a' as name from (values(1)) where 1 = 2", tableName);
+    run("create table dfs.tmp.%s as select 1 as `date`, 'a' as name from (values(1)) where 1 = 2", tableName);
 
     File table = new File(dirTestWatcher.getDfsTestTmpDir(), tableName);
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
     TupleMetadata schema = new SchemaBuilder()
-        .add("id", TypeProtos.MinorType.INT)
+        .add("date", TypeProtos.MinorType.INT)
         .add("name", TypeProtos.MinorType.VARCHAR)
         .build();
 
     Map<SchemaPath, ColumnStatistics<?>> columnStatistics = ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
         .put(SchemaPath.getSimplePath("name"),
             getColumnStatistics(null, null, 0L, TypeProtos.MinorType.VARCHAR))
-        .put(SchemaPath.getSimplePath("id"),
+        .put(SchemaPath.getSimplePath("date"),
             getColumnStatistics(null, null, 0L, TypeProtos.MinorType.INT))
         .build();
 
@@ -3300,6 +3300,14 @@
           .rowGroupsMetadata(tableInfo, (String) null, null);
 
       assertEquals(1, rowGroupsMetadata.size());
+
+      testBuilder()
+          .sqlQuery("select COLUMN_NAME from INFORMATION_SCHEMA.`COLUMNS` where table_name='%s'", tableName)
+          .unOrdered()
+          .baselineColumns("COLUMN_NAME")
+          .baselineValues("date")
+          .baselineValues("name")
+          .go();
     } finally {
       run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);
       run("drop table if exists dfs.tmp.`%s`", tableName);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index e7e9c0c..2ecc3b5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -944,9 +944,9 @@
 
   @Test
   public void testRefreshWithColumns() throws Exception {
-    test("refresh table metadata columns (o_custkey, o_orderdate) dfs.`%s`", TABLE_NAME_1);
+    test("refresh table metadata columns (`date`, o_orderdate) dfs.`%s`", TABLE_NAME_1);
     checkForMetadataFile(TABLE_NAME_1);
-    String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs.`%s` " +
+    String query = String.format("select dir0, dir1, o_custkey as `date`, o_orderdate from dfs.`%s` " +
             " where dir0=1994 and dir1 in ('Q1', 'Q2')", TABLE_NAME_1);
     int expectedRowCount = 20;
     int actualRowCount = testSql(query);