[FLINK-30679][connectors/hive] Fix IndexOutOfBoundsException for Hive lookup join when column pushdown to Hive lookup table source (#21782)
Co-authored-by: hehuiyuan1 <hehuiyuan@jd.com>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
index a97eec1..e5a249b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
@@ -257,8 +257,8 @@
jobConf,
hiveVersion,
tablePath,
- getProducedTableSchema().getFieldDataTypes(),
- getProducedTableSchema().getFieldNames(),
+ getTableSchema().getFieldDataTypes(),
+ getTableSchema().getFieldNames(),
catalogTable.getPartitionKeys(),
projectedFields,
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
index 6756559..f5e3c3f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
@@ -365,17 +365,17 @@
batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
batchEnv.useCatalog(hiveCatalog.getName());
batchEnv.executeSql(
- "insert overwrite bounded_table values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)")
+ "insert overwrite bounded_table values (1,'a',10),(2,'b',22),(3,'c',33)")
.await();
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
TableImpl flinkTable =
(TableImpl)
tableEnv.sqlQuery(
- "select b.x, b.y from "
+ "select b.x, b.z from "
+ " default_catalog.default_database.probe as p "
- + " join bounded_table for system_time as of p.p as b on p.x=b.x and p.y=b.y");
+ + " join bounded_table for system_time as of p.p as b on p.x=b.x");
List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
- assertEquals("[+I[1, a], +I[2, b], +I[3, c]]", results.toString());
+ assertEquals("[+I[1, 10], +I[1, 10], +I[2, 22], +I[2, 22], +I[3, 33]]", results.toString());
}
@Test