[SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` duplicated column name handling consistent with `withColumnRenamed`
### What changes were proposed in this pull request?
Make `withColumnsRenamed` handle duplicated column names consistently with `withColumnRenamed`.
### Why are the changes needed?
`withColumnsRenamed` checks for duplicated column names in the output DataFrame, which is inconsistent with `withColumnRenamed`:
1. `withColumnRenamed` doesn't perform this check, and supports producing a DataFrame with duplicated column names;
2. when the input DataFrame has duplicated column names, `withColumnsRenamed` always fails, even if the columns sharing the duplicated name are not touched at all:
```
In [8]: df1 = spark.createDataFrame([(1, "id2"),], ["id", "value"])
...: df2 = spark.createDataFrame([(1, 'x', 'id1'), ], ["id", 'a', "value"])
...: join = df2.join(df1, on=['id'], how='left')
...: join
Out[8]: DataFrame[id: bigint, a: string, value: string, value: string]
In [9]: join.withColumnRenamed('id', 'value')
Out[9]: DataFrame[value: bigint, a: string, value: string, value: string]
In [10]: join.withColumnsRenamed({'id' : 'value'})
...
AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
In [11]: join.withColumnRenamed('a', 'b')
Out[11]: DataFrame[id: bigint, b: string, value: string, value: string]
In [12]: join.withColumnsRenamed({'a' : 'b'})
...
AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
In [13]: join.withColumnRenamed('x', 'y')
Out[13]: DataFrame[id: bigint, a: string, value: string, value: string]
In [14]: join.withColumnsRenamed({'x' : 'y'})
AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
In [15]: join.withColumnRenamed('value', 'new_value')
Out[15]: DataFrame[id: bigint, a: string, new_value: string, new_value: string]
In [16]: join.withColumnsRenamed({'value' : 'new_value'})
AnalysisException: [COLUMN_ALREADY_EXISTS] The column `new_value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
```
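With this change, `withColumnsRenamed` no longer performs the output-duplication check, so the calls above are expected to succeed and match `withColumnRenamed`. A minimal sketch of the post-fix behavior, mirroring the repro above (`spark` is assumed to be an active SparkSession):
```
df1 = spark.createDataFrame([(1, "id2")], ["id", "value"])
df2 = spark.createDataFrame([(1, "x", "id1")], ["id", "a", "value"])
join = df2.join(df1, on=["id"], how="left")

# Previously raised COLUMN_ALREADY_EXISTS; now matches withColumnRenamed.
assert join.withColumnsRenamed({"a": "b"}).columns == join.withColumnRenamed("a", "b").columns
assert join.withColumnsRenamed({"id": "value"}).columns == join.withColumnRenamed("id", "value").columns
```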
### Does this PR introduce _any_ user-facing change?
Yes. `withColumnsRenamed` no longer raises `COLUMN_ALREADY_EXISTS` when the resulting DataFrame would contain duplicated column names; it now behaves consistently with `withColumnRenamed`.
### How was this patch tested?
Updated existing tests and added `test_with_columns_renamed_with_duplicated_names`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45431 from zhengruifeng/connect_renames.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 1b50936..b989f50 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -566,15 +566,6 @@
comparePlans(
connectTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")),
sparkTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")))
-
- checkError(
- exception = intercept[AnalysisException] {
- transform(
- connectTestRelation.withColumnsRenamed(
- Map("id" -> "duplicatedCol", "name" -> "duplicatedCol")))
- },
- errorClass = "COLUMN_ALREADY_EXISTS",
- parameters = Map("columnName" -> "`duplicatedcol`"))
}
test("Writes fails without path or table") {
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index ad7a3b8..38310f1 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -231,6 +231,28 @@
message_parameters={"arg_name": "colsMap", "arg_type": "tuple"},
)
+ def test_with_columns_renamed_with_duplicated_names(self):
+ df1 = self.spark.createDataFrame([(1, "v1")], ["id", "value"])
+ df2 = self.spark.createDataFrame([(1, "x", "v2")], ["id", "a", "value"])
+ join = df2.join(df1, on=["id"], how="left")
+
+ self.assertEqual(
+ join.withColumnRenamed("id", "value").columns,
+ join.withColumnsRenamed({"id": "value"}).columns,
+ )
+ self.assertEqual(
+ join.withColumnRenamed("a", "b").columns,
+ join.withColumnsRenamed({"a": "b"}).columns,
+ )
+ self.assertEqual(
+ join.withColumnRenamed("value", "new_value").columns,
+ join.withColumnsRenamed({"value": "new_value"}).columns,
+ )
+ self.assertEqual(
+ join.withColumnRenamed("x", "y").columns,
+ join.withColumnsRenamed({"x": "y"}).columns,
+ )
+
def test_ordering_of_with_columns_renamed(self):
df = self.spark.range(10)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 189be1d..f3bf611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2932,9 +2932,6 @@
}
)
}
- SchemaUtils.checkColumnNameDuplication(
- projectList.map(_.name),
- sparkSession.sessionState.conf.caseSensitiveAnalysis)
withPlan(Project(projectList, logicalPlan))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1dc367f..6b34a64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -789,45 +789,6 @@
assert(df.columns === Array("key", "value", "renamed1", "renamed2"))
}
- test("SPARK-40311: withColumnsRenamed case sensitive") {
- withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val df = testData.toDF().withColumns(Seq("newCol1", "newCOL2"),
- Seq(col("key") + 1, col("key") + 2))
- .withColumnsRenamed(Map("newCol1" -> "renamed1", "newCol2" -> "renamed2"))
- checkAnswer(
- df,
- testData.collect().map { case Row(key: Int, value: String) =>
- Row(key, value, key + 1, key + 2)
- }.toSeq)
- assert(df.columns === Array("key", "value", "renamed1", "newCOL2"))
- }
- }
-
- test("SPARK-40311: withColumnsRenamed duplicate column names simple") {
- checkError(
- exception = intercept[AnalysisException] {
- person.withColumnsRenamed(Map("id" -> "renamed", "name" -> "renamed"))
- },
- errorClass = "COLUMN_ALREADY_EXISTS",
- parameters = Map("columnName" -> "`renamed`"))
- }
-
- test("SPARK-40311: withColumnsRenamed duplicate column names simple case sensitive") {
- withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val df = person.withColumnsRenamed(Map("id" -> "renamed", "name" -> "Renamed"))
- assert(df.columns === Array("renamed", "Renamed", "age"))
- }
- }
-
- test("SPARK-40311: withColumnsRenamed duplicate column names indirect") {
- checkError(
- exception = intercept[AnalysisException] {
- person.withColumnsRenamed(Map("id" -> "renamed1", "renamed1" -> "age"))
- },
- errorClass = "COLUMN_ALREADY_EXISTS",
- parameters = Map("columnName" -> "`age`"))
- }
-
test("SPARK-46260: withColumnsRenamed should respect the Map ordering") {
val df = spark.range(10).toDF()
assert(df.withColumnsRenamed(ListMap("id" -> "a", "a" -> "b")).columns === Array("b"))