DATAFU-175 Support Spark 3.2.0 (#43)
* DATAFU-176 Support Spark 3.2.x
* Make the build continue to work for Spark 3.0 and 3.1 too
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index cc9f9b3..ea2c700 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -20,10 +20,10 @@
steps:
- name: Checkout repository
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Use Java 8
- uses: actions/setup-java@v2
+ uses: actions/setup-java@v4
with:
java-version: '8'
distribution: 'adopt'
@@ -31,7 +31,7 @@
- name: Set up Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@v5
with:
python-version: '3.7'
diff --git a/datafu-spark/README.md b/datafu-spark/README.md
index 07f48b1..f670067 100644
--- a/datafu-spark/README.md
+++ b/datafu-spark/README.md
@@ -11,7 +11,7 @@
| 1.7.0 | 2.2.0 to 2.2.2, 2.3.0 to 2.3.2 and 2.4.0 to 2.4.3|
| 1.8.0 | 2.2.3, 2.3.3, and 2.4.4 to 2.4.5|
| 2.0.0 | 3.0.x - 3.1.x |
-| 2.1.0 (not released yet) | 3.2.x and up |
+| 2.1.0 (not released yet) | 3.0.x - 3.2.x |
# Examples
diff --git a/datafu-spark/build.gradle b/datafu-spark/build.gradle
index a1202d3..e7011dd 100644
--- a/datafu-spark/build.gradle
+++ b/datafu-spark/build.gradle
@@ -70,26 +70,15 @@
testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.0.2_" + sparkTestingBaseVersion
} else if (sparkVersion == "3.1.3") {
testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.1.2_" + sparkTestingBaseVersion
- } else if (sparkVersion == "3.2.4") {
+ } else if (sparkVersion > "3.2" && sparkVersion < "3.3") {
testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.2.1_" + sparkTestingBaseVersion
+ } else if (sparkVersion > "3.3" && sparkVersion < "3.4") {
+ testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.3.0_" + sparkTestingBaseVersion
} else {
testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":" + sparkVersion + "_" + sparkTestingBaseVersion
}
}
-// we need to set up the build for hadoop 3
-if (hadoopVersion.startsWith("2.")) {
- dependencies {
- testRuntime "org.apache.hadoop:hadoop-common:$hadoopVersion"
- testRuntime "org.apache.hadoop:hadoop-hdfs:$hadoopVersion"
- testRuntime "org.apache.hadoop:hadoop-mapreduce-client-jobclient:$hadoopVersion"
- }
-} else {
- dependencies {
- testRuntime "org.apache.hadoop:hadoop-core:$hadoopVersion"
- }
-}
-
project.ext.sparkFile = file("build/spark-zips/spark-" + sparkVersion + ".zip")
project.ext.sparkUnzipped = "build/spark-unzipped/spark-" + sparkVersion
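The spark-testing-base branches above no longer hard-code a single 3.2 patch release; the new conditions bucket any 3.2.x (and 3.3.x) Spark version by comparing the version strings directly. Gradle/Groovy compares these Strings lexicographically, which is sufficient for the values involved. Below is a minimal Scala sketch of the same range check, with invented names, only to illustrate why the string comparison works; it is not part of the DataFu build.

```scala
// Illustration only -- not part of the DataFu build. Mirrors the string-based
// range check used in build.gradle above; the comparisons are lexicographic.
object VersionRangeCheck {
  def isSpark32Line(v: String): Boolean = v > "3.2" && v < "3.3"

  def main(args: Array[String]): Unit = {
    assert(Seq("3.2.0", "3.2.1", "3.2.4").forall(isSpark32Line)) // all 3.2.x match
    assert(!isSpark32Line("3.1.3") && !isSpark32Line("3.3.0"))   // neighbouring lines do not
  }
}
```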
diff --git a/datafu-spark/build_and_test_spark.sh b/datafu-spark/build_and_test_spark.sh
index 3554278..5add676 100755
--- a/datafu-spark/build_and_test_spark.sh
+++ b/datafu-spark/build_and_test_spark.sh
@@ -17,8 +17,9 @@
#!/bin/bash
-export SPARK_VERSIONS_FOR_SCALA_212="3.0.0 3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3"
-export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3"
+export SPARK_VERSIONS_FOR_SCALA_212="3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3 3.2.0 3.2.1 3.2.2 3.2.3 3.2.4"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3 3.2.4"
+
STARTTIME=$(date +%s)
diff --git a/datafu-spark/gradle/dependency-versions-scala-2.12.gradle b/datafu-spark/gradle/dependency-versions-scala-2.12.gradle
index e83d5ef..51ff58d 100644
--- a/datafu-spark/gradle/dependency-versions-scala-2.12.gradle
+++ b/datafu-spark/gradle/dependency-versions-scala-2.12.gradle
@@ -17,7 +17,7 @@
* under the License.
*/
ext {
- scalaLibraryVersion = "2.12.10"
+ scalaLibraryVersion = "2.12.15"
// Extra options for the compiler:
// -feature: Give detailed warnings about language feature use (rather than just 'there were 4 warnings')
scalaOptions = "-feature"
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
index 165e43f..00bcd90 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -48,8 +48,11 @@
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
+ java_import(gateway.jvm, "org.apache.spark.ml.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
+ java_import(gateway.jvm, "org.apache.spark.resource.*")
java_import(gateway.jvm, "org.apache.spark.sql.*")
+ java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
intp = gateway.entry_point
diff --git a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkOverwriteUDAFs.scala b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkOverwriteUDAFs.scala
index 7c3ffbb..d57a6ef 100644
--- a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkOverwriteUDAFs.scala
+++ b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkOverwriteUDAFs.scala
@@ -30,6 +30,12 @@
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+/*
+ In order to keep supporting Spark 3.0.x and 3.1.x as well as Spark 3.2.0 and up, the methods
+ withNewChildrenInternal and withNewChildInternal are added, but without the override keyword.
+
+ Idea taken from https://github.com/apache/sedona/pull/557
+ */
object SparkOverwriteUDAFs {
def minValueByKey(key: Column, value: Column): Column =
Column(MinValueByKey(key.expr, value.expr).toAggregateExpression(false))
@@ -40,9 +46,21 @@
}
case class MinValueByKey(child1: Expression, child2: Expression)
- extends ExtramumValueByKey(child1, child2, LessThan)
+ extends ExtramumValueByKey(child1, child2, LessThan) {
+
+ //override
+ def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): MinValueByKey = {
+ copy(child1=newChildren.head, child2=newChildren.tail.head)
+ }
+}
case class MaxValueByKey(child1: Expression, child2: Expression)
- extends ExtramumValueByKey(child1, child2, GreaterThan)
+ extends ExtramumValueByKey(child1, child2, GreaterThan) {
+
+ //override
+ def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): MaxValueByKey = {
+ copy(child1=newChildren.head, child2=newChildren.tail.head)
+ }
+}
abstract class ExtramumValueByKey(
child1: Expression,
@@ -124,6 +142,11 @@
override def prettyName: String = "collect_limited_list"
override def eval(buffer: ArrayBuffer[Any]): Any = new GenericArrayData(buffer.toArray)
+
+ // override
+ def withNewChildInternal(newChild: Expression): Expression = {
+ copy(child = newChild)
+ }
}
/** *
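For context on the hunks above: Spark 3.2 made withNewChildrenInternal / withNewChildInternal required members of Catalyst's expression tree nodes, so every expression has to provide them, but marking them with `override` would fail to compile against Spark 3.0/3.1, where the parent classes do not declare them. Defining the methods without the modifier compiles against both API versions, because Scala does not require `override` when implementing an abstract member; the `// override` comments in the diff mark exactly this deliberate omission (the approach is credited to the linked Sedona pull request). A toy, Spark-independent sketch of the pattern, with invented class names:

```scala
// Toy, Spark-independent illustration of the "no override keyword" trick.
// All names here are invented for the example.

// Stand-in for the newer API (think Spark 3.2+): declares an abstract hook.
abstract class NewApiNode {
  def withNewChildInternal(newChild: String): NewApiNode
}

// Stand-in for the older API (think Spark 3.0/3.1): no such hook exists.
abstract class OldApiNode

// Because `override` is omitted, this exact definition compiles against either
// parent: under NewApiNode it implements the abstract member (Scala allows
// omitting `override` for abstract members), under OldApiNode it would simply
// be an extra method. Swap `extends NewApiNode` for `extends OldApiNode` and
// the body still compiles unchanged.
case class MyNode(child: String) extends NewApiNode {
  def withNewChildInternal(newChild: String): MyNode = copy(child = newChild)
}

object OverrideTrickDemo {
  def main(args: Array[String]): Unit =
    println(MyNode("a").withNewChildInternal("b")) // prints MyNode(b)
}
```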
diff --git a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
index 84046bf..e0ef993 100644
--- a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
@@ -27,7 +27,8 @@
object PathsResolver {
val sparkSystemVersion = System.getProperty("datafu.spark.version")
-
+
+ // take the py4j version from here: https://github.com/apache/spark/blob/master/dev/requirements.txt
val py4js = Map(
"3.0.0" -> "0.10.9.7",
"3.0.1" -> "0.10.9.7",
@@ -35,7 +36,7 @@
"3.1.3" -> "0.10.9.7"
)
- val sparkVersion = if (sparkSystemVersion == null) "3.0.0" else sparkSystemVersion
+ val sparkVersion = if (sparkSystemVersion == null) "3.1.3" else sparkSystemVersion
val py4jVersion = py4js.getOrElse(sparkVersion, "0.10.9.7") // our default
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index 848c28e..75fa8da 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -61,7 +61,7 @@
sqlContext.createDataFrame(sc.parallelize(Seq(Row("b", 1), Row("a", 3))),
StructType(dedupSchema))
- assertDataFrameEquals(expected,
+ assertDataFrameNoOrderEquals(expected,
inputDataFrame
.dedupWithOrder($"col_grp", $"col_ord".desc)
.select($"col_grp", $"col_ord"))
@@ -82,7 +82,7 @@
$"col_ord",
moreAggFunctions = Seq(min($"col_str")))
- assertDataFrameEquals(expectedByIntDf, actual)
+ assertDataFrameNoOrderEquals(expectedByIntDf, actual)
}
case class dedupExp2(col_grp: String, col_ord: Int, col_str: String)
@@ -95,7 +95,7 @@
List(dedupExp2("b", 1, "asd4"),
dedupExp2("a", 1, "asd1")))
- assertDataFrameEquals(expectedByStringDf, actual)
+ assertDataFrameNoOrderEquals(expectedByStringDf, actual)
}
test("dedup2_with_filter") {
@@ -146,7 +146,7 @@
List(dedupExp2("b", 1, "asd4"),
dedupExp2("a", 3, "asd3")))
- assertDataFrameEquals(expectedComplex, actual)
+ assertDataFrameNoOrderEquals(expectedComplex, actual)
}
test("test_dedup2_by_multi_column") {
@@ -197,23 +197,23 @@
val schema = df.drop("map_col_blah").schema
val actual = df.dedupWithCombiner($"col_grp", expr("cast(concat('-',col_ord) as int)"))
- .drop("map_col_blah")
+ .drop("map_col_blah").orderBy("col_grp")
val expected: DataFrame = sqlContext.createDataFrame(
sqlContext.createDataFrame(
Seq(
- expComplex("b",
+ expComplex("a",
+ 1,
+ "asd1",
+ Array("a", "1"),
+ Inner("a", 1),
+ Map("a" -> 1)),
+ expComplex("b",
1,
"asd4",
Array("b", "1"),
Inner("b", 1),
Map("b" -> 1)),
- expComplex("a",
- 1,
- "asd1",
- Array("a", "1"),
- Inner("a", 1),
- Map("a" -> 1))
)).rdd, schema)
assertDataFrameEquals(expected, actual)
@@ -233,7 +233,7 @@
sc.parallelize(Seq(Row("b", 1), Row("a", 3), Row("a", 2))),
StructType(dedupTopNExpectedSchema))
- assertDataFrameEquals(expected, actual)
+ assertDataFrameNoOrderEquals(expected, actual)
}
val schema2 = List(
@@ -318,7 +318,7 @@
val actual = df.joinWithRangeAndDedup("col_ord", dfr, "start", "end")
- assertDataFrameEquals(expected, actual)
+ assertDataFrameNoOrderEquals(expected, actual)
}
test("randomJoinSkewedTests") {
diff --git a/gradle.properties b/gradle.properties
index 9a25d6c..2b20997 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,9 +16,9 @@
# under the License.
group=org.apache.datafu
-version=2.0.0
+version=2.1.0
gradleVersion=5.6.4
-sparkCompatVersion=3.1
-sparkVersion=3.1.3
+sparkCompatVersion=3.2
+sparkVersion=3.2.4
hadoopVersion=2.7.0
release=false