[SPARK-49756][SQL][FOLLOWUP] Use correct pgsql datetime fields when pushing down EXTRACT
### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/48210 that fixes correctness issues in the PostgreSQL filter pushdown: the datetime fields used when pushing down EXTRACT were mapped to the wrong PostgreSQL fields, see https://neon.tech/postgresql/postgresql-date-functions/postgresql-extract
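To illustrate the mismatch, here is a minimal, hypothetical snippet (not part of this patch; it only assumes a local `SparkSession` named `spark`). Spark's `YEAROFWEEK` field is the ISO week-numbering year, so `2022-01-01` (a Saturday) belongs to 2021; PostgreSQL's `EXTRACT(ISOYEAR ...)` agrees, while the previously used `EXTRACT(YEAR ...)` returns 2022 and would change the filter result.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup only; the session wiring is not from this patch.
val spark = SparkSession.builder().master("local[*]").appName("extract-demo").getOrCreate()

// Spark computes the ISO week-numbering year, so this prints 2021.
// The matching PostgreSQL pushdown is EXTRACT(ISOYEAR FROM ...), which also yields 2021;
// EXTRACT(YEAR FROM ...) would yield 2022 for the same date.
println(spark.sql("SELECT extract(YEAROFWEEK FROM date'2022-01-01')").head.get(0))

spark.stop()
```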
### Why are the changes needed?
Bug fix: pushed-down EXTRACT filters could return incorrect results on PostgreSQL.
### Does this PR introduce _any_ user-facing change?
Yes, query results are now correct, but the bug has not been included in any released version.
### How was this patch tested?
Updated the PostgreSQL docker integration test.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #50101 from cloud-fan/pgsql.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 208a7ee6771d1fe7e6c240a8add0c8f72ba5a484)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index eaf2a07..af3c17d 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -185,6 +185,9 @@
"('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
connection.prepareStatement("INSERT INTO datetime VALUES " +
"('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
+ // '2022-01-01' is a Saturday and falls in ISO year 2021.
+ connection.prepareStatement("INSERT INTO datetime VALUES " +
+ "('tom', '2022-01-01', '2022-01-01 00:00:00')").executeUpdate()
}
override def testUpdateColumnType(tbl: String): Unit = {
@@ -279,17 +282,19 @@
val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0")
checkFilterPushed(df4)
val rows4 = df4.collect()
- assert(rows4.length === 2)
+ assert(rows4.length === 3)
assert(rows4(0).getString(0) === "amy")
assert(rows4(1).getString(0) === "alex")
+ assert(rows4(2).getString(0) === "tom")
val df5 = sql(s"SELECT name FROM $tbl WHERE " +
- "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
+ "extract(WEEK from date1) > 10 AND extract(YEAR from date1) = 2022")
checkFilterPushed(df5)
val rows5 = df5.collect()
- assert(rows5.length === 2)
+ assert(rows5.length === 3)
assert(rows5(0).getString(0) === "amy")
assert(rows5(1).getString(0) === "alex")
+ assert(rows5(2).getString(0) === "tom")
val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " +
"AND datediff(date1, '2022-05-10') > 0")
@@ -304,11 +309,25 @@
assert(rows7.length === 1)
assert(rows7(0).getString(0) === "alex")
- val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4")
- checkFilterPushed(df8)
- val rows8 = df8.collect()
- assert(rows8.length === 1)
- assert(rows8(0).getString(0) === "alex")
+ withClue("dayofweek") {
+ val dow = sql(s"SELECT dayofweek(date1) FROM $tbl WHERE name = 'alex'")
+ .collect().head.getInt(0)
+ val df = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = $dow")
+ checkFilterPushed(df)
+ val rows = df.collect()
+ assert(rows.length === 1)
+ assert(rows(0).getString(0) === "alex")
+ }
+
+ withClue("yearofweek") {
+ val yow = sql(s"SELECT extract(YEAROFWEEK from date1) FROM $tbl WHERE name = 'tom'")
+ .collect().head.getInt(0)
+ val df = sql(s"SELECT name FROM $tbl WHERE extract(YEAROFWEEK from date1) = $yow")
+ checkFilterPushed(df)
+ val rows = df.collect()
+ assert(rows.length === 1)
+ assert(rows(0).getString(0) === "tom")
+ }
val df9 = sql(s"SELECT name FROM $tbl WHERE " +
"dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index d7fb12f..4a9fbe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -303,12 +303,24 @@
class PostgresSQLBuilder extends JDBCSQLBuilder {
override def visitExtract(field: String, source: String): String = {
- field match {
- case "DAY_OF_YEAR" => s"EXTRACT(DOY FROM $source)"
- case "YEAR_OF_WEEK" => s"EXTRACT(YEAR FROM $source)"
- case "DAY_OF_WEEK" => s"EXTRACT(DOW FROM $source)"
- case _ => super.visitExtract(field, source)
+ // SECOND, MINUTE, HOUR, DAY, MONTH, QUARTER and YEAR are identical on Postgres and Spark
+ // for both datetime and interval types.
+ // DAY_OF_WEEK corresponds to Postgres DOW and its semantics are fully compatible,
+ // but V2ExpressionBuilder converts DAY_OF_WEEK to DAY_OF_WEEK_ISO,
+ // so we need to push down ISODOW.
+ // (The ISO and standard day-of-week extractions differ in the starting day:
+ // Sunday is 0 with standard DOW, while with ISODOW it is 7.)
+ // DAY_OF_YEAR has the same semantics but a different name (on Postgres it is DOY).
+ // WEEK is a slightly special field, but both Spark and Postgres use the ISO week.
+ // YEAR_OF_WEEK is actually the ISO year: the first few days of a calendar year can belong
+ // to the previous year under the ISO week-numbering rules.
+ val postgresField = field match {
+ case "DAY_OF_WEEK" => "ISODOW"
+ case "DAY_OF_YEAR" => "DOY"
+ case "YEAR_OF_WEEK" => "ISOYEAR"
+ case _ => field
}
+ super.visitExtract(postgresField, source)
}
override def visitBinaryArithmetic(name: String, l: String, r: String): String = {