[FLINK-35221][hive] Support SQL 2011 reserved keywords as identifiers in HiveParser (#18)
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 779de53..9501cdf 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -607,8 +607,7 @@
builder.setOverwrite(overwrite);
builder.setIsToLocal(isToLocal);
builder.setStaticPartitions(staticPartitionSpec);
- builder.setTempPath(
- new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
+ builder.setPath(new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
builder.setOutputFileConfig(fileNaming);
builder.setIdentifier(identifier);
builder.setPartitionCommitPolicyFactory(
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
index 3bd3e0f..ab7744f 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
@@ -30,4 +30,6 @@
/* Constants for insert overwrite directory */
public static final String IS_INSERT_DIRECTORY = "is-insert-directory";
public static final String IS_TO_LOCAL_DIRECTORY = "is-to-local-directory";
+ public static final String HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS =
+ "hive.support.sql11.reserved.keywords";
}
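The new constant mirrors Hive's own `hive.support.sql11.reserved.keywords` switch: `true` (the default) keeps SQL 2011 keywords reserved, and only an explicit `false` frees them up as identifiers. A minimal sketch of that double negative, assuming a plain Hadoop `Configuration` (the parser reads its `HiveConf` the same way):

```java
import org.apache.hadoop.conf.Configuration;

// Sketch: the flag defaults to true, so reserved keywords become usable as
// identifiers only when it is explicitly switched off (note the negation).
public class KeywordFlagDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // default: keywords stay reserved -> not usable as identifiers
        System.out.println(!conf.getBoolean("hive.support.sql11.reserved.keywords", true)); // false
        conf.setBoolean("hive.support.sql11.reserved.keywords", false);
        System.out.println(!conf.getBoolean("hive.support.sql11.reserved.keywords", true)); // true
    }
}
```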
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
index 77ed836..6e77395 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
@@ -37,6 +37,9 @@
RecognitionException e) {
gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
}
+ protected boolean useSQL11ReservedKeywordsForIdentifier() {
+ return gParent.useSQL11ReservedKeywordsForIdentifier();
+ }
}
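(The same one-line delegation is added to IdentifiersASTParser.g and SelectClauseASTParser.g below: in ANTLR 3 composite grammars each imported sub-parser reaches the root parser through `gParent`, so the switch is implemented once in HiveASTParser.g and the sub-grammars simply forward to it.)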
@rulecatch {
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
index 367d4b2..ec44d95 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
@@ -417,6 +417,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseError;
+import static org.apache.flink.table.planner.delegation.hive.HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS;
}
@@ -721,6 +722,12 @@
public void setHiveConf(Configuration hiveConf) {
this.hiveConf = hiveConf;
}
+ protected boolean useSQL11ReservedKeywordsForIdentifier() {
+ if (hiveConf == null) {
+ return false;
+ }
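+ // hive.support.sql11.reserved.keywords defaults to true (keywords stay reserved),
+ // so they become usable as identifiers only when it is explicitly set to false.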
+ return !hiveConf.getBoolean(HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, true);
+ }
}
@rulecatch {
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
index 3ab8631..4d21bd9 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
@@ -37,6 +37,9 @@
RecognitionException e) {
gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
}
+ protected boolean useSQL11ReservedKeywordsForIdentifier() {
+ return gParent.useSQL11ReservedKeywordsForIdentifier();
+ }
}
@rulecatch {
@@ -730,6 +733,8 @@
:
Identifier
| nonReserved -> Identifier[$nonReserved.start]
+ // The reserved keywords in SQL 2011 can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
+ | {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier -> Identifier[$sql11ReservedKeywordsUsedAsIdentifier.start]
;
functionIdentifier
@@ -806,3 +811,22 @@
:
KW_IF | KW_ARRAY | KW_MAP | KW_BIGINT | KW_BINARY | KW_BOOLEAN | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_DATE | KW_DOUBLE | KW_FLOAT | KW_GROUPING | KW_INT | KW_SMALLINT | KW_TIMESTAMP
;
+
+// The following SQL 2011 reserved keywords can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
+sql11ReservedKeywordsUsedAsIdentifier
+ :
+ KW_ALL | KW_ALTER | KW_ARRAY | KW_AS | KW_AUTHORIZATION | KW_BETWEEN | KW_BIGINT | KW_BINARY | KW_BOOLEAN
+ | KW_BOTH | KW_BY | KW_CREATE | KW_CUBE | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_CURSOR | KW_DATE | KW_DECIMAL | KW_DELETE | KW_DESCRIBE
+ | KW_DOUBLE | KW_DROP | KW_EXISTS | KW_EXTERNAL | KW_FALSE | KW_FETCH | KW_FLOAT | KW_FOR | KW_FULL | KW_GRANT
+ | KW_GROUP | KW_GROUPING | KW_IMPORT | KW_IN | KW_INNER | KW_INSERT | KW_INT | KW_INTERSECT | KW_INTO | KW_IS | KW_LATERAL
+ | KW_LEFT | KW_LIKE | KW_LOCAL | KW_NONE | KW_NULL | KW_OF | KW_ORDER | KW_OUT | KW_OUTER | KW_PARTITION
+ | KW_PERCENT | KW_PROCEDURE | KW_RANGE | KW_READS | KW_REVOKE | KW_RIGHT
+ | KW_ROLLUP | KW_ROW | KW_ROWS | KW_SET | KW_SMALLINT | KW_TABLE | KW_TIMESTAMP | KW_TO | KW_TRIGGER | KW_TRUE
+ | KW_TRUNCATE | KW_UNION | KW_UPDATE | KW_USER | KW_USING | KW_VALUES | KW_WITH
+ | KW_REGEXP | KW_RLIKE
+ | KW_PRIMARY
+ | KW_FOREIGN
+ | KW_CONSTRAINT
+ | KW_REFERENCES
+ | KW_PRECISION
+ ;
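The `{useSQL11ReservedKeywordsForIdentifier()}?` guard on the new `identifier` alternative is an ANTLR semantic predicate: the keyword branch is only attempted when the flag is on. A self-contained analogue of that gate in plain Java (all names here are illustrative, not the actual parser API):

```java
import java.util.Set;

// Illustrative analogue of the predicate-gated alternative; the real grammar
// lets ANTLR try the sql11ReservedKeywordsUsedAsIdentifier branch only when
// the semantic predicate evaluates to true.
public class Sql11IdentifierGate {
    private static final Set<String> SQL11_RESERVED =
            Set.of("TIMESTAMP", "USER", "TABLE", "ORDER", "DATE");

    static boolean acceptsAsIdentifier(String token, boolean useSql11ReservedAsIdentifier) {
        boolean wordLike = token.matches("[A-Za-z_][A-Za-z0-9_]*");
        if (wordLike && !SQL11_RESERVED.contains(token.toUpperCase())) {
            return true; // plain Identifier or nonReserved keyword
        }
        // {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier
        return wordLike
                && useSql11ReservedAsIdentifier
                && SQL11_RESERVED.contains(token.toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(acceptsAsIdentifier("timestamp", false)); // false: still reserved
        System.out.println(acceptsAsIdentifier("timestamp", true));  // true: usable as a name
        System.out.println(acceptsAsIdentifier("x", true));          // true: ordinary identifier
    }
}
```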
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
index c141d31..ac64e03 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
@@ -37,6 +37,9 @@
RecognitionException e) {
gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
}
+ protected boolean useSQL11ReservedKeywordsForIdentifier() {
+ return gParent.useSQL11ReservedKeywordsForIdentifier();
+ }
}
@rulecatch {
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java
new file mode 100644
index 0000000..a652ae7
--- /dev/null
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.module.CoreModule;
+import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+/** Tests that SQL 2011 reserved keywords can be used as identifiers in Hive dialect queries. */
+class HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest {
+ private static HiveCatalog hiveCatalog;
+ private static TableEnvironment tableEnv;
+ private static List<String> sql11ReservedKeywords;
+
+ @BeforeAll
+ static void setup() throws Exception {
+ hiveCatalog = HiveTestUtils.createHiveCatalog();
+ hiveCatalog
+ .getHiveConf()
+ .setBoolean(HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, false);
+ hiveCatalog.open();
+ tableEnv = getTableEnvWithHiveCatalog();
+ sql11ReservedKeywords =
+ Arrays.asList(
+ "ALL",
+ "ALTER",
+ "ARRAY",
+ "AS",
+ "AUTHORIZATION",
+ "BETWEEN",
+ "BIGINT",
+ "BINARY",
+ "BOOLEAN",
+ "BOTH",
+ "BY",
+ "CREATE",
+ "CUBE",
+ "CURRENT_DATE",
+ "CURRENT_TIMESTAMP",
+ "CURSOR",
+ "DATE",
+ "DECIMAL",
+ "DELETE",
+ "DESCRIBE",
+ "DOUBLE",
+ "DROP",
+ "EXISTS",
+ "EXTERNAL",
+ "FALSE",
+ "FETCH",
+ "FLOAT",
+ "FOR",
+ "FULL",
+ "GRANT",
+ "GROUP",
+ "GROUPING",
+ "IMPORT",
+ "IN",
+ "INNER",
+ "INSERT",
+ "INT",
+ "INTERSECT",
+ "INTO",
+ "IS",
+ "LATERAL",
+ "LEFT",
+ "LIKE",
+ "LOCAL",
+ "NONE",
+ "NULL",
+ "OF",
+ "ORDER",
+ "OUT",
+ "OUTER",
+ "PARTITION",
+ "PERCENT",
+ "PROCEDURE",
+ "RANGE",
+ "READS",
+ "REVOKE",
+ "RIGHT",
+ "ROLLUP",
+ "ROW",
+ "ROWS",
+ "SET",
+ "SMALLINT",
+ "TABLE",
+ "TIMESTAMP",
+ "TO",
+ "TRIGGER",
+ "TRUE",
+ "TRUNCATE",
+ "UNION",
+ "UPDATE",
+ "USER",
+ "USING",
+ "VALUES",
+ "WITH",
+ "REGEXP",
+ "RLIKE",
+ "PRIMARY",
+ "FOREIGN",
+ "CONSTRAINT",
+ "REFERENCES",
+ "PRECISION");
+ }
+
+ @Test
+ void testReservedKeywordAsIdentifierInDDL() {
+ List<String> toRun =
+ new ArrayList<>(
+ Arrays.asList(
+ "create table table1 (x int, %s int)",
+ "create table table2 (x int) partitioned by (%s string, q string)",
+ "create table table3 (\n"
+ + " a int,\n"
+ + " %s struct<f1: boolean, f2: string, f3: struct<f4: int, f5: double>, f6: int>\n"
+ + ")"));
+ Random random = new Random();
+ for (String queryTemplate : toRun) {
+ // Select a random keyword.
+ String chosenKeyword =
+ sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+ String finalQuery = String.format(queryTemplate, chosenKeyword);
+ runQuery(finalQuery);
+ }
+ }
+
+ @Test
+ void testReservedKeywordAsIdentifierInDQL() {
+ List<String> toRun =
+ new ArrayList<>(
+ Arrays.asList(
+ "create table table4(id int,name string,dep string,%s int,age int)",
+ "select avg(%s) over (partition by dep) as avgsal from table4",
+ "select dep,name,%s from (select dep,name,%s,rank() over "
+ + "(partition by dep order by %s desc) as rnk from table4) a where rnk=1",
+ "select %s,sum(cnt) over (order by %s)/sum(cnt) over "
+ + "(order by %s ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from"
+ + " (select %s,count(*) as cnt from table4 group by %s) a"));
+ Random random = new Random();
+ // Select a random keyword.
+ String chosenKeyword =
+ sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+ for (String queryTemplate : toRun) {
+ String finalQuery = queryTemplate.replace("%s", chosenKeyword);
+ runQuery(finalQuery);
+ }
+ }
+
+ @Test
+ void testReservedKeywordAsIdentifierInDML() {
+ List<String> toRun =
+ new ArrayList<>(
+ Arrays.asList(
+ "create table table5 (%s string, value string)",
+ "create table table6(key int, ten int, one int, value string)",
+ "from table5 insert overwrite table table6 map table5.%s,"
+ + " CAST(table5.%s / 10 AS INT), CAST(table5.%s % 10 AS INT),"
+ + " table5.value using 'cat' as (tkey, ten, one, tvalue)"
+ + " distribute by tvalue, tkey"));
+ Random random = new Random();
+ // Select a random keyword.
+ String chosenKeyword =
+ sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+ for (String queryTemplate : toRun) {
+ String finalQuery = queryTemplate.replace("%s", chosenKeyword);
+ runQuery(finalQuery);
+ }
+ }
+
+ private void runQuery(String query) {
+ assertThatNoException()
+ .isThrownBy(
+ () -> CollectionUtil.iteratorToList(tableEnv.executeSql(query).collect()));
+ }
+
+ private static TableEnvironment getTableEnvWithHiveCatalog() {
+ TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+ tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ tableEnv.useCatalog(hiveCatalog.getName());
+ // load the Hive module (plus the core module) so the Hive dialect can resolve Hive built-in functions
+ HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
+ CoreModule coreModule = CoreModule.INSTANCE;
+ for (String loaded : tableEnv.listModules()) {
+ tableEnv.unloadModule(loaded);
+ }
+ tableEnv.loadModule("hive", hiveModule);
+ tableEnv.loadModule("core", coreModule);
+ return tableEnv;
+ }
+}
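A usage sketch distilled from the test setup above (same APIs as the test; the table and column names are made up): disable the flag on the catalog's `HiveConf` before opening it, and reserved keywords parse as identifiers in Hive-dialect DDL without backtick quoting.

```java
HiveCatalog catalog = HiveTestUtils.createHiveCatalog();
catalog.getHiveConf()
        .setBoolean(HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, false);
catalog.open();

TableEnvironment env = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
env.registerCatalog(catalog.getName(), catalog);
env.useCatalog(catalog.getName());

// "timestamp" and "user" are SQL 2011 reserved words, yet parse as column names here.
env.executeSql("create table logs (id int, timestamp int, user string)");
```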