[SPARK-49737][SQL] Disable bucketing on collated columns in complex types
### What changes were proposed in this pull request?
Disable bucketing on columns whose complex types (structs, arrays, and maps) contain collated strings.
### Why are the changes needed?
#45260 introduced the logic to disable bucketing for collated columns, but did not handle complex types that contain collated strings.
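For context, the old `canBucketOn` only inspected a top-level `StringType`, so a `STRING COLLATE UNICODE` nested inside a struct, array, or map slipped through. Below is a minimal sketch of the kind of recursive check required; the patch itself delegates to `SchemaUtils.hasNonUTF8BinaryCollation`, whose actual implementation may differ:

```scala
import org.apache.spark.sql.types._

// Hedged sketch: walk the type tree and flag any string whose collation
// does not support binary ordering (i.e. is not UTF8_BINARY).
def hasCollatedString(dt: DataType): Boolean = dt match {
  case st: StringType => !st.supportsBinaryOrdering
  case ArrayType(elementType, _) => hasCollatedString(elementType)
  case MapType(keyType, valueType, _) =>
    hasCollatedString(keyType) || hasCollatedString(valueType)
  case StructType(fields) => fields.exists(f => hasCollatedString(f.dataType))
  case _ => false
}
```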
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `CollationSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48186 from stefankandic/fixBucketing.
Authored-by: Stefan Kandic <stefan.kandic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
index 4fa1e0c..fd47fee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
@@ -19,7 +19,8 @@
import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.util.SchemaUtils
object BucketingUtils {
// The file name of bucketed data should have 3 parts:
@@ -53,10 +54,7 @@
bucketIdGenerator(mutableInternalRow).getInt(0)
}
- def canBucketOn(dataType: DataType): Boolean = dataType match {
- case st: StringType => st.supportsBinaryOrdering
- case other => true
- }
+ def canBucketOn(dataType: DataType): Boolean = !SchemaUtils.hasNonUTF8BinaryCollation(dataType)
def bucketIdToString(id: Int): String = f"_$id%05d"
}
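An illustrative sketch (not from the patch) of how the new `canBucketOn` should behave, assuming a collated string type can be constructed via `StringType(collationId)` and `CollationFactory.collationNameToId` as in current Spark:

```scala
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.types._

// Assumed construction of a UNICODE-collated string type.
val unicode = StringType(CollationFactory.collationNameToId("UNICODE"))

BucketingUtils.canBucketOn(StringType)                         // true: UTF8_BINARY
BucketingUtils.canBucketOn(unicode)                            // false: top-level collation
BucketingUtils.canBucketOn(ArrayType(unicode))                 // false: collated array element
BucketingUtils.canBucketOn(MapType(unicode, StringType))       // false: collated map key
BucketingUtils.canBucketOn(new StructType().add("c", unicode)) // false: collated struct field
```

Before this change only the first two cases were handled; the nested ones incorrectly returned `true`.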
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 73fd897..632b930 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -162,9 +162,14 @@
withTable(tableName) {
sql(
s"""
- |CREATE TABLE $tableName
- |(id INT, c1 STRING COLLATE UNICODE, c2 string)
- |USING parquet
+ |CREATE TABLE $tableName (
+ | id INT,
+ | c1 STRING COLLATE UNICODE,
+ | c2 STRING,
+ | struct_col STRUCT<col1: STRING COLLATE UNICODE, col2: STRING>,
+ | array_col ARRAY<STRING COLLATE UNICODE>,
+ | map_col MAP<STRING COLLATE UNICODE, STRING>
+ |) USING parquet
|CLUSTERED BY (${bucketColumns.mkString(",")})
|INTO 4 BUCKETS""".stripMargin
)
@@ -175,14 +180,20 @@
createTable("c2")
createTable("id", "c2")
- Seq(Seq("c1"), Seq("c1", "id"), Seq("c1", "c2")).foreach { bucketColumns =>
+ val failBucketingColumns = Seq(
+ Seq("c1"), Seq("c1", "id"), Seq("c1", "c2"),
+ Seq("struct_col"), Seq("array_col"), Seq("map_col")
+ )
+
+ failBucketingColumns.foreach { bucketColumns =>
checkError(
exception = intercept[AnalysisException] {
createTable(bucketColumns: _*)
},
condition = "INVALID_BUCKET_COLUMN_DATA_TYPE",
- parameters = Map("type" -> "\"STRING COLLATE UNICODE\"")
- );
+ parameters = Map("type" -> ".*STRING COLLATE UNICODE.*"),
+ matchPVals = true
+ )
}
}
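Note on the assertion change: because the error's `type` parameter now reports the full column type for the new cases (the surrounding `STRUCT<...>`, `ARRAY<...>`, or `MAP<...>` wrapper rather than just `"STRING COLLATE UNICODE"`), the test switches from an exact string comparison to a regex match via `matchPVals = true`.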