[SPARK-47421][SQL] Add collation support for URL expressions
### What changes were proposed in this pull request?
Introduce collation awareness for URL expressions: url_encode, url_decode, parse_url.
### Why are the changes needed?
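The URL expressions previously declared plain `StringType` inputs and results; they need to accept collated strings so that collation support in Spark also covers URL functions.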
### Does this PR introduce _any_ user-facing change?
Yes, users can now pass collated strings as arguments to the URL functions: url_encode, url_decode, parse_url.
### How was this patch tested?
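End-to-end SQL tests added to `CollationSQLExpressionsSuite`, plus updated Spark Connect explain golden files for `url_encode` and `url_decode`.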
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46460 from uros-db/url-expressions.
Authored-by: Uros Bojanic <157381213+uros-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_url_decode.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_url_decode.explain
index d612190..ee4936f 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_url_decode.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_url_decode.explain
@@ -1,2 +1,2 @@
-Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UrlCodec$, StringType, decode, g#0, UTF-8, StringType, StringType, true, true, true) AS url_decode(g)#0]
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UrlCodec$, StringType, decode, g#0, UTF-8, StringTypeAnyCollation, StringTypeAnyCollation, true, true, true) AS url_decode(g)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_url_encode.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_url_encode.explain
index bd2c63e..45c55f4 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_url_encode.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_url_encode.explain
@@ -1,2 +1,2 @@
-Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UrlCodec$, StringType, encode, g#0, UTF-8, StringType, StringType, true, true, true) AS url_encode(g)#0]
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UrlCodec$, StringType, encode, g#0, UTF-8, StringTypeAnyCollation, StringTypeAnyCollation, true, true, true) AS url_encode(g)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala
index 47b37a5..ef8f2ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala
@@ -28,7 +28,8 @@
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
+import org.apache.spark.sql.internal.types.StringTypeAnyCollation
+import org.apache.spark.sql.types.{AbstractDataType, DataType}
import org.apache.spark.unsafe.types.UTF8String
// scalastyle:off line.size.limit
@@ -54,16 +55,16 @@
override def replacement: Expression =
StaticInvoke(
UrlCodec.getClass,
- StringType,
+ SQLConf.get.defaultStringType,
"encode",
Seq(child, Literal("UTF-8")),
- Seq(StringType, StringType))
+ Seq(StringTypeAnyCollation, StringTypeAnyCollation))
override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
- override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
+ override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation)
override def prettyName: String = "url_encode"
}
@@ -91,16 +92,16 @@
override def replacement: Expression =
StaticInvoke(
UrlCodec.getClass,
- StringType,
+ SQLConf.get.defaultStringType,
"decode",
Seq(child, Literal("UTF-8")),
- Seq(StringType, StringType))
+ Seq(StringTypeAnyCollation, StringTypeAnyCollation))
override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
- override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
+ override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation)
override def prettyName: String = "url_decode"
}
@@ -154,8 +155,8 @@
def this(children: Seq[Expression]) = this(children, SQLConf.get.ansiEnabled)
override def nullable: Boolean = true
- override def inputTypes: Seq[DataType] = Seq.fill(children.size)(StringType)
- override def dataType: DataType = StringType
+ override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringTypeAnyCollation)
+ override def dataType: DataType = SQLConf.get.defaultStringType
override def prettyName: String = "parse_url"
// If the url is a constant, cache the URL object so that we don't need to convert url
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
index b5f1dc7..2b63901 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
@@ -208,6 +208,109 @@
})
}
+ test("Support UrlEncode expression with collation") {
+ case class UrlEncodeTestCase(
+ input: String,
+ collationName: String,
+ result: String
+ )
+
+ val testCases = Seq(
+ UrlEncodeTestCase("https://spark.apache.org", "UTF8_BINARY",
+ "https%3A%2F%2Fspark.apache.org"),
+ UrlEncodeTestCase("https://spark.apache.org", "UTF8_BINARY_LCASE",
+ "https%3A%2F%2Fspark.apache.org"),
+ UrlEncodeTestCase("https://spark.apache.org", "UNICODE",
+ "https%3A%2F%2Fspark.apache.org"),
+ UrlEncodeTestCase("https://spark.apache.org", "UNICODE_CI",
+ "https%3A%2F%2Fspark.apache.org")
+ )
+
+ // Run url_encode once per case with that case's collation as the session default
+ testCases.foreach(t => {
+ val query =
+ s"""
+ |select url_encode('${t.input}')
+ |""".stripMargin
+ // Check the encoded value and that the result column type uses the default collation
+ withSQLConf(SqlApiConf.DEFAULT_COLLATION -> t.collationName) {
+ val testQuery = sql(query)
+ checkAnswer(testQuery, Row(t.result))
+ val dataType = StringType(t.collationName)
+ assert(testQuery.schema.fields.head.dataType.sameType(dataType))
+ }
+ })
+ }
+
+ test("Support UrlDecode expression with collation") {
+ case class UrlDecodeTestCase(
+ input: String,
+ collationName: String,
+ result: String
+ )
+
+ val testCases = Seq(
+ UrlDecodeTestCase("https%3A%2F%2Fspark.apache.org", "UTF8_BINARY",
+ "https://spark.apache.org"),
+ UrlDecodeTestCase("https%3A%2F%2Fspark.apache.org", "UTF8_BINARY_LCASE",
+ "https://spark.apache.org"),
+ UrlDecodeTestCase("https%3A%2F%2Fspark.apache.org", "UNICODE",
+ "https://spark.apache.org"),
+ UrlDecodeTestCase("https%3A%2F%2Fspark.apache.org", "UNICODE_CI",
+ "https://spark.apache.org")
+ )
+
+ // Run url_decode once per case with that case's collation as the session default
+ testCases.foreach(t => {
+ val query =
+ s"""
+ |select url_decode('${t.input}')
+ |""".stripMargin
+ // Check the decoded value and that the result column type uses the default collation
+ withSQLConf(SqlApiConf.DEFAULT_COLLATION -> t.collationName) {
+ val testQuery = sql(query)
+ checkAnswer(testQuery, Row(t.result))
+ val dataType = StringType(t.collationName)
+ assert(testQuery.schema.fields.head.dataType.sameType(dataType))
+ }
+ })
+ }
+
+ test("Support ParseUrl expression with collation") {
+ case class ParseUrlTestCase(
+ input: String,
+ collationName: String,
+ partToExtract: String,
+ result: String
+ )
+
+ val testCases = Seq(
+ ParseUrlTestCase("http://spark.apache.org/path?query=1", "UTF8_BINARY", "HOST",
+ "spark.apache.org"),
+ ParseUrlTestCase("http://spark.apache.org/path?query=2", "UTF8_BINARY_LCASE", "PATH",
+ "/path"),
+ ParseUrlTestCase("http://spark.apache.org/path?query=3", "UNICODE", "QUERY",
+ "query=3"),
+ ParseUrlTestCase("http://spark.apache.org/path?query=4", "UNICODE_CI", "PROTOCOL",
+ "http")
+ )
+
+ // Run parse_url once per case with that case's collation as the session default
+ testCases.foreach(t => {
+ val query =
+ s"""
+ |select parse_url('${t.input}', '${t.partToExtract}')
+ |""".stripMargin
+ // Check the extracted part and that the result column type uses the default collation
+ withSQLConf(SqlApiConf.DEFAULT_COLLATION -> t.collationName) {
+ val testQuery = sql(query)
+ checkAnswer(testQuery, Row(t.result))
+ val dataType = StringType(t.collationName)
+ assert(testQuery.schema.fields.head.dataType.sameType(dataType))
+ }
+ })
+ }
+
test("Conv expression with collation") {
// Supported collations
case class ConvTestCase(