[FLINK-35221][hive] Support SQL 2011 reserved keywords as identifiers in HiveParser (#18)

diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 779de53..9501cdf 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -607,8 +607,7 @@
         builder.setOverwrite(overwrite);
         builder.setIsToLocal(isToLocal);
         builder.setStaticPartitions(staticPartitionSpec);
-        builder.setTempPath(
-                new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
+        builder.setPath(new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
         builder.setOutputFileConfig(fileNaming);
         builder.setIdentifier(identifier);
         builder.setPartitionCommitPolicyFactory(
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
index 3bd3e0f..ab7744f 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
@@ -30,4 +30,6 @@
     /* Constants for insert overwrite directory */
     public static final String IS_INSERT_DIRECTORY = "is-insert-directory";
     public static final String IS_TO_LOCAL_DIRECTORY = "is-to-local-directory";
+    public static final String HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS =
+            "hive.support.sql11.reserved.keywords";
 }
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
index 77ed836..6e77395 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
@@ -37,6 +37,9 @@
       RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }
 
 @rulecatch {
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
index 367d4b2..ec44d95 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
@@ -417,6 +417,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseError;
+import static org.apache.flink.table.planner.delegation.hive.HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS;
 }
 
 
@@ -721,6 +722,12 @@
   public void setHiveConf(Configuration hiveConf) {
     this.hiveConf = hiveConf;
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    if(hiveConf==null){
+      return false;
+    }
+    return !hiveConf.getBoolean(HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, true);
+  }
 }
 
 @rulecatch {
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
index 3ab8631..4d21bd9 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
@@ -37,6 +37,9 @@
       RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }
 
 @rulecatch {
@@ -730,6 +733,8 @@
     :
     Identifier
     | nonReserved -> Identifier[$nonReserved.start]
+    // The reserved keywords in SQL 2011 can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
+    | {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier -> Identifier[$sql11ReservedKeywordsUsedAsIdentifier.start]
     ;
 
 functionIdentifier
@@ -806,3 +811,22 @@
     :
     KW_IF | KW_ARRAY | KW_MAP | KW_BIGINT | KW_BINARY | KW_BOOLEAN | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_DATE | KW_DOUBLE | KW_FLOAT | KW_GROUPING | KW_INT | KW_SMALLINT | KW_TIMESTAMP
     ;
+
+//The following SQL2011 reserved keywords can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
+sql11ReservedKeywordsUsedAsIdentifier
+    :
+    KW_ALL | KW_ALTER | KW_ARRAY | KW_AS | KW_AUTHORIZATION | KW_BETWEEN | KW_BIGINT | KW_BINARY | KW_BOOLEAN
+    | KW_BOTH | KW_BY | KW_CREATE | KW_CUBE | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_CURSOR | KW_DATE | KW_DECIMAL | KW_DELETE | KW_DESCRIBE
+    | KW_DOUBLE | KW_DROP | KW_EXISTS | KW_EXTERNAL | KW_FALSE | KW_FETCH | KW_FLOAT | KW_FOR | KW_FULL | KW_GRANT
+    | KW_GROUP | KW_GROUPING | KW_IMPORT | KW_IN | KW_INNER | KW_INSERT | KW_INT | KW_INTERSECT | KW_INTO | KW_IS | KW_LATERAL
+    | KW_LEFT | KW_LIKE | KW_LOCAL | KW_NONE | KW_NULL | KW_OF | KW_ORDER | KW_OUT | KW_OUTER | KW_PARTITION
+    | KW_PERCENT | KW_PROCEDURE | KW_RANGE | KW_READS | KW_REVOKE | KW_RIGHT
+    | KW_ROLLUP | KW_ROW | KW_ROWS | KW_SET | KW_SMALLINT | KW_TABLE | KW_TIMESTAMP | KW_TO | KW_TRIGGER | KW_TRUE
+    | KW_TRUNCATE | KW_UNION | KW_UPDATE | KW_USER | KW_USING | KW_VALUES | KW_WITH
+    | KW_REGEXP | KW_RLIKE
+    | KW_PRIMARY
+    | KW_FOREIGN
+    | KW_CONSTRAINT
+    | KW_REFERENCES
+    | KW_PRECISION
+    ;
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
index c141d31..ac64e03 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
@@ -37,6 +37,9 @@
       RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }
 
 @rulecatch {
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java
new file mode 100644
index 0000000..a652ae7
--- /dev/null
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.module.CoreModule;
+import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+/** Test with SQL11 reserved keywords in hive queries. */
+class HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest {
+    private static HiveCatalog hiveCatalog;
+    private static TableEnvironment tableEnv;
+    private static List<String> sql11ReservedKeywords;
+
+    @BeforeAll
+    static void setup() throws Exception {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog
+                .getHiveConf()
+                .setBoolean(HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, false);
+        hiveCatalog.open();
+        tableEnv = getTableEnvWithHiveCatalog();
+        sql11ReservedKeywords =
+                Arrays.asList(
+                        "ALL",
+                        "ALTER",
+                        "ARRAY",
+                        "AS",
+                        "AUTHORIZATION",
+                        "BETWEEN",
+                        "BIGINT",
+                        "BINARY",
+                        "BOOLEAN",
+                        "BOTH",
+                        "BY",
+                        "CREATE",
+                        "CUBE",
+                        "CURRENT_DATE",
+                        "CURRENT_TIMESTAMP",
+                        "CURSOR",
+                        "DATE",
+                        "DECIMAL",
+                        "DELETE",
+                        "DESCRIBE",
+                        "DOUBLE",
+                        "DROP",
+                        "EXISTS",
+                        "EXTERNAL",
+                        "FALSE",
+                        "FETCH",
+                        "FLOAT",
+                        "FOR",
+                        "FULL",
+                        "GRANT",
+                        "GROUP",
+                        "GROUPING",
+                        "IMPORT",
+                        "IN",
+                        "INNER",
+                        "INSERT",
+                        "INT",
+                        "INTERSECT",
+                        "INTO",
+                        "IS",
+                        "LATERAL",
+                        "LEFT",
+                        "LIKE",
+                        "LOCAL",
+                        "NONE",
+                        "NULL",
+                        "OF",
+                        "ORDER",
+                        "OUT",
+                        "OUTER",
+                        "PARTITION",
+                        "PERCENT",
+                        "PROCEDURE",
+                        "RANGE",
+                        "READS",
+                        "REVOKE",
+                        "RIGHT",
+                        "ROLLUP",
+                        "ROW",
+                        "ROWS",
+                        "SET",
+                        "SMALLINT",
+                        "TABLE",
+                        "TIMESTAMP",
+                        "TO",
+                        "TRIGGER",
+                        "TRUE",
+                        "TRUNCATE",
+                        "UNION",
+                        "UPDATE",
+                        "USER",
+                        "USING",
+                        "VALUES",
+                        "WITH",
+                        "REGEXP",
+                        "RLIKE",
+                        "PRIMARY",
+                        "FOREIGN",
+                        "CONSTRAINT",
+                        "REFERENCES",
+                        "PRECISION");
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDDL() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table1 (x int, %s int)",
+                                "create table table2 (x int) partitioned by (%s string, q string)",
+                                "create table table3 (\n"
+                                        + "  a int,\n"
+                                        + "  %s struct<f1: boolean, f2: string, f3: struct<f4: int, f5: double>, f6: int>\n"
+                                        + ")"));
+        Random random = new Random();
+        for (String queryTemplate : toRun) {
+            // Select a random keyword.
+            String chosenKeyword =
+                    sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+            String finalQuery = String.format(queryTemplate, chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDQL() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table4(id int,name string,dep string,%s int,age int)",
+                                "select avg(%s) over (partition by dep) as avgsal from table4",
+                                "select dep,name,%s from (select dep,name,%s,rank() over "
+                                        + "(partition by dep order by %s desc) as rnk from table4) a where rnk=1",
+                                "select %s,sum(cnt) over (order by %s)/sum(cnt) over "
+                                        + "(order by %s ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from"
+                                        + " (select %s,count(*) as cnt from table4 group by %s) a"));
+        Random random = new Random();
+        // Select a random keyword.
+        String chosenKeyword =
+                sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+        for (String queryTemplate : toRun) {
+            String finalQuery = queryTemplate.replace("%s", chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDML() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table5 (%s string, value string)",
+                                "create table table6(key int, ten int, one int, value string)",
+                                "from table5 insert overwrite table table6 map table5.%s,"
+                                        + " CAST(table5.%s / 10 AS INT), CAST(table5.%s % 10 AS INT),"
+                                        + " table5.value using 'cat' as (tkey, ten, one, tvalue)"
+                                        + " distribute by tvalue, tkey"));
+        Random random = new Random();
+        // Select a random keyword.
+        String chosenKeyword =
+                sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+        for (String queryTemplate : toRun) {
+            String finalQuery = queryTemplate.replace("%s", chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    private void runQuery(String query) {
+        assertThatNoException()
+                .isThrownBy(
+                        () -> CollectionUtil.iteratorToList(tableEnv.executeSql(query).collect()));
+    }
+
+    private static TableEnvironment getTableEnvWithHiveCatalog() {
+        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+        // automatically load hive module in hive-compatible mode
+        HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
+        CoreModule coreModule = CoreModule.INSTANCE;
+        for (String loaded : tableEnv.listModules()) {
+            tableEnv.unloadModule(loaded);
+        }
+        tableEnv.loadModule("hive", hiveModule);
+        tableEnv.loadModule("core", coreModule);
+        return tableEnv;
+    }
+}