DATAFU-175 Support Spark 3.2.0 (#43)

* DATAFU-176 Support Spark 3.2.x
* Make the build continue to work for Spark 3.0 and 3.1 too
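
The compatibility work centers on SparkOverwriteUDAFs.scala: withNewChildrenInternal and
withNewChildInternal are defined on the aggregate expressions without the `override` keyword, so the same
source compiles against Spark 3.0/3.1 (whose Catalyst parents do not declare these methods) and against
Spark 3.2+ (which expects concrete expressions to provide them). The idea is taken from
https://github.com/apache/sedona/pull/557, as noted in the code. A rough, Spark-free sketch of why this
works on both lines (V31Base, V32Base and MyExpr are made-up stand-ins, not DataFu or Spark classes):

```scala
// Model of the trick: V31Base stands in for the Spark 3.1 parent (no
// withNewChildrenInternal), V32Base for the Spark 3.2 parent, which declares it.
trait V31Base {
  def children: Seq[Int]
}

trait V32Base {
  def children: Seq[Int]
  // abstract member that only exists in the "3.2" parent
  def withNewChildrenInternal(newChildren: IndexedSeq[Int]): V32Base
}

// The body below compiles unchanged whether the parent is V31Base or V32Base:
// a plain `def` is just an extra method on "3.1" and implements the abstract
// member on "3.2"; Scala does not require `override` for that.
case class MyExpr(a: Int, b: Int) extends V32Base {
  def children: Seq[Int] = Seq(a, b)

  def withNewChildrenInternal(newChildren: IndexedSeq[Int]): MyExpr =
    copy(a = newChildren(0), b = newChildren(1))
}
```

The actual change applies this pattern to MinValueByKey, MaxValueByKey and CollectLimitedList, as shown in
the SparkOverwriteUDAFs.scala diff below.
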
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index cc9f9b3..ea2c700 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -20,10 +20,10 @@
 
     steps:
     - name: Checkout repository
-      uses: actions/checkout@v3
+      uses: actions/checkout@v4
 
     - name: Use Java 8
-      uses: actions/setup-java@v2
+      uses: actions/setup-java@v4
       with:
          java-version: '8'
          distribution: 'adopt'
@@ -31,7 +31,7 @@
 
 
     - name: Set up Python      
-      uses: actions/setup-python@v4
+      uses: actions/setup-python@v5
       with:
          python-version: '3.7'
 
diff --git a/datafu-spark/README.md b/datafu-spark/README.md
index 07f48b1..f670067 100644
--- a/datafu-spark/README.md
+++ b/datafu-spark/README.md
@@ -11,7 +11,7 @@
 | 1.7.0 | 2.2.0 to 2.2.2, 2.3.0 to 2.3.2 and 2.4.0 to 2.4.3|
 | 1.8.0 | 2.2.3, 2.3.3, and 2.4.4 to 2.4.5|
 | 2.0.0 | 3.0.x - 3.1.x |
-| 2.1.0 (not released yet) | 3.2.x and up |
+| 2.1.0 (not released yet) | 3.0.x - 3.2.x |
 
 # Examples
 
diff --git a/datafu-spark/build.gradle b/datafu-spark/build.gradle
index a1202d3..e7011dd 100644
--- a/datafu-spark/build.gradle
+++ b/datafu-spark/build.gradle
@@ -70,26 +70,15 @@
     	testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.0.2_" + sparkTestingBaseVersion
     } else if (sparkVersion == "3.1.3") {
     	testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.1.2_" + sparkTestingBaseVersion
-    } else if (sparkVersion == "3.2.4") {
+    } else if (sparkVersion > "3.2" && sparkVersion < "3.3") {
     	testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.2.1_" + sparkTestingBaseVersion
+    } else if (sparkVersion > "3.3" && sparkVersion < "3.4") {
+    	testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.3.0_" + sparkTestingBaseVersion
     } else {
 	testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":" + sparkVersion + "_" + sparkTestingBaseVersion
     }
 }
 
-// we need to set up the build for hadoop 3
-if (hadoopVersion.startsWith("2.")) {
-  dependencies {
-    testRuntime "org.apache.hadoop:hadoop-common:$hadoopVersion"
-    testRuntime "org.apache.hadoop:hadoop-hdfs:$hadoopVersion"
-    testRuntime "org.apache.hadoop:hadoop-mapreduce-client-jobclient:$hadoopVersion"
-  }
-} else {
-  dependencies {
-    testRuntime "org.apache.hadoop:hadoop-core:$hadoopVersion"
-  }
-}
-
 project.ext.sparkFile = file("build/spark-zips/spark-" + sparkVersion + ".zip")
 project.ext.sparkUnzipped = "build/spark-unzipped/spark-" + sparkVersion
 
diff --git a/datafu-spark/build_and_test_spark.sh b/datafu-spark/build_and_test_spark.sh
index 3554278..5add676 100755
--- a/datafu-spark/build_and_test_spark.sh
+++ b/datafu-spark/build_and_test_spark.sh
@@ -17,8 +17,9 @@
 
 #!/bin/bash
 
-export SPARK_VERSIONS_FOR_SCALA_212="3.0.0 3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3"
-export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3"
+export SPARK_VERSIONS_FOR_SCALA_212="3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3 3.2.0 3.2.1 3.2.2 3.2.3 3.2.4"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3 3.2.4"
+
 
 STARTTIME=$(date +%s)
 
diff --git a/datafu-spark/gradle/dependency-versions-scala-2.12.gradle b/datafu-spark/gradle/dependency-versions-scala-2.12.gradle
index e83d5ef..51ff58d 100644
--- a/datafu-spark/gradle/dependency-versions-scala-2.12.gradle
+++ b/datafu-spark/gradle/dependency-versions-scala-2.12.gradle
@@ -17,7 +17,7 @@
  * under the License.
  */
 ext {
-  scalaLibraryVersion = "2.12.10"
+  scalaLibraryVersion = "2.12.15"
   // Extra options for the compiler:
   // -feature: Give detailed warnings about language feature use (rather than just 'there were 4 warnings')
   scalaOptions = "-feature"
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
index 165e43f..00bcd90 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -48,8 +48,11 @@
         java_import(gateway.jvm, "org.apache.spark.SparkConf")
         java_import(gateway.jvm, "org.apache.spark.api.java.*")
         java_import(gateway.jvm, "org.apache.spark.api.python.*")
+        java_import(gateway.jvm, "org.apache.spark.ml.python.*")
         java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
+        java_import(gateway.jvm, "org.apache.spark.resource.*")
         java_import(gateway.jvm, "org.apache.spark.sql.*")
+        java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
         java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
 
         intp = gateway.entry_point
diff --git a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkOverwriteUDAFs.scala b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkOverwriteUDAFs.scala
index 7c3ffbb..d57a6ef 100644
--- a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkOverwriteUDAFs.scala
+++ b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkOverwriteUDAFs.scala
@@ -30,6 +30,12 @@
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+/*
+ To support Spark 3.0.x and 3.1.x as well as Spark 3.2.0 and up, the methods withNewChildrenInternal and
+ withNewChildInternal are defined below without the override keyword, so the same source compiles against both APIs.
+
+ Idea taken from https://github.com/apache/sedona/pull/557
+ */
 object SparkOverwriteUDAFs {
   def minValueByKey(key: Column, value: Column): Column =
     Column(MinValueByKey(key.expr, value.expr).toAggregateExpression(false))
@@ -40,9 +46,21 @@
 }
 
 case class MinValueByKey(child1: Expression, child2: Expression)
-    extends ExtramumValueByKey(child1, child2, LessThan)
+    extends ExtramumValueByKey(child1, child2, LessThan) {
+
+  // no override keyword: the method does not exist in the Spark 3.0/3.1 parent (see the file-level comment above)
+  def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): MinValueByKey = {
+    copy(child1 = newChildren.head, child2 = newChildren.tail.head)
+  }
+}
 case class MaxValueByKey(child1: Expression, child2: Expression)
-    extends ExtramumValueByKey(child1, child2, GreaterThan)
+    extends ExtramumValueByKey(child1, child2, GreaterThan) {
+
+  // no override keyword: the method does not exist in the Spark 3.0/3.1 parent (see the file-level comment above)
+  def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): MaxValueByKey = {
+    copy(child1 = newChildren.head, child2 = newChildren.tail.head)
+  }
+}
 
 abstract class ExtramumValueByKey(
     child1: Expression,
@@ -124,6 +142,11 @@
   override def prettyName: String = "collect_limited_list"
 
   override def eval(buffer: ArrayBuffer[Any]): Any = new GenericArrayData(buffer.toArray)
+
+  // no override keyword: the method does not exist in the Spark 3.0/3.1 parent (see the file-level comment above)
+  def withNewChildInternal(newChild: Expression): Expression = {
+    copy(child = newChild)
+  }
 }
 
 /** *
diff --git a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
index 84046bf..e0ef993 100644
--- a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
@@ -27,7 +27,8 @@
 object PathsResolver {
   
   val sparkSystemVersion = System.getProperty("datafu.spark.version")
-  
+
+  // When adding a Spark version, take its py4j version from https://github.com/apache/spark/blob/master/dev/requirements.txt
   val py4js = Map(
       "3.0.0" -> "0.10.9.7",
       "3.0.1" -> "0.10.9.7",
@@ -35,7 +36,7 @@
       "3.1.3" -> "0.10.9.7"
   )
 
-  val sparkVersion = if (sparkSystemVersion == null) "3.0.0" else sparkSystemVersion
+  val sparkVersion = if (sparkSystemVersion == null) "3.1.3" else sparkSystemVersion
   
   val py4jVersion = py4js.getOrElse(sparkVersion, "0.10.9.7") // our default
   
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index 848c28e..75fa8da 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -61,7 +61,7 @@
       sqlContext.createDataFrame(sc.parallelize(Seq(Row("b", 1), Row("a", 3))),
                                  StructType(dedupSchema))
 
-    assertDataFrameEquals(expected,
+    assertDataFrameNoOrderEquals(expected,
                           inputDataFrame
                             .dedupWithOrder($"col_grp", $"col_ord".desc)
                             .select($"col_grp", $"col_ord"))
@@ -82,7 +82,7 @@
                                        $"col_ord",
                                        moreAggFunctions = Seq(min($"col_str")))
 
-    assertDataFrameEquals(expectedByIntDf, actual)
+    assertDataFrameNoOrderEquals(expectedByIntDf, actual)
   }
 
   case class dedupExp2(col_grp: String, col_ord: Int, col_str: String)
@@ -95,7 +95,7 @@
       List(dedupExp2("b", 1, "asd4"),
         dedupExp2("a", 1, "asd1")))
 
-    assertDataFrameEquals(expectedByStringDf, actual)
+    assertDataFrameNoOrderEquals(expectedByStringDf, actual)
   }
 
   test("dedup2_with_filter") {
@@ -146,7 +146,7 @@
       List(dedupExp2("b", 1, "asd4"),
         dedupExp2("a", 3, "asd3")))
 
-    assertDataFrameEquals(expectedComplex, actual)
+    assertDataFrameNoOrderEquals(expectedComplex, actual)
   }
 
   test("test_dedup2_by_multi_column") {
@@ -197,23 +197,23 @@
     val schema = df.drop("map_col_blah").schema
 
     val actual = df.dedupWithCombiner($"col_grp", expr("cast(concat('-',col_ord) as int)"))
-      .drop("map_col_blah")
+      .drop("map_col_blah").orderBy("col_grp")
 
     val expected: DataFrame = sqlContext.createDataFrame(
       sqlContext.createDataFrame(
       Seq(
-        expComplex("b",
+        expComplex("a",
+                   1,
+                   "asd1",
+                   Array("a", "1"),
+                   Inner("a", 1),
+                   Map("a" -> 1)),
+        expComplex("b",
                    1,
                    "asd4",
                    Array("b", "1"),
                    Inner("b", 1),
                    Map("b" -> 1)),
-        expComplex("a",
-                   1,
-                   "asd1",
-                   Array("a", "1"),
-                   Inner("a", 1),
-                   Map("a" -> 1))
       )).rdd, schema)
 
     assertDataFrameEquals(expected, actual)
@@ -233,7 +233,7 @@
       sc.parallelize(Seq(Row("b", 1), Row("a", 3), Row("a", 2))),
       StructType(dedupTopNExpectedSchema))
 
-    assertDataFrameEquals(expected, actual)
+    assertDataFrameNoOrderEquals(expected, actual)
   }
 
   val schema2 = List(
@@ -318,7 +318,7 @@
 
     val actual = df.joinWithRangeAndDedup("col_ord", dfr, "start", "end")
 
-    assertDataFrameEquals(expected, actual)
+    assertDataFrameNoOrderEquals(expected, actual)
   }
 
   test("randomJoinSkewedTests") {
diff --git a/gradle.properties b/gradle.properties
index 9a25d6c..2b20997 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,9 +16,9 @@
 # under the License.
 
 group=org.apache.datafu
-version=2.0.0
+version=2.1.0
 gradleVersion=5.6.4
-sparkCompatVersion=3.1
-sparkVersion=3.1.3
+sparkCompatVersion=3.2
+sparkVersion=3.2.4
 hadoopVersion=2.7.0
 release=false