[SPARK-49756][SQL][FOLLOWUP] Use correct pgsql datetime fields when pushing down EXTRACT

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/48210 to fix correctness issues caused by pgsql filter pushdown. These datetime fields were picked wrongly before, see https://neon.tech/postgresql/postgresql-date-functions/postgresql-extract

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

Yes, query result is corrected, but this bug is not released yet.

### How was this patch tested?

updated test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #50101 from cloud-fan/pgsql.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 208a7ee6771d1fe7e6c240a8add0c8f72ba5a484)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index eaf2a07..af3c17d 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -185,6 +185,9 @@
       "('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
     connection.prepareStatement("INSERT INTO datetime VALUES " +
       "('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
+    // '2022-01-01' is Saturday and is in ISO year 2021.
+    connection.prepareStatement("INSERT INTO datetime VALUES " +
+      "('tom', '2022-01-01', '2022-01-01 00:00:00')").executeUpdate()
   }
 
   override def testUpdateColumnType(tbl: String): Unit = {
@@ -279,17 +282,19 @@
     val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0")
     checkFilterPushed(df4)
     val rows4 = df4.collect()
-    assert(rows4.length === 2)
+    assert(rows4.length === 3)
     assert(rows4(0).getString(0) === "amy")
     assert(rows4(1).getString(0) === "alex")
+    assert(rows4(2).getString(0) === "tom")
 
     val df5 = sql(s"SELECT name FROM $tbl WHERE " +
-      "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
+      "extract(WEEK from date1) > 10 AND extract(YEAR from date1) = 2022")
     checkFilterPushed(df5)
     val rows5 = df5.collect()
-    assert(rows5.length === 2)
+    assert(rows5.length === 3)
     assert(rows5(0).getString(0) === "amy")
     assert(rows5(1).getString(0) === "alex")
+    assert(rows5(2).getString(0) === "tom")
 
     val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " +
       "AND datediff(date1, '2022-05-10') > 0")
@@ -304,11 +309,25 @@
     assert(rows7.length === 1)
     assert(rows7(0).getString(0) === "alex")
 
-    val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4")
-    checkFilterPushed(df8)
-    val rows8 = df8.collect()
-    assert(rows8.length === 1)
-    assert(rows8(0).getString(0) === "alex")
+    withClue("dayofweek") {
+      val dow = sql(s"SELECT dayofweek(date1) FROM $tbl WHERE name = 'alex'")
+        .collect().head.getInt(0)
+      val df = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = $dow")
+      checkFilterPushed(df)
+      val rows = df.collect()
+      assert(rows.length === 1)
+      assert(rows(0).getString(0) === "alex")
+    }
+
+    withClue("yearofweek") {
+      val yow = sql(s"SELECT extract(YEAROFWEEK from date1) FROM $tbl WHERE name = 'tom'")
+        .collect().head.getInt(0)
+      val df = sql(s"SELECT name FROM $tbl WHERE extract(YEAROFWEEK from date1) = $yow")
+      checkFilterPushed(df)
+      val rows = df.collect()
+      assert(rows.length === 1)
+      assert(rows(0).getString(0) === "tom")
+    }
 
     val df9 = sql(s"SELECT name FROM $tbl WHERE " +
       "dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index d7fb12f..4a9fbe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -303,12 +303,24 @@
 
   class PostgresSQLBuilder extends JDBCSQLBuilder {
     override def visitExtract(field: String, source: String): String = {
-      field match {
-        case "DAY_OF_YEAR" => s"EXTRACT(DOY FROM $source)"
-        case "YEAR_OF_WEEK" => s"EXTRACT(YEAR FROM $source)"
-        case "DAY_OF_WEEK" => s"EXTRACT(DOW FROM $source)"
-        case _ => super.visitExtract(field, source)
+      // SECOND, MINUTE, HOUR, DAY, MONTH, QUARTER, YEAR are identical on postgres and spark for
+      // both datetime and interval types.
+      // DAY_OF_WEEK  is DOW, day of week is full compatible with postgres,
+      //              but in V2ExpressionBuilder they converted DAY_OF_WEEK to DAY_OF_WEEK_ISO,
+      //              so we need to push down ISODOW
+      //              (ISO and standard day of weeks differs in starting day,
+      //              Sunday is 0 on standard DOW extraction, while in ISO it's 7)
+      // DAY_OF_YEAR  have same semantic, but different name (On postgres, it is DOY)
+      // WEEK         is a little bit specific function, but both spark and postgres uses ISO week
+      // YEAR_OF_WEEK is ISO year actually. First few days of a calendar year can belong to the
+      //              past year by ISO standard of week counting.
+      val postgresField = field match {
+        case "DAY_OF_WEEK" => "ISODOW"
+        case "DAY_OF_YEAR" => "DOY"
+        case "YEAR_OF_WEEK" => "ISOYEAR"
+        case _ => field
       }
+      super.visitExtract(postgresField, source)
     }
 
     override def visitBinaryArithmetic(name: String, l: String, r: String): String = {