Merge pull request #429 from damik3/main
Multicontext features
diff --git a/wayang-api/wayang-api-scala-java/README.md b/wayang-api/wayang-api-scala-java/README.md
new file mode 100644
index 0000000..085741b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/README.md
@@ -0,0 +1,364 @@
+# Wayang scala-java API
+
+Wayang provides two options for running multiple tasks asynchronously: the `async` API and the `multicontext` API.
+
+The examples below can be found in the `async.apps` and `multicontext.apps` packages in wayang-benchmark.
+
+## `async` API
+The following example program shows how to use the `async` API to run several asynchronous jobs:
+
+```scala
+import org.apache.wayang.api.async.DataQuantaImplicits._
+import org.apache.wayang.api.async.PlanBuilderImplicits._
+import org.apache.wayang.api.{MultiContext, DataQuanta, PlanBuilder}
+import org.apache.wayang.java.Java
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+object Example {
+
+ def main(args: Array[String]): Unit = {
+ val planBuilder1 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+ val planBuilder2 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+ val planBuilder3 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+
+ // Async job 1
+ val result1 = planBuilder1
+ .loadCollection(List(1, 2, 3, 4, 5))
+ .map(_ * 1)
+ .runAsync(tempFileOut = "file:///tmp/out1.temp")
+
+ // Async job 2
+ val result2 = planBuilder2
+ .loadCollection(List(6, 7, 8, 9, 10))
+ .filter(_ <= 8)
+ .runAsync(tempFileOut = "file:///tmp/out2.temp")
+
+ // Async job 3 which merges 1 and 2
+ val dq1: DataQuanta[Int] = planBuilder1.loadAsync(result1)
+ val dq2: DataQuanta[Int] = planBuilder1.loadAsync(result2)
+ val result3 = dq1.union(dq2)
+ .map(_ * 3)
+ .filter(_ < 100)
+ .runAsync(tempFileOut = "file:///tmp/out3.temp", result1, result2)
+
+ // Async job 4 which runs independently from 1, 2 and 3
+ val result4 = planBuilder3
+ .loadCollection(List(1, 2, 3, 4, 5))
+ .filter(_ >= 2)
+ .runAsync(tempFileOut = "file:///tmp/out4.temp")
+
+ // Async job 5 which merges 3 and 4
+ val dq3: DataQuanta[Int] = planBuilder1.loadAsync(result3)
+ val dq4: DataQuanta[Int] = planBuilder1.loadAsync(result4)
+ val result5 = dq3.intersect(dq4)
+ .map(_ * 4)
+ .writeTextFileAsync(url = "file:///tmp/out5.final", result3, result4)
+
+ Await.result(result5, Duration.Inf)
+ }
+
+}
+```
+
+**Key Points:**
+- Jobs 1, 2, and 4 are executed concurrently as there is no dependency.
+
+
+- The output of these jobs is written to the provided path (`.runAsync(tempFileOut = "file:///tmp/out1.temp")`).
+
+
+- Job 3 waits for jobs 1 and 2 to finish before starting (`.runAsync(tempFileOut = "file:///tmp/out3.temp", result1, result2)`). It reads the results from the aforementioned paths, unites them, and processes further.
+
+
+- Job 5 waits for jobs 3 and 4 to finish before starting. This is indicated by `.writeTextFileAsync("file:///tmp/out5.final", result3, result4)`. Instead of using `runAsync`, we use `writeTextFileAsync` to finish the execution.
+
+
+## `multicontext` API
+
+The examples below demonstrate the capabilities of the `multicontext` API.
+
+### Basic usage
+
+```scala
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+class WordCount {}
+
+object WordCount {
+
+ def main(args: Array[String]): Unit = {
+ println("WordCount")
+ println("Scala version:")
+ println(scala.util.Properties.versionString)
+
+ val configuration1 = new Configuration()
+ val configuration2 = new Configuration()
+
+ val context1 = new MultiContext(configuration1)
+ .withPlugin(Java.basicPlugin())
+ .withTextFileSink("file:///tmp/out11")
+ val context2 = new MultiContext(configuration2)
+ .withPlugin(Spark.basicPlugin())
+ .withTextFileSink("file:///tmp/out12")
+
+ val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+ .withUdfJarsOf(this.getClass)
+
+ // Generate some test data
+ val inputValues = Array("Big data is big.", "Is data big data?")
+
+ // Build and execute a word count
+ multiContextPlanBuilder.forEach(_
+ .loadCollection(inputValues)
+ .flatMap(_.split("\\s+"))
+ .map(_.replaceAll("\\W+", "").toLowerCase)
+ .map((_, 1))
+ .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2))
+ ).execute()
+
+ }
+}
+```
+
+The above program executes the same plan concurrently for two contexts, `context1` which runs on Java and writes to text file `file:///tmp/out11`, and `context2` which runs on Spark (and can be configured through `configuration2`) and writes to `file:///tmp/out12`.
+
+The same job can also be written like this:
+
+```scala
+multiContextPlanBuilder
+ .forEach(_.loadCollection(inputValues))
+ .forEach(_.flatMap(_.split("\\s+")))
+ .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+ .forEach(_.map((_, 1)))
+ .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+ .execute()
+```
+
+### Target platforms
+
+```scala
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+object WordCountWithTargetPlatforms {
+
+ def main(args: Array[String]): Unit = {
+ val configuration1 = new Configuration()
+ val configuration2 = new Configuration()
+
+ val context1 = new MultiContext(configuration1)
+ .withPlugin(Java.basicPlugin())
+ .withPlugin(Spark.basicPlugin())
+ .withTextFileSink("file:///tmp/out11")
+ val context2 = new MultiContext(configuration2)
+ .withPlugin(Java.basicPlugin())
+ .withPlugin(Spark.basicPlugin())
+ .withTextFileSink("file:///tmp/out12")
+
+ val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+ .withUdfJarsOf(this.getClass)
+
+ // Generate some test data
+ val inputValues1 = Array("Big data is big.", "Is data big data?")
+ val inputValues2 = Array("Big big data is big big.", "Is data big data big?")
+
+ multiContextPlanBuilder
+ .loadCollection(context1, inputValues1)
+ .loadCollection(context2, inputValues2)
+
+ .forEach(_.flatMap(_.split("\\s+")))
+ .withTargetPlatforms(context1, Spark.platform())
+ .withTargetPlatforms(context2, Java.platform())
+
+ .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+ .withTargetPlatforms(Java.platform())
+
+ .forEach(_.map((_, 1)))
+
+ .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+ .withTargetPlatforms(context1, Spark.platform())
+ .execute()
+ }
+}
+```
+
+Here we add the ability to execute an operation on different platforms for each context.
+
+In the snippet below
+
+```scala
+ .forEach(_.flatMap(_.split("\\s+")))
+ .withTargetPlatforms(context1, Spark.platform())
+ .withTargetPlatforms(context2, Java.platform())
+```
+
+the `flatMap` operator will be executed on Spark for `context1` and on Java for `context2`.
+
+In the snippet below
+
+```scala
+ .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+ .withTargetPlatforms(Java.platform())
+```
+
+the `map` operator gets executed on the Java platform for all contexts, since none is specifically stated.
+
+In the snippet below
+
+```scala
+ .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+ .withTargetPlatforms(context1, Spark.platform())
+```
+
+the `reduceByKey` operator will be executed on Spark for `context1` and on the default platform (decided by the optimizer) for `context2`.
+
+### Merge
+
+```scala
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+object WordCountWithMerge {
+
+ def main(args: Array[String]): Unit = {
+ val configuration1 = new Configuration()
+ val configuration2 = new Configuration()
+
+ val context1 = new MultiContext(configuration1)
+ .withPlugin(Java.basicPlugin())
+ .withMergeFileSink("file:///tmp/out11") // The mergeContext will read the output of context 1 from here
+ val context2 = new MultiContext(configuration2)
+ .withPlugin(Java.basicPlugin())
+ .withMergeFileSink("file:///tmp/out12") // The mergeContext will read the output of context 2 from here
+
+ val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+ .withUdfJarsOf(this.getClass)
+
+ // To be used after merging the previous two
+ val mergeContext = new WayangContext(new Configuration())
+ .withPlugin(Java.basicPlugin())
+
+ // Generate some test data
+ val inputValues1 = Array("Big data is big.", "Is data big data?")
+ val inputValues2 = Array("Big big data is big big.", "Is data big data big?")
+
+ // Build and execute a word count in 2 different contexts
+ multiContextPlanBuilder
+ .loadCollection(context1, inputValues1)
+ .loadCollection(context2, inputValues2)
+ .forEach(_.flatMap(_.split("\\s+")))
+ .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+ .forEach(_.map((_, 1)))
+ .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+
+ // Merge contexts with union operator
+ .mergeUnion(mergeContext)
+
+ // Continue processing merged DataQuanta
+ .filter(_._2 >= 3)
+ .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
+
+ // Write out
+ // Writes:
+ // (big,9)
+ // (data,6)
+ .writeTextFile("file:///tmp/out1.merged", s => s.toString())
+
+ }
+
+}
+```
+
+Here the two contexts start executing concurrently as in the above examples, but they get merged here
+
+```scala
+ .mergeUnion(mergeContext)
+```
+
+The merge happens in the merge context, which can be one of the contexts above or a completely new one.
+
+Note that the merge context reads the data to merge from the paths specified for each context here
+
+```scala
+ .withMergeFileSink("file:///tmp/out11") // The mergeContext will read the output of context 1 from here
+ ...
+ .withMergeFileSink("file:///tmp/out12") // The mergeContext will read the output of context 2 from here
+ ...
+```
+
+The rest of the execution happens in the merge context and is now a single job.
+
+### Combine each
+
+```scala
+import org.apache.wayang.api.{MultiContext, DataQuanta, MultiContextPlanBuilder}
+import org.apache.wayang.java.Java
+
+object WordCountCombineEach {
+
+ def main(args: Array[String]): Unit = {
+ val configuration1 = new Configuration()
+ val configuration2 = new Configuration()
+
+ val context1 = new MultiContext(configuration1)
+ .withPlugin(Java.basicPlugin())
+ .withTextFileSink("file:///tmp/out11")
+ val context2 = new MultiContext(configuration2)
+ .withPlugin(Java.basicPlugin())
+ .withTextFileSink("file:///tmp/out12")
+
+ val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+ .withUdfJarsOf(this.getClass)
+
+ // Generate some test data
+ val inputValues = Array("Big data is big.", "Is data big data?")
+
+ // Build and execute a word count
+ val dq1 = multiContextPlanBuilder
+ .forEach(_.loadCollection(inputValues))
+ .forEach(_.flatMap(_.split("\\s+")))
+ .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+ .forEach(_.map((_, 1)))
+ .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+
+ val dq2 = multiContextPlanBuilder
+ .forEach(_.loadCollection(inputValues))
+ .forEach(_.flatMap(_.split("\\s+")))
+ .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+ .forEach(_.map((_, 1)))
+ .forEach(_.reduceByKey(_._1, (a, _) => (a._1, 100)))
+
+ dq1.combineEach(dq2, (dq1: DataQuanta[(String, Int)], dq2: DataQuanta[(String, Int)]) => dq1.union(dq2))
+ .forEach(_.map(t => (t._1 + " wayang out", t._2)))
+ .execute()
+ }
+
+}
+```
+
+Here we add the capability of applying a binary operator within each context.
+
+With this line
+
+```scala
+ dq1.combineEach(dq2, (dq1: DataQuanta[(String, Int)], dq2: DataQuanta[(String, Int)]) => dq1.union(dq2))
+```
+
+the `dq1.union(dq2)` operation is applied for each context.
+
+The rest of the execution
+
+```scala
+ .forEach(_.map(t => (t._1 + " wayang out", t._2)))
+ .execute()
+```
+
+continues as a multi context job.
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/pom.xml b/wayang-api/wayang-api-scala-java/pom.xml
index 750f3b0..bdee691 100644
--- a/wayang-api/wayang-api-scala-java/pom.xml
+++ b/wayang-api/wayang-api-scala-java/pom.xml
@@ -61,11 +61,6 @@
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
- <artifactId>wayang-api-scala-java_2.11</artifactId>
- <version>0.7.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.wayang</groupId>
<artifactId>wayang-utils-profile-db</artifactId>
<version>0.7.1</version>
</dependency>
@@ -78,13 +73,16 @@
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-java</artifactId>
<version>0.7.1</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-sqlite3</artifactId>
<version>0.7.1</version>
- <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-postgres</artifactId>
+ <version>0.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -100,7 +98,6 @@
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-spark</artifactId>
<version>0.7.1</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -130,5 +127,13 @@
<artifactId>netty-all</artifactId>
<version>4.1.45.Final</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_2.12</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
index 4582269..c5ff0e1 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
@@ -28,7 +28,7 @@
import org.apache.commons.lang3.Validate
import org.apache.wayang.basic.function.ProjectionDescriptor
import org.apache.wayang.basic.operators._
-import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializableIntUnaryOperator, SerializablePredicate}
import org.apache.wayang.core.function._
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
@@ -239,7 +239,7 @@
seed: Option[Long] = None,
sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] =
this.sampleDynamicJava(
- new IntUnaryOperator {
+ new SerializableIntUnaryOperator {
override def applyAsInt(operand: Int): Int = sampleSizeFunction(operand)
},
datasetSize,
@@ -256,7 +256,7 @@
* @param sampleMethod the [[SampleOperator.Methods]] to use for sampling
* @return a new instance representing the [[FlatMapOperator]]'s output
*/
- def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator,
+ def sampleDynamicJava(sampleSizeFunction: SerializableIntUnaryOperator,
datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
seed: Option[Long] = None,
sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] = {
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
index ece3a50..6c55dd0 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
@@ -29,7 +29,7 @@
import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
import org.apache.wayang.basic.operators.{GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator}
import org.apache.wayang.commons.util.profiledb.model.Experiment
-import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializableIntUnaryOperator, SerializablePredicate}
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
import org.apache.wayang.core.optimizer.costs.{LoadEstimator, LoadProfile, LoadProfileEstimator}
@@ -187,7 +187,7 @@
* @param sampleSize the absolute size of the sample
* @return a [[SampleDataQuantaBuilder]]
*/
- def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new IntUnaryOperator {
+ def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new SerializableIntUnaryOperator {
override def applyAsInt(operand: Int): Int = sampleSize
})
@@ -198,7 +198,7 @@
* @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
* @return a [[SampleDataQuantaBuilder]]
*/
- def sample(sampleSizeFunction: IntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)
+ def sample(sampleSizeFunction: SerializableIntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)
/**
* Annotates a key to this instance.
@@ -936,7 +936,7 @@
* @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
* @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
*/
-class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: IntUnaryOperator)
+class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: SerializableIntUnaryOperator)
(implicit javaPlanBuilder: JavaPlanBuilder)
extends BasicDataQuantaBuilder[SampleDataQuantaBuilder[T], T] {
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContext.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContext.scala
new file mode 100644
index 0000000..1f1697b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContext.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api
+
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.plugin.Plugin
+
+/** A [[WayangContext]] with an id that records the class names of its plugins and an optional output sink. */
+class MultiContext(configuration: Configuration) extends WayangContext(configuration) {
+
+  // Identifier obtained from the companion object when the instance is constructed.
+  val id: Long = MultiContext.nextId()
+
+  private var sink: Option[MultiContext.UnarySink] = None
+  private var plugins: List[String] = List()
+
+  /** Creates a context backed by a fresh [[Configuration]]. */
+  def this() = this(new Configuration())
+
+  /** Registers `plugin` with the underlying context and remembers its class name. */
+  override def withPlugin(plugin: Plugin): MultiContext = {
+    val pluginClassName = plugin.getClass.getName
+    this.plugins = this.plugins ++ List(pluginClassName)
+    super.withPlugin(plugin)
+    this
+  }
+
+  /** Sets a text file sink at `url`, replacing any previously set sink. */
+  def withTextFileSink(url: String): MultiContext = useSink(MultiContext.TextFileSink(url))
+
+  /** Sets an object file sink at `url`, replacing any previously set sink. */
+  def withObjectFileSink(url: String): MultiContext = useSink(MultiContext.ObjectFileSink(url))
+
+  /** Sets a merge file sink at `url`, replacing any previously set sink. */
+  def withMergeFileSink(url: String): MultiContext = useSink(MultiContext.MergeFileSink(url))
+
+  // Stores `newSink` as the current sink and returns this instance so calls can be chained.
+  private def useSink(newSink: MultiContext.UnarySink): MultiContext = { this.sink = Some(newSink); this }
+
+  def getSink: Option[MultiContext.UnarySink] = sink
+  def getPlugins: List[String] = plugins
+}
+
+/** Companion holding the id counter and the sink descriptors used by [[MultiContext]]. */
+object MultiContext {
+  // Most recently issued id; stays 0 until the first id has been handed out.
+  private var lastId = 0L
+
+  // Advances the counter by one and returns the new value (the first call yields 1).
+  // NOTE(review): the update is not synchronized — confirm contexts are only built on one thread.
+  private def nextId(): Long = { lastId = lastId + 1; lastId }
+
+  private[api] trait UnarySink
+  private[api] case class TextFileSink(url: String) extends UnarySink
+  private[api] case class ObjectFileSink(url: String) extends UnarySink
+  private[api] case class MergeFileSink(url: String) extends UnarySink
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextDataQuanta.scala
new file mode 100644
index 0000000..3826b2a
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextDataQuanta.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api
+
+import org.apache.wayang.api.async.DataQuantaImplicits._
+import org.apache.wayang.api.async.PlanBuilderImplicits._
+import org.apache.wayang.core.api.WayangContext
+import org.apache.wayang.core.api.exception.WayangException
+import org.apache.wayang.core.plan.wayangplan.Operator
+import org.apache.wayang.core.platform.Platform
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.reflect.ClassTag
+
+/**
+ * Holds one [[DataQuanta]] per [[MultiContext]], keyed by the id of the context.
+ *
+ * @param dataQuantaMap           the [[DataQuanta]] of each context, keyed by [[MultiContext]] id
+ * @param multiContextPlanBuilder the builder whose [[MultiContext]]s are run by [[execute]] and [[mergeUnion]]
+ */
+class MultiContextDataQuanta[Out: ClassTag](private val dataQuantaMap: Map[Long, DataQuanta[Out]])(private val multiContextPlanBuilder: MultiContextPlanBuilder) {
+
+  /**
+   * Apply the specified function to each [[DataQuanta]].
+   *
+   * @tparam NewOut the type of elements in the resulting [[MultiContextDataQuanta]]
+   * @param f the function to apply to the [[DataQuanta]] of each context
+   * @return a new [[MultiContextDataQuanta]] holding the [[DataQuanta]] returned by the function
+   */
+  def forEach[NewOut: ClassTag](f: DataQuanta[Out] => DataQuanta[NewOut]): MultiContextDataQuanta[NewOut] =
+    new MultiContextDataQuanta[NewOut](transformEach(f))(this.multiContextPlanBuilder)
+
+
+  // Applies `f` once per context and returns a strict map. `mapValues` is deliberately avoided:
+  // it returns a lazy view, so `f` would be re-run on every lookup of a value.
+  private def transformEach[NewOut](f: DataQuanta[Out] => DataQuanta[NewOut]): Map[Long, DataQuanta[NewOut]] =
+    dataQuantaMap.map { case (contextId, dataQuanta) => contextId -> f(dataQuanta) }
+
+
+  /**
+   * Merges this and `that` [[MultiContextDataQuanta]], by using the `f` function to merge the corresponding [[DataQuanta]].
+   * Returns a new [[MultiContextDataQuanta]] containing the results.
+   *
+   * @tparam ThatOut the type of data contained in `that` [[MultiContextDataQuanta]]
+   * @tparam NewOut  the type of data contained in the resulting [[MultiContextDataQuanta]]
+   * @param that the [[MultiContextDataQuanta]] whose [[DataQuanta]] are paired with those of this instance by context id
+   * @param f    the function to apply on the two [[DataQuanta]] of the same context
+   * @return a new [[MultiContextDataQuanta]] containing the results of applying the function
+   */
+  def combineEach[ThatOut: ClassTag, NewOut: ClassTag](that: MultiContextDataQuanta[ThatOut],
+                                                       f: (DataQuanta[Out], DataQuanta[ThatOut]) => DataQuanta[NewOut]): MultiContextDataQuanta[NewOut] =
+    new MultiContextDataQuanta[NewOut](this.dataQuantaMap.map { case (key, thisDataQuanta) =>
+      key -> f(thisDataQuanta, that.dataQuantaMap(key))
+    })(this.multiContextPlanBuilder)
+
+
+  /**
+   * Restrict the [[Operator]] of each [[MultiContext]] to run on certain [[Platform]]s.
+   *
+   * @param platforms on that the [[Operator]] may be executed
+   * @return a new [[MultiContextDataQuanta]] with the restriction applied for every context
+   */
+  def withTargetPlatforms(platforms: Platform*): MultiContextDataQuanta[Out] =
+    new MultiContextDataQuanta[Out](transformEach[Out](_.withTargetPlatforms(platforms: _*)))(multiContextPlanBuilder)
+
+
+  /**
+   * Restrict the [[Operator]] of specified [[MultiContext]] to run on certain [[Platform]]s.
+   *
+   * @param multiContext the [[MultiContext]] to restrict
+   * @param platforms on that the [[Operator]] may be executed
+   * @return a new [[MultiContextDataQuanta]] in which only the entry of `multiContext` is replaced
+   */
+  def withTargetPlatforms(multiContext: MultiContext, platforms: Platform*): MultiContextDataQuanta[Out] = {
+    val restricted = dataQuantaMap(multiContext.id).withTargetPlatforms(platforms: _*)
+    new MultiContextDataQuanta[Out](dataQuantaMap.updated(multiContext.id, restricted))(this.multiContextPlanBuilder)
+  }
+
+
+  /**
+   * Executes the plan of every [[MultiContext]] asynchronously and blocks until all of them have finished.
+   *
+   * @param timeout the maximum time to wait for the execution to complete.
+   *                If not specified, the execution will block indefinitely.
+   * @throws WayangException if a context has no sink, or a sink other than a text or object file sink
+   */
+  def execute(timeout: Duration = Duration.Inf): Unit = {
+
+    // Start one asynchronous job per context, writing to the sink configured on that context
+    val asyncResults = multiContextPlanBuilder.multiContexts.map { multiContext =>
+      // For each multiContext get its corresponding dataQuanta
+      val dataQuanta = dataQuantaMap(multiContext.id)
+
+      multiContext.getSink match {
+        case Some(MultiContext.TextFileSink(url)) =>
+          dataQuanta.writeTextFileAsync(url)
+
+        case Some(MultiContext.ObjectFileSink(url)) =>
+          dataQuanta.writeObjectFileAsync(url)
+
+        case None =>
+          throw new WayangException("All contexts must be attached to an output sink.")
+
+        // Any other sink (e.g. a merge sink) is not a valid target for `execute`
+        case _ =>
+          throw new WayangException("Invalid sink.")
+      }
+    }
+
+    // Block until all futures finish or `timeout` elapses
+    val aggregateFuture: Future[List[Any]] = Future.sequence(asyncResults)
+    Await.result(aggregateFuture, timeout)
+
+  }
+
+
+  /**
+   * Merge the underlying [[DataQuanta]]s asynchronously using the [[DataQuanta.union]] operator.
+   *
+   * @param mergeContext The Wayang context for merging the [[DataQuanta]]s.
+   * @param timeout The maximum time to wait for all futures to finish. Default is [[Duration.Inf]] (indefinitely).
+   * @return A [[DataQuanta]] representing the result of merging and unioning each of the [[DataQuanta]].
+   * @throws WayangException if a context has no merge sink, the sink is invalid, or there is no context to merge.
+   */
+  def mergeUnion(mergeContext: WayangContext, timeout: Duration = Duration.Inf): DataQuanta[Out] = {
+
+    // Execute plans asynchronously
+    val asyncResults = multiContextPlanBuilder.multiContexts.map { multiContext =>
+      // For each multiContext get its corresponding dataQuanta
+      val dataQuanta = dataQuantaMap(multiContext.id)
+
+      // Get the sink of the multiContext (it should be a merge sink)
+      multiContext.getSink match {
+        case Some(MultiContext.MergeFileSink(url)) =>
+          dataQuanta.runAsync(url)
+
+        case None =>
+          throw new WayangException("All contexts must be attached to a merge sink.")
+
+        case _ =>
+          throw new WayangException("Invalid sink.")
+      }
+    }
+
+    // Block until all futures finish or `timeout` elapses
+    val futures: List[Future[Any]] = asyncResults.map(_.future)
+    Await.result(Future.sequence(futures), timeout)
+
+    // Create plan builder for the new merge context
+    val planBuilder = new PlanBuilder(mergeContext).withUdfJarsOf(this.getClass)
+
+    // Create sources by loading each result using the merge context
+    val sources: List[DataQuanta[Out]] = asyncResults.map(asyncResult => planBuilder.loadAsync(asyncResult))
+
+    // Merge sources with union and return; an empty list means there was no context to merge
+    sources.reduceOption((dq1, dq2) => dq1.union(dq2))
+      .getOrElse(throw new WayangException("Cannot merge: there are no contexts to merge."))
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextPlanBuilder.scala
new file mode 100644
index 0000000..a49a908
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextPlanBuilder.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api
+
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+/**
+ * Builds plans for several [[MultiContext]]s at once by keeping one [[PlanBuilder]] per context.
+ *
+ * @param multiContexts the contexts managed by this builder
+ */
+class MultiContextPlanBuilder(private[api] val multiContexts: List[MultiContext]) {
+
+  // NOTE(review): never written to inside this class; presumably read elsewhere in the `api` package — confirm.
+  private[api] var udfJars = scala.collection.mutable.Set[String]()
+
+  // The managed contexts, keyed by their id.
+  private val multiContextMap: Map[Long, MultiContext] = multiContexts.map(context => context.id -> context).toMap
+
+  // Data quanta registered through the read*/load* methods below, keyed by context id.
+  private var dataQuantaMap: Map[Long, DataQuanta[_]] = Map()
+
+  // One PlanBuilder per managed context, keyed by context id.
+  private var planBuilderMap: Map[Long, PlanBuilder] = multiContexts.map(context => context.id -> new PlanBuilder(context)).toMap
+
+  /**
+   * Auxiliary constructor that accepts a Java list of contexts.
+   *
+   * @param multiContexts the contexts managed by this builder
+   */
+  def this(multiContexts: java.util.List[MultiContext]) =
+    this(multiContexts.asScala.toList)
+
+
+  /**
+   * Defines user-code JAR files that might be needed to transfer to execution platforms.
+   *
+   * @param paths paths to JAR files that should be transferred
+   * @return this instance
+   */
+  def withUdfJars(paths: String*): MultiContextPlanBuilder = {
+    // Register `paths` on every per-context PlanBuilder. The map is rebuilt strictly:
+    // in Scala 2.12 `mapValues` only returns a lazy view, which would defer the call to
+    // `withUdfJars` and repeat it on every later lookup instead of running it once here.
+    planBuilderMap = planBuilderMap.map { case (contextId, planBuilder) =>
+      contextId -> planBuilder.withUdfJars(paths: _*)
+    }
+    this
+  }
+
+  /**
+   * Defines user-code JAR files that might be needed to transfer to execution platforms.
+   *
+   * @param classes whose JAR files should be transferred
+   * @return this instance
+   */
+  def withUdfJarsOf(classes: Class[_]*): MultiContextPlanBuilder = {
+    // Classes whose declaring JAR cannot be determined (null) are skipped.
+    withUdfJars(classes.map(ReflectionUtils.getDeclaringJar).filterNot(_ == null): _*)
+    this
+  }
+
+  /**
+   * Applies a function `f` to each [[PlanBuilder]] in the contexts.
+   * Returns a [[MultiContextDataQuanta]] containing the [[DataQuanta]].
+   *
+   * @param f the function to apply to each [[PlanBuilder]]
+   * @tparam Out the type parameter for the output of the `f` function
+   * @return a [[MultiContextDataQuanta]] containing the results of applying `f` to each [[PlanBuilder]]
+   */
+  def forEach[Out: ClassTag](f: PlanBuilder => DataQuanta[Out]): MultiContextDataQuanta[Out] = {
+    // A fresh, local map: this does not touch the `dataQuantaMap` field.
+    val resultsByContext = multiContexts.map(context => context.id -> f(planBuilderMap(context.id))).toMap
+    new MultiContextDataQuanta[Out](resultsByContext)(this)
+  }
+
+  // NOTE(review): the three methods below add to the shared `dataQuantaMap` field and then cast the
+  // whole map, so entries registered by earlier calls on this builder (possibly with a different
+  // element type) are carried along — confirm this is intended.
+
+  /**
+   * Same as [[PlanBuilder.readTextFile]], but for specified `multiContext`
+   *
+   * @param multiContext The multi context.
+   * @param url          The URL of the text file to be read.
+   * @return The ReadTextFileMultiContextPlanBuilder with the added data quanta.
+   */
+  def readTextFile(multiContext: MultiContext, url: String): ReadTextFileMultiContextPlanBuilder = {
+    dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).readTextFile(url))
+    new ReadTextFileMultiContextPlanBuilder(this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[String]]])
+  }
+
+  /**
+   * Same as [[PlanBuilder.readObjectFile()]], but for specified `multiContext`
+   *
+   * @param multiContext The multi context.
+   * @param url          The URL of the object file to be read.
+   * @tparam T The element type stored in the object file.
+   * @return The ReadObjectFileMultiContextPlanBuilder with the added data quanta.
+   */
+  def readObjectFile[T: ClassTag](multiContext: MultiContext, url: String): ReadObjectFileMultiContextPlanBuilder[T] = {
+    // Pass `T` explicitly: the target map is typed `DataQuanta[_]`, so nothing else pins the
+    // element type (and hence the ClassTag) used to read the file.
+    dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).readObjectFile[T](url))
+    new ReadObjectFileMultiContextPlanBuilder[T](this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[T]]])
+  }
+
+  /**
+   * Same as [[PlanBuilder.loadCollection]], but for specified `multiContext`
+   *
+   * @param multiContext The multi context.
+   * @param iterable     The collection to be loaded.
+   * @tparam T The element type of the collection.
+   * @return The LoadCollectionMultiContextPlanBuilder with the added data quanta.
+   */
+  def loadCollection[T: ClassTag](multiContext: MultiContext, iterable: Iterable[T]): LoadCollectionMultiContextPlanBuilder[T] = {
+    dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).loadCollection(iterable))
+    new LoadCollectionMultiContextPlanBuilder[T](this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[T]]])
+  }
+
+}
+
+
+/**
+ * Continuation of a `readTextFile` chain started on a [[MultiContextPlanBuilder]]; it collects one
+ * text-file [[DataQuanta]] per context id.
+ *
+ * NOTE(review): reads are issued on the [[MultiContext]] taken from `multiContextMap`, presumably via
+ * an implicit conversion to [[PlanBuilder]] defined elsewhere — confirm that UDF jars registered on
+ * `multiContextPlanBuilder` still apply.
+ */
+class ReadTextFileMultiContextPlanBuilder(private val multiContextPlanBuilder: MultiContextPlanBuilder,
+                                          private val multiContextMap: Map[Long, MultiContext],
+                                          private var dataQuantaMap: Map[Long, DataQuanta[String]] = Map()) {
+
+  /**
+   * Counterpart of [[PlanBuilder.readTextFile]] that targets one specific context.
+   *
+   * @param multiContext The context that should read the file.
+   * @param url          The URL of the text file to be read.
+   * @return This builder, now also holding the data quanta for `multiContext`.
+   */
+  def readTextFile(multiContext: MultiContext, url: String): ReadTextFileMultiContextPlanBuilder = {
+    val contextId = multiContext.id
+    val lines: DataQuanta[String] = multiContextMap(contextId).readTextFile(url)
+    // Register (or replace) the entry for this context
+    dataQuantaMap = dataQuantaMap.updated(contextId, lines)
+    this
+  }
+}
+
+object ReadTextFileMultiContextPlanBuilder {
+  /**
+   * Implicitly turns a finished `readTextFile` chain into a [[MultiContextDataQuanta]] that wraps
+   * the collected data quanta and the originating [[MultiContextPlanBuilder]].
+   */
+  implicit def toMultiContextDataQuanta(builder: ReadTextFileMultiContextPlanBuilder): MultiContextDataQuanta[String] = {
+    val collected = builder.dataQuantaMap
+    val parent = builder.multiContextPlanBuilder
+    new MultiContextDataQuanta[String](collected)(parent)
+  }
+}
+
+
+/**
+ * Continuation of a `readObjectFile` chain started on a [[MultiContextPlanBuilder]]; it collects one
+ * object-file [[DataQuanta]] per context id.
+ *
+ * @tparam T The element type stored in the object files.
+ */
+class ReadObjectFileMultiContextPlanBuilder[T: ClassTag](private val multiContextPlanBuilder: MultiContextPlanBuilder,
+                                                         private val multiContextMap: Map[Long, MultiContext],
+                                                         private var dataQuantaMap: Map[Long, DataQuanta[T]] = Map()) {
+
+  /**
+   * Counterpart of [[PlanBuilder.readObjectFile()]] that targets one specific context.
+   *
+   * @param multiContext The context that should read the file.
+   * @param url          The URL of the object file to be read.
+   * @return This builder, now also holding the data quanta for `multiContext`.
+   */
+  def readObjectFile(multiContext: MultiContext, url: String): ReadObjectFileMultiContextPlanBuilder[T] = {
+    val contextId = multiContext.id
+    // The type annotation fixes the element type of the read to `T`
+    val objects: DataQuanta[T] = multiContextMap(contextId).readObjectFile(url)
+    dataQuantaMap = dataQuantaMap.updated(contextId, objects)
+    this
+  }
+}
+
+object ReadObjectFileMultiContextPlanBuilder {
+  /**
+   * Implicitly turns a finished `readObjectFile` chain into a [[MultiContextDataQuanta]] that wraps
+   * the collected data quanta and the originating [[MultiContextPlanBuilder]].
+   */
+  implicit def toMultiContextDataQuanta[T: ClassTag](builder: ReadObjectFileMultiContextPlanBuilder[T]): MultiContextDataQuanta[T] = {
+    val collected = builder.dataQuantaMap
+    val parent = builder.multiContextPlanBuilder
+    new MultiContextDataQuanta[T](collected)(parent)
+  }
+}
+
+
+/**
+ * Continuation of a `loadCollection` chain started on a [[MultiContextPlanBuilder]]; it collects one
+ * collection-backed [[DataQuanta]] per context id.
+ *
+ * @tparam T The element type of the loaded collections.
+ */
+class LoadCollectionMultiContextPlanBuilder[T: ClassTag](private val multiContextPlanBuilder: MultiContextPlanBuilder,
+                                                         private val multiContextMap: Map[Long, MultiContext],
+                                                         private var dataQuantaMap: Map[Long, DataQuanta[T]] = Map()) {
+
+  /**
+   * Counterpart of [[PlanBuilder.loadCollection]] that targets one specific context.
+   *
+   * @param multiContext The context that should load the collection.
+   * @param iterable     The collection to be loaded.
+   * @return This builder, now also holding the data quanta for `multiContext`.
+   */
+  def loadCollection(multiContext: MultiContext, iterable: Iterable[T]): LoadCollectionMultiContextPlanBuilder[T] = {
+    val contextId = multiContext.id
+    val loaded: DataQuanta[T] = multiContextMap(contextId).loadCollection(iterable)
+    // Register (or replace) the entry for this context
+    dataQuantaMap = dataQuantaMap.updated(contextId, loaded)
+    this
+  }
+}
+
+object LoadCollectionMultiContextPlanBuilder {
+  /**
+   * Implicitly turns a finished `loadCollection` chain into a [[MultiContextDataQuanta]] that wraps
+   * the collected data quanta and the originating [[MultiContextPlanBuilder]].
+   */
+  implicit def toMultiContextDataQuanta[T: ClassTag](builder: LoadCollectionMultiContextPlanBuilder[T]): MultiContextDataQuanta[T] = {
+    val collected = builder.dataQuantaMap
+    val parent = builder.multiContextPlanBuilder
+    new MultiContextDataQuanta[T](collected)(parent)
+  }
+}
+
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 1578421..a00d3f5 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -38,11 +38,11 @@
/**
* Utility to build [[WayangPlan]]s.
*/
-class PlanBuilder(wayangContext: WayangContext, private var jobName: String = null) {
+class PlanBuilder(private[api] val wayangContext: WayangContext, private var jobName: String = null) {
private[api] val sinks = ListBuffer[Operator]()
- private val udfJars = scala.collection.mutable.Set[String]()
+ private[api] val udfJars = scala.collection.mutable.Set[String]()
private var experiment: Experiment = _
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaAsyncResult.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaAsyncResult.scala
new file mode 100644
index 0000000..1090bf6
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaAsyncResult.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.async
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+/**
+ * Represents an asynchronous result of data quanta processing.
+ *
+ * The `DataQuantaAsyncResult` class is a case class used to encapsulate the result of an asynchronous data quanta processing.
+ * It contains the temporary output file path, the class tag of the output type, and the future representing the completion
+ * of the processing.
+ *
+ * @param tempFileOut The temporary output file path.
+ * @param classTag    The class tag of the output type.
+ * @param future      The future representing the completion of the processing.
+ * @tparam Out The type of the output.
+ */
+case class DataQuantaAsyncResult[Out: ClassTag](tempFileOut: String, classTag: ClassTag[Out], future: Future[_])
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaImplicits.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaImplicits.scala
new file mode 100644
index 0000000..9aa28b3
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaImplicits.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.async
+
+import org.apache.wayang.api
+import org.apache.wayang.api.{MultiContext, DataQuanta, async}
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+/**
+ * Extension methods that add asynchronous execution to `DataQuanta`.
+ */
+object DataQuantaImplicits {
+
+  implicit class DataQuantaRunAsyncImplicits[T: ClassTag](dataQuanta: DataQuanta[T]) {
+
+    /**
+     * Asynchronously runs the Wayang plan behind this data quanta and writes its output to `tempFileOut`.
+     * Execution is only scheduled once all futures in `dataQuantaAsyncResults` have completed.
+     *
+     * @param tempFileOut            the temporary file output path
+     * @param dataQuantaAsyncResults previous asynchronous results that must finish first (may be empty)
+     * @return a [[DataQuantaAsyncResult]] carrying `tempFileOut`, the class tag of `T` and the
+     *         future that tracks the execution
+     */
+    def runAsync(tempFileOut: String, dataQuantaAsyncResults: DataQuantaAsyncResult[_]*): DataQuantaAsyncResult[T] = {
+      val completion: Future[_] = afterAll(dataQuantaAsyncResults) {
+        async.runAsyncWithTempFileOut(dataQuanta, tempFileOut)
+      }
+      DataQuantaAsyncResult(tempFileOut, implicitly[ClassTag[T]], completion)
+    }
+
+    /**
+     * Similar to [[DataQuanta.writeTextFile]] but for asynchronous execution.
+     *
+     * @param url                    The URL where the text file should be written.
+     * @param dataQuantaAsyncResults Previous asynchronous results, in any order; writing is only
+     *                               scheduled once all of their futures have completed.
+     * @return A future that completes when the asynchronous write has finished. It fails if any of
+     *         the provided futures fails.
+     */
+    def writeTextFileAsync(url: String, dataQuantaAsyncResults: DataQuantaAsyncResult[_]*): Future[Unit] =
+      afterAll(dataQuantaAsyncResults) {
+        async.runAsyncWithTextFileOut(dataQuanta, url)
+      }
+
+    /**
+     * Similar to [[DataQuanta.writeObjectFile]] but for asynchronous execution.
+     *
+     * @param url                    The URL of the object file sink.
+     * @param dataQuantaAsyncResults Previous asynchronous results, in any order; writing is only
+     *                               scheduled once all of their futures have completed.
+     * @return A future that completes when the asynchronous write has finished. It fails if any of
+     *         the provided futures fails.
+     */
+    def writeObjectFileAsync(url: String, dataQuantaAsyncResults: DataQuantaAsyncResult[_]*): Future[Unit] =
+      afterAll(dataQuantaAsyncResults) {
+        async.runAsyncWithObjectFileOut(dataQuanta, url)
+      }
+
+    // Runs `job` only after the futures of all `dependencies` have completed successfully.
+    private def afterAll[R](dependencies: Seq[DataQuantaAsyncResult[_]])(job: => Future[R]): Future[R] = {
+      // Pull the future out of every dependency and combine them into a single one
+      val pending: Seq[Future[Any]] = dependencies.map(_.future)
+      Future.sequence(pending).flatMap(_ => job)
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/Main.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/Main.scala
new file mode 100644
index 0000000..8be4f4b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/Main.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.async
+
+import org.apache.wayang.api.serialization.TempFileUtils
+import org.apache.wayang.api.{MultiContext, PlanBuilder}
+import org.apache.wayang.basic.operators.{ObjectFileSink, TextFileSink}
+import org.apache.wayang.core.api.exception.WayangException
+import org.apache.wayang.core.plan.wayangplan.{Operator, WayangPlan}
+
+import java.nio.file.Path
+
+object Main {
+
+  /**
+   * Entry point that executes a single Wayang plan.
+   * Meant to be launched as a child process by the async operations and the MultiContextPlanBuilder API.
+   *
+   * Exits with status 1 unless exactly two arguments are given.
+   *
+   * @param args Two file paths: the serialized operator, followed by the serialized plan builder
+   *             (which carries the context and the UDF jars).
+   */
+  def main(args: Array[String]): Unit = {
+    println("org.apache.wayang.api.async.Main")
+    println(args.mkString("Array(", ", ", ")"))
+
+    if (args.length != 2) {
+      println("Expected two arguments: paths to the serialized operator and context.")
+      System.exit(1)
+    }
+
+    // Locations of the two serialized inputs
+    val serializedOperator = Path.of(args(0))
+    val serializedPlanBuilder = Path.of(args(1))
+
+    // Deserialize the operator and the plan builder
+    val operator = TempFileUtils.readFromTempFileFromString[Operator](serializedOperator)
+    val planBuilder = TempFileUtils.readFromTempFileFromString[PlanBuilder](serializedPlanBuilder)
+
+    // The plan builder carries both the context and the UDF jars
+    val context = planBuilder.wayangContext.asInstanceOf[MultiContext]
+    val udfJars = planBuilder.udfJars
+    println(s"udfJars: $udfJars")
+
+    // Type class of the operator's first output; the sink is created with it
+    val outType = operator.getOutput(0).getType.getDataUnitType.getTypeClass
+
+    // Wires the operator's output 0 into `sink` and executes the resulting plan
+    def connectToSinkAndExecutePlan(sink: Operator): Unit = {
+      operator.connectTo(0, sink, 0)
+      context.execute(new WayangPlan(sink), udfJars.toSeq: _*)
+    }
+
+    // Create the sink that matches the one attached to the context, then run the plan
+    context.getSink match {
+      case Some(textSink: MultiContext.TextFileSink) =>
+        connectToSinkAndExecutePlan(new TextFileSink(textSink.url, outType))
+
+      case Some(objectSink: MultiContext.ObjectFileSink) =>
+        connectToSinkAndExecutePlan(new ObjectFileSink(objectSink.url, outType))
+
+      case None =>
+        throw new WayangException("All contexts must be attached to an output sink.")
+
+      case _ =>
+        throw new WayangException("Invalid sink..")
+    }
+  }
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/PlanBuilderImplicits.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/PlanBuilderImplicits.scala
new file mode 100644
index 0000000..c6d841e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/PlanBuilderImplicits.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.async
+
+import org.apache.wayang.api.{DataQuanta, PlanBuilder, async}
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+
+object PlanBuilderImplicits {
+
+  /**
+   * Extends [[PlanBuilder]] with the ability to load the output of a previous asynchronous operation.
+   *
+   * @param planBuilder The PlanBuilder instance to extend.
+   * @tparam Out The type of data to be loaded.
+   */
+  implicit class PlanBuilderLoadAsyncImplicit[Out: ClassTag](planBuilder: PlanBuilder) {
+    /**
+     * Reads, as an object file, the temporary file that a previous asynchronous operation wrote to.
+     *
+     * @param asyncResult The asynchronous result whose `tempFileOut` path is to be read.
+     * @return A DataQuanta representing the loaded data.
+     */
+    def loadAsync(asyncResult: DataQuantaAsyncResult[Out]): DataQuanta[Out] = {
+      val resultLocation = asyncResult.tempFileOut
+      planBuilder.readObjectFile[Out](resultLocation)
+    }
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/package.scala
new file mode 100644
index 0000000..771e618
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/package.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api
+
+import org.apache.logging.log4j.{LogManager, Logger}
+import org.apache.wayang.api.serialization.TempFileUtils
+import org.apache.wayang.core.api.exception.WayangException
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+package object async {
+
+  val logger: Logger = LogManager.getLogger(getClass)
+
+  /**
+   * Runs the given data quanta asynchronously with a temporary file as the output.
+   * Delegates to [[runAsyncWithObjectFileOut]], i.e. the temporary file is written as an object file.
+   *
+   * @param dataQuanta  The data quanta to be executed.
+   * @param tempFileOut The path to the temporary output file.
+   * @tparam Out The type parameter of the data quanta and the output.
+   * @return A future representing the completion of the execution.
+   * @throws WayangException if the WayangContext is not of type MultiContext
+   */
+  def runAsyncWithTempFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], tempFileOut: String): Future[Unit] = {
+    runAsyncWithObjectFileOut(dataQuanta, tempFileOut)
+  }
+
+
+  /**
+   * Runs the given DataQuanta asynchronously and writes the output to a text file.
+   *
+   * @param dataQuanta the DataQuanta to be executed
+   * @param url        the URL of the text file to write the output to
+   * @tparam Out the type of the output DataQuanta
+   * @return a Future representing the completion of the execution
+   * @throws WayangException if the WayangContext is not of type MultiContext
+   */
+  def runAsyncWithTextFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], url: String): Future[Unit] = {
+    // Attach a text file sink to the multi context and then hand over to runAsyncBody
+    val wayangContext = dataQuanta.planBuilder.wayangContext
+    wayangContext match {
+      case context: MultiContext =>
+        val updatedContext = context.withTextFileSink(url)
+        runAsyncBody(dataQuanta, updatedContext)
+      case _ =>
+        // Thrown synchronously, before any future is created
+        throw new WayangException("WayangContext is not of type MultiContext")
+    }
+  }
+
+
+  /**
+   * Runs the given DataQuanta asynchronously and writes the output to an object file specified by the URL.
+   *
+   * @param dataQuanta the DataQuanta to be executed
+   * @param url        the URL of the object file to write the output to
+   * @tparam Out the type parameter for the output DataQuanta
+   * @return a Future that represents the execution of the DataQuanta
+   * @throws WayangException if the WayangContext is not of type MultiContext
+   */
+  def runAsyncWithObjectFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], url: String): Future[Unit] = {
+    // Attach an object file sink to the multi context and then hand over to runAsyncBody
+    val wayangContext = dataQuanta.planBuilder.wayangContext
+    wayangContext match {
+      case context: MultiContext =>
+        val updatedContext = context.withObjectFileSink(url)
+        runAsyncBody(dataQuanta, updatedContext)
+      case _ =>
+        // Thrown synchronously, before any future is created
+        throw new WayangException("WayangContext is not of type MultiContext")
+    }
+  }
+
+
+  /**
+   * Executes the plan behind `dataQuanta` in a child JVM running `org.apache.wayang.api.async.Main`.
+   *
+   * The plan builder and the operator of `dataQuanta` are serialized to temporary files whose paths
+   * are passed to the child as arguments. The child inherits the parent's classpath, stdout and
+   * stderr. The temporary files are deleted once the child has terminated or failed to start.
+   *
+   * @param dataQuanta   the DataQuanta whose operator and plan builder are handed to the child
+   * @param multiContext the context with the sink attached. NOTE(review): not read in this body;
+   *                     the child presumably obtains the context from the serialized plan builder — confirm.
+   * @tparam Out the type parameter of the DataQuanta
+   * @return a Future that completes when the child exits with status 0 and fails with a
+   *         [[WayangException]] when it exits with any other status
+   */
+  def runAsyncBody[Out: ClassTag](dataQuanta: DataQuanta[Out], multiContext: MultiContext): Future[Unit] = Future {
+
+    import scala.concurrent.blocking
+
+    // Write plan builder to temp file
+    val planBuilderPath = TempFileUtils.writeToTempFileAsString(dataQuanta.planBuilder.withUdfJarsOf(this.getClass))
+
+    // Write operator to temp file
+    val operatorPath = TempFileUtils.writeToTempFileAsString(dataQuanta.operator)
+
+    try {
+      val mainClass = "org.apache.wayang.api.async.Main"
+      val classpath = System.getProperty("java.class.path") // get classpath from parent JVM
+
+      // Child process
+      val processBuilder = new ProcessBuilder(
+        "java",
+        "-cp",
+        classpath,
+        mainClass,
+        operatorPath.toString,
+        planBuilderPath.toString
+      )
+
+      // Redirect children output to parent output
+      processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT)
+      processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT)
+
+      // Start child process
+      val process = processBuilder.start()
+
+      // And block this future while waiting for it
+      val exitCode = blocking {
+        process.waitFor()
+      }
+
+      // A non-zero exit status means the child failed; surface that as a failed future
+      // instead of silently reporting success.
+      if (exitCode != 0) {
+        throw new WayangException(s"Child process '$mainClass' exited with code $exitCode.")
+      }
+    }
+
+    finally {
+      Files.deleteIfExists(planBuilderPath)
+      Files.deleteIfExists(operatorPath)
+    }
+
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
index 715741c..2514f09 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
@@ -20,10 +20,10 @@
import _root_.java.lang.{Class => JavaClass, Iterable => JavaIterable}
import _root_.java.util.function.{Consumer, ToLongBiFunction, ToLongFunction}
-
import org.apache.wayang.basic.data.{Record, Tuple2 => WayangTuple2}
import org.apache.wayang.core.api.WayangContext
-import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.FunctionDescriptor
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate, SerializableToLongBiFunction}
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.cardinality.{CardinalityEstimate, CardinalityEstimator, DefaultCardinalityEstimator, FixedSizeCardinalityEstimator}
import org.apache.wayang.core.optimizer.costs.{DefaultLoadEstimator, LoadEstimator, LoadProfileEstimator, NestableLoadProfileEstimator}
@@ -103,12 +103,12 @@
new FixedSizeCardinalityEstimator(fixCardinality, true)
implicit def toCardinalityEstimator(f: Long => Long): CardinalityEstimator =
- new DefaultCardinalityEstimator(.99d, 1, true, new ToLongFunction[Array[Long]] {
+ new DefaultCardinalityEstimator(.99d, 1, true, new FunctionDescriptor.SerializableToLongFunction[Array[Long]] {
override def applyAsLong(inCards: Array[Long]): Long = f.apply(inCards(0))
})
implicit def toCardinalityEstimator(f: (Long, Long) => Long): CardinalityEstimator =
- new DefaultCardinalityEstimator(.99d, 1, true, new ToLongFunction[Array[Long]] {
+ new DefaultCardinalityEstimator(.99d, 1, true, new FunctionDescriptor.SerializableToLongFunction[Array[Long]] {
override def applyAsLong(inCards: Array[Long]): Long = f.apply(inCards(0), inCards(1))
})
@@ -118,7 +118,7 @@
1,
.99d,
CardinalityEstimate.EMPTY_ESTIMATE,
- new ToLongBiFunction[Array[Long], Array[Long]] {
+ new SerializableToLongBiFunction[Array[Long], Array[Long]] {
override def applyAsLong(t: Array[Long], u: Array[Long]): Long = f.apply(t(0), u(0))
}
)
@@ -129,7 +129,7 @@
1,
.99d,
CardinalityEstimate.EMPTY_ESTIMATE,
- new ToLongBiFunction[Array[Long], Array[Long]] {
+ new SerializableToLongBiFunction[Array[Long], Array[Long]] {
override def applyAsLong(t: Array[Long], u: Array[Long]): Long = f.apply(t(0), t(1), u(0))
}
)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/SerializationUtils.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/SerializationUtils.scala
new file mode 100644
index 0000000..541d13d
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/SerializationUtils.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
+import com.fasterxml.jackson.annotation._
+import com.fasterxml.jackson.databind._
+import com.fasterxml.jackson.databind.module.SimpleModule
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.wayang.api.serialization.customserializers._
+import org.apache.wayang.api.serialization.mixins.ConfigurationAndContextMixIns._
+import org.apache.wayang.api.serialization.mixins.DataTypeMixIns._
+import org.apache.wayang.api.serialization.mixins.DescriptorMixIns._
+import org.apache.wayang.api.serialization.mixins.EstimatorMixIns._
+import org.apache.wayang.api.serialization.mixins.IgnoreLoggerMixIn
+import org.apache.wayang.api.serialization.mixins.OperatorMixIns._
+import org.apache.wayang.api.serialization.mixins.ProviderMixIns._
+import org.apache.wayang.api.serialization.mixins.SlotMixIns._
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.basic.function.ProjectionDescriptor
+import org.apache.wayang.basic.operators._
+import org.apache.wayang.basic.types.RecordType
+import org.apache.wayang.core.api.configuration._
+import org.apache.wayang.core.api.{Configuration, Job, WayangContext}
+import org.apache.wayang.core.function.FunctionDescriptor._
+import org.apache.wayang.core.function._
+import org.apache.wayang.core.mapping.{OperatorPattern, PlanTransformation}
+import org.apache.wayang.core.optimizer.cardinality.{CardinalityEstimate, CardinalityEstimator, CardinalityEstimatorManager, CardinalityPusher, DefaultCardinalityEstimator}
+import org.apache.wayang.core.optimizer.channels.ChannelConversionGraph
+import org.apache.wayang.core.optimizer.costs._
+import org.apache.wayang.core.optimizer.enumeration._
+import org.apache.wayang.core.optimizer.{OptimizationContext, OptimizationUtils, ProbabilisticDoubleInterval, SanityChecker}
+import org.apache.wayang.core.plan.executionplan.{Channel, ExecutionPlan}
+import org.apache.wayang.core.plan.wayangplan._
+import org.apache.wayang.core.plan.wayangplan.traversal.AbstractTopologicalTraversal
+import org.apache.wayang.core.platform._
+import org.apache.wayang.core.profiling.{CardinalityRepository, ExecutionLog}
+import org.apache.wayang.core.types.{BasicDataUnitType, DataSetType, DataUnitGroupType, DataUnitType}
+import org.apache.wayang.core.util.fs.{FileSystems, HadoopFileSystem, LocalFileSystem}
+import org.apache.wayang.core.util.{AbstractReferenceCountable, ReflectionUtils}
+
+import scala.reflect.ClassTag
+
+/**
+ * Holds the Jackson `ObjectMapper` used by this package, together with thin
+ * serialize/deserialize helpers that delegate to it.
+ */
+object SerializationUtils {
+
+ /**
+ * Shared mapper. It is built with field visibility set to ANY, with
+ * FAIL_ON_EMPTY_BEANS and FAIL_ON_UNKNOWN_PROPERTIES switched off, with
+ * indented output enabled, and with the Scala module, the custom
+ * (de)serializers and the mix-ins listed below registered on it.
+ */
+ val mapper: ObjectMapper = {
+ // Local builder instance; inside this block it shadows the enclosing `mapper` val.
+ val mapper = new ObjectMapper()
+ .setVisibility(PropertyAccessor.FIELD, Visibility.ANY)
+ .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .enable(SerializationFeature.INDENT_OUTPUT)
+ .registerModule(DefaultScalaModule)
+
+ // Custom serializers
+ // (Operator only gets a custom deserializer here; no custom serializer is registered for it.)
+ .registerModule(new SimpleModule().addSerializer(classOf[MultiContext], new MultiContextSerializer()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[MultiContext], new MultiContextDeserializer()))
+ .registerModule(new SimpleModule().addSerializer(classOf[Platform], new PlatformSerializer()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[Platform], new PlatformDeserializer()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[Operator], new OperatorDeserializer()))
+
+ // Custom serializers for UDFs
+ // Every type below is paired with GenericSerializableSerializer / GenericSerializableDeserializer,
+ // each registered in its own SimpleModule.
+ .registerModule(new SimpleModule().addSerializer(classOf[SerializablePredicate[_]], new GenericSerializableSerializer[SerializablePredicate[_]]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[SerializablePredicate[_]], new GenericSerializableDeserializer[SerializablePredicate[_]]()))
+ .registerModule(new SimpleModule().addSerializer(classOf[SerializableFunction[_, _]], new GenericSerializableSerializer[FunctionDescriptor.SerializableFunction[_, _]]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[SerializableFunction[_, _]], new GenericSerializableDeserializer[FunctionDescriptor.SerializableFunction[_, _]]()))
+ .registerModule(new SimpleModule().addSerializer(classOf[SerializableBinaryOperator[_]], new GenericSerializableSerializer[SerializableBinaryOperator[_]]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[SerializableBinaryOperator[_]], new GenericSerializableDeserializer[SerializableBinaryOperator[_]]()))
+ .registerModule(new SimpleModule().addSerializer(classOf[SerializableConsumer[_]], new GenericSerializableSerializer[SerializableConsumer[_]]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[SerializableConsumer[_]], new GenericSerializableDeserializer[SerializableConsumer[_]]()))
+ .registerModule(new SimpleModule().addSerializer(classOf[SerializableIntUnaryOperator], new GenericSerializableSerializer[SerializableIntUnaryOperator]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[SerializableIntUnaryOperator], new GenericSerializableDeserializer[SerializableIntUnaryOperator]()))
+ .registerModule(new SimpleModule().addSerializer(classOf[SerializableLongUnaryOperator], new GenericSerializableSerializer[SerializableLongUnaryOperator]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[SerializableLongUnaryOperator], new GenericSerializableDeserializer[SerializableLongUnaryOperator]()))
+ .registerModule(new SimpleModule().addSerializer(classOf[LoadEstimator.SinglePointEstimationFunction], new GenericSerializableSerializer[LoadEstimator.SinglePointEstimationFunction]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[LoadEstimator.SinglePointEstimationFunction], new GenericSerializableDeserializer[LoadEstimator.SinglePointEstimationFunction]()))
+ .registerModule(new SimpleModule().addSerializer(classOf[SerializableToLongBiFunction[_, _]], new GenericSerializableSerializer[SerializableToLongBiFunction[_, _]]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[SerializableToLongBiFunction[_, _]], new GenericSerializableDeserializer[SerializableToLongBiFunction[_, _]]()))
+ .registerModule(new SimpleModule().addSerializer(classOf[SerializableToDoubleBiFunction[_, _]], new GenericSerializableSerializer[SerializableToDoubleBiFunction[_, _]]()))
+ .registerModule(new SimpleModule().addDeserializer(classOf[SerializableToDoubleBiFunction[_, _]], new GenericSerializableDeserializer[SerializableToDoubleBiFunction[_, _]]()))
+
+ // Register mix-ins
+ // Each call attaches the mix-in class (second argument) to the target class (first argument).
+ mapper
+ .addMixIn(classOf[MultiContextPlanBuilder], classOf[MultiContextPlanBuilderMixIn])
+ .addMixIn(classOf[WayangContext], classOf[WayangContextMixIn])
+ .addMixIn(classOf[Configuration], classOf[ConfigurationMixIn])
+ .addMixIn(classOf[CardinalityRepository], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[KeyValueProvider[_, _]], classOf[KeyValueProviderMixIn])
+ .addMixIn(classOf[ValueProvider[_]], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[CollectionProvider[_]], classOf[CollectionProviderMixIn])
+ .addMixIn(classOf[ExplicitCollectionProvider[_]], classOf[ExplicitCollectionProviderMixIn])
+ .addMixIn(classOf[FunctionalKeyValueProvider[_, _]], classOf[FunctionalKeyValueProviderMixIn[_, _]])
+ .addMixIn(classOf[MapBasedKeyValueProvider[_, _]], classOf[MapBasedKeyValueProviderMixIn[_, _]])
+ .addMixIn(classOf[ConstantValueProvider[_]], classOf[ConstantValueProviderMixIn])
+ .addMixIn(classOf[PlanTransformation], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[OperatorPattern[_]], classOf[OperatorPatternMixin])
+ .addMixIn(classOf[Slot[_]], classOf[SlotMixIn[_]])
+ .addMixIn(classOf[OutputSlot[_]], classOf[OutputSlotMixIn[_]])
+ .addMixIn(classOf[OperatorBase], classOf[OperatorBaseMixIn])
+ .addMixIn(classOf[MultiContext.UnarySink], classOf[MultiContextUnarySinkMixIn])
+ .addMixIn(classOf[ElementaryOperator], classOf[ElementaryOperatorMixIn])
+ .addMixIn(classOf[ActualOperator], classOf[ActualOperatorMixIn])
+ .addMixIn(classOf[Operator], classOf[OperatorMixIn])
+ .addMixIn(classOf[PredicateDescriptor[_]], classOf[PredicateDescriptorMixIn[_]])
+ .addMixIn(classOf[TransformationDescriptor[_, _]], classOf[TransformationDescriptorMixIn[_, _]])
+ .addMixIn(classOf[ProjectionDescriptor[_, _]], classOf[ProjectionDescriptorMixIn[_, _]])
+ .addMixIn(classOf[ReduceDescriptor[_]], classOf[ReduceDescriptorMixIn[_]])
+ .addMixIn(classOf[FlatMapDescriptor[_, _]], classOf[FlatMapDescriptorMixIn[_, _]])
+ .addMixIn(classOf[MapPartitionsDescriptor[_, _]], classOf[MapPartitionsDescriptorMixIn[_, _]])
+ .addMixIn(classOf[BasicDataUnitType[_]], classOf[BasicDataUnitTypeMixIn[_]])
+ .addMixIn(classOf[RecordType], classOf[RecordTypeMixIn])
+ .addMixIn(classOf[DataUnitGroupType[_]], classOf[DataUnitGroupTypeMixIn[_]])
+ .addMixIn(classOf[ProbabilisticDoubleInterval], classOf[ProbabilisticDoubleIntervalMixIn])
+ .addMixIn(classOf[LoadProfileEstimator], classOf[LoadProfileEstimatorMixIn])
+ .addMixIn(classOf[FunctionDescriptor], classOf[FunctionDescriptorMixIn])
+ .addMixIn(classOf[NestableLoadProfileEstimator], classOf[NestableLoadProfileEstimatorMixIn])
+ .addMixIn(classOf[LoadEstimator], classOf[LoadEstimatorMixIn])
+ .addMixIn(classOf[DefaultLoadEstimator], classOf[DefaultLoadEstimatorMixIn])
+ .addMixIn(classOf[CardinalityEstimate], classOf[CardinalityEstimateMixIn])
+ .addMixIn(classOf[DataSetType[_]], classOf[DataSetTypeMixIn[_]])
+ .addMixIn(classOf[DataUnitType[_]], classOf[DataUnitTypeMixIn])
+ .addMixIn(classOf[TextFileSource], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[UnarySource[_]], classOf[UnarySourceMixIn[_]])
+ .addMixIn(classOf[UnarySink[_]], classOf[UnarySinkMixIn[_]])
+ .addMixIn(classOf[UnaryToUnaryOperator[_, _]], classOf[UnaryToUnaryOperatorMixIn[_, _]])
+ .addMixIn(classOf[BinaryToUnaryOperator[_, _, _]], classOf[BinaryToUnaryOperatorMixIn[_, _, _]])
+ .addMixIn(classOf[LoopHeadOperator], classOf[LoopHeadOperatorMixIn])
+ .addMixIn(classOf[SampleOperator[_]], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[CardinalityEstimator], classOf[CardinalityEstimatorMixIn])
+ .addMixIn(classOf[DefaultCardinalityEstimator], classOf[DefaultCardinalityEstimatorMixIn])
+ .addMixIn(classOf[EstimatableCost], classOf[EstimatableCostMixIn])
+
+
+ // Ignore loggers
+ // All classes in this group share the same IgnoreLoggerMixIn.
+ .addMixIn(classOf[Job], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[OptimizationContext], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[OptimizationUtils], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[SanityChecker], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[CardinalityEstimatorManager], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[CardinalityPusher], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[ChannelConversionGraph], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[LoadProfileEstimators], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[LatentOperatorPruningStrategy], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[PlanEnumeration], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[PlanEnumerator], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[PlanImplementation], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[StageAssignmentTraversal], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[Channel], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[ExecutionPlan], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[PlanTraversal], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[SlotMapping], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[WayangPlan], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[AbstractTopologicalTraversal[_, _]], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[CardinalityBreakpoint], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[CrossPlatformExecutor], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[ExecutorTemplate], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[Junction], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[ExecutionLog], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[AbstractReferenceCountable], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[ReflectionUtils], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[FileSystems], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[HadoopFileSystem], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[LocalFileSystem], classOf[IgnoreLoggerMixIn])
+ .addMixIn(classOf[ObjectFileSource[_]], classOf[IgnoreLoggerMixIn])
+
+ // The configured local instance becomes the value of the enclosing `mapper` val.
+ mapper
+ }
+
+
+ /** Serializes `obj` to a byte array with the shared mapper (`writeValueAsBytes`). */
+ def serialize(obj: AnyRef): Array[Byte] = {
+ mapper.writeValueAsBytes(obj)
+ }
+
+ /** Serializes `obj` to a string with the shared mapper (`writeValueAsString`). */
+ def serializeAsString(obj: AnyRef): String = {
+ mapper.writeValueAsString(obj)
+ }
+
+ /**
+ * Reads `bytes` into an instance of `T` with the shared mapper.
+ * The target class is taken from the `ClassTag`'s `runtimeClass`.
+ * NOTE(review): only that runtime class is handed to Jackson, so type arguments
+ * of a generic `T` are presumably not visible to it - confirm for generic targets.
+ */
+ def deserialize[T: ClassTag](bytes: Array[Byte]): T = {
+ val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+ mapper.readValue(bytes, clazz)
+ }
+
+ /**
+ * Reads `string` into an instance of `T` with the shared mapper.
+ * The target class is resolved from the `ClassTag` exactly as in `deserialize`.
+ */
+ def deserializeFromString[T: ClassTag](string: String): T = {
+ val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+ mapper.readValue(string, clazz)
+ }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/TempFileUtils.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/TempFileUtils.scala
new file mode 100644
index 0000000..82c37bd
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/TempFileUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization
+
+import java.io.{FileInputStream, FileOutputStream}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path}
+import scala.reflect.ClassTag
+
+/**
+ * Helpers that move objects through temporary files using `SerializationUtils`.
+ * The `write*` methods create a new temp file; the `read*` methods consume
+ * (read and then delete) the file they are given.
+ */
+object TempFileUtils {
+
+  /**
+   * Writes the bytes produced by `SerializationUtils.serialize(obj)` to a new
+   * temp file (prefix "serialized", suffix ".tmp").
+   *
+   * @param obj object to serialize
+   * @return path of the newly created temp file
+   */
+  def writeToTempFileAsBinary(obj: AnyRef): Path = {
+    // Serialize before creating the file so that a serialization failure
+    // does not leave an orphaned temp file behind.
+    val bytes = SerializationUtils.serialize(obj)
+    Files.write(Files.createTempFile("serialized", ".tmp"), bytes)
+  }
+
+
+  /**
+   * Reads all bytes of `path` and deserializes them into a `T`.
+   * The file is deleted afterwards, whether or not reading/deserialization succeeds.
+   *
+   * @param path temp file to consume
+   * @tparam T target type, resolved through its `ClassTag`
+   * @return the deserialized object
+   */
+  def readFromTempFileFromBinary[T : ClassTag](path: Path): T = {
+    try {
+      SerializationUtils.deserialize[T](Files.readAllBytes(path))
+    } finally {
+      Files.deleteIfExists(path)
+    }
+  }
+
+
+  /**
+   * Writes `SerializationUtils.serializeAsString(obj)` as UTF-8 text to a new
+   * temp file (prefix "serialized", suffix ".tmp").
+   *
+   * @param obj object to serialize
+   * @return path of the newly created temp file
+   */
+  def writeToTempFileAsString(obj: AnyRef): Path = {
+    // Same ordering as the binary variant: no file is created if serialization fails.
+    val serializedString = SerializationUtils.serializeAsString(obj)
+    Files.writeString(Files.createTempFile("serialized", ".tmp"), serializedString, StandardCharsets.UTF_8)
+  }
+
+
+  /**
+   * Reads `path` as UTF-8 text and deserializes it into a `T`.
+   * The file is deleted afterwards, whether or not reading/deserialization
+   * succeeds, so that it behaves like `readFromTempFileFromBinary`.
+   *
+   * @param path temp file to consume
+   * @tparam T target type, resolved through its `ClassTag`
+   * @return the deserialized object
+   */
+  def readFromTempFileFromString[T: ClassTag](path: Path): T = {
+    try {
+      SerializationUtils.deserializeFromString[T](Files.readString(path, StandardCharsets.UTF_8))
+    } finally {
+      // Previously skipped when deserialization threw, leaking the temp file.
+      Files.deleteIfExists(path)
+    }
+  }
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableDeserializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableDeserializer.scala
new file mode 100644
index 0000000..5514aa7
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableDeserializer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer}
+
+import java.io.{ByteArrayInputStream, ObjectInputStream}
+
+/**
+ * Jackson deserializer for values stored as Java-serialized bytes inside a
+ * binary JSON value.
+ *
+ * @tparam T type the decoded object is cast to; the cast is unchecked
+ */
+class GenericSerializableDeserializer[T] extends JsonDeserializer[T] {
+
+  /**
+   * Reads the current token's binary value and decodes it with an `ObjectInputStream`.
+   *
+   * NOTE(review): `ObjectInputStream.readObject` instantiates whatever classes the
+   * payload names, so this should only ever be fed data from a trusted producer.
+   *
+   * @param p    parser positioned on the binary value
+   * @param ctxt deserialization context (not used)
+   * @return the decoded object, cast to `T`
+   */
+  override def deserialize(p: JsonParser, ctxt: DeserializationContext): T = {
+    val objectIn = new ObjectInputStream(new ByteArrayInputStream(p.getBinaryValue))
+    // Release the stream even when readObject() throws.
+    try objectIn.readObject().asInstanceOf[T]
+    finally objectIn.close()
+  }
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableSerializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableSerializer.scala
new file mode 100644
index 0000000..991fcdf
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableSerializer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonGenerator
+import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
+
+import java.io.{ByteArrayOutputStream, ObjectOutputStream}
+
+/**
+ * Jackson serializer that writes a value as a binary JSON value containing the
+ * bytes produced by Java object serialization.
+ *
+ * @tparam T type of the handled values; they are passed to `ObjectOutputStream.writeObject`
+ */
+class GenericSerializableSerializer[T] extends JsonSerializer[T] {
+
+  /**
+   * Java-serializes `value` into an in-memory buffer and emits that buffer
+   * through `gen.writeBinary`.
+   *
+   * @param value       object to serialize
+   * @param gen         generator receiving the binary value
+   * @param serializers serializer provider (not used)
+   */
+  override def serialize(value: T, gen: JsonGenerator, serializers: SerializerProvider): Unit = {
+    val buffer = new ByteArrayOutputStream()
+    val objectOut = new ObjectOutputStream(buffer)
+    // ObjectOutputStream buffers internally; close() flushes it, so `buffer` is
+    // guaranteed to hold the complete payload before it is read below.
+    try objectOut.writeObject(value)
+    finally objectOut.close()
+    gen.writeBinary(buffer.toByteArray)
+  }
+}
+
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala
new file mode 100644
index 0000000..e98fb8e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.{JsonParser, JsonProcessingException}
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
+import com.fasterxml.jackson.databind.jsontype.TypeDeserializer
+import org.apache.wayang.api.MultiContext
+import org.apache.wayang.api.serialization.SerializationUtils.mapper
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.java.Java
+import org.apache.wayang.postgres.Postgres
+import org.apache.wayang.spark.Spark
+import org.apache.wayang.sqlite3.Sqlite3
+
+import java.io.IOException
+
+/**
+ * Rebuilds a `MultiContext` from a JSON object carrying the fields
+ * `configuration`, `sink` and `plugins`.
+ */
+class MultiContextDeserializer extends JsonDeserializer[MultiContext] {
+
+  /** Polymorphic entry point; the type deserializer is not consulted and the call is forwarded to `deserialize`. */
+  override def deserializeWithType(p: JsonParser, ctxt: DeserializationContext, typeDeserializer: TypeDeserializer): AnyRef =
+    deserialize(p, ctxt)
+
+  /**
+   * Reads the three fields one by one with the shared mapper, then assembles the
+   * context: configuration first, then the sink (if any), then the known plugins.
+   */
+  @throws[IOException]
+  @throws[JsonProcessingException]
+  override def deserialize(jp: JsonParser, ctxt: DeserializationContext): MultiContext = {
+    val codec = jp.getCodec
+    val root: JsonNode = codec.readTree(jp)
+
+    // Opens a sub-parser over one field of the root object.
+    def fieldParser(name: String): JsonParser = root.get(name).traverse(codec)
+
+    val configuration: Configuration =
+      mapper.readValue(fieldParser("configuration"), classOf[Configuration])
+    val sink: Option[MultiContext.UnarySink] =
+      mapper.readValue(fieldParser("sink"), new TypeReference[Option[MultiContext.UnarySink]]() {})
+    val pluginNames: List[String] =
+      mapper.readValue(fieldParser("plugins"), new TypeReference[List[String]]() {})
+
+    // Step 1: the context starts from the deserialized configuration.
+    val context = new MultiContext(configuration)
+
+    // Step 2: re-attach the sink, reporting what was found on stdout.
+    sink match {
+      case Some(MultiContext.TextFileSink(url)) =>
+        println(s"It's a TextFileSink with url: $url")
+        context.withTextFileSink(url)
+      case Some(MultiContext.ObjectFileSink(url)) =>
+        println(s"It's an ObjectFileSink with url: $url")
+        context.withObjectFileSink(url)
+      case None =>
+        println("No sink defined")
+      case _ =>
+        println("Unknown sink type")
+    }
+
+    // TODO: Add all plugins
+    // Step 3: plugins are recognised by the class name of a freshly obtained instance.
+    val javaName = Java.basicPlugin.getClass.getName
+    val sparkName = Spark.basicPlugin.getClass.getName
+    val postgresName = Postgres.plugin().getClass.getName
+    // val flinkName = Flink.basicPlugin().getClass.getName
+    val sqlite3Name = Sqlite3.plugin().getClass.getName
+
+    for (name <- pluginNames) {
+      if (name == javaName) context.register(Java.basicPlugin())
+      else if (name == sparkName) context.register(Spark.basicPlugin())
+      else if (name == postgresName) context.register(Postgres.plugin())
+      // else if (name == flinkName) context.register(Flink.basicPlugin())
+      else if (name == sqlite3Name) context.register(Sqlite3.plugin())
+      else println("Unknown plugin detected")
+    }
+
+    context
+  }
+}
+
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextSerializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextSerializer.scala
new file mode 100644
index 0000000..1b49299
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextSerializer.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.{JsonGenerator, JsonProcessingException}
+import com.fasterxml.jackson.databind.SerializerProvider
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer
+import com.fasterxml.jackson.databind.ser.std.StdSerializer
+import org.apache.wayang.api.MultiContext
+
+import java.io.IOException
+
+
+/**
+ * Writes a `MultiContext` as a JSON object with the fields `configuration`,
+ * `sink`, `plugins` and `@type`, emitted in that order.
+ */
+class MultiContextSerializer extends StdSerializer[MultiContext](classOf[MultiContext]) {
+
+  /** The type serializer is not used; the plain `serialize` output is written instead. */
+  override def serializeWithType(value: MultiContext, gen: JsonGenerator, serializers: SerializerProvider, typeSer: TypeSerializer): Unit =
+    serialize(value, gen, serializers)
+
+  /**
+   * Emits the JSON object for `multiContext` on `jsonGenerator`, using
+   * `serializerProvider` for the default serialization of nested values.
+   */
+  @throws[IOException]
+  @throws[JsonProcessingException]
+  def serialize(multiContext: MultiContext, jsonGenerator: JsonGenerator, serializerProvider: SerializerProvider): Unit = {
+    val out = jsonGenerator
+    out.writeStartObject()
+
+    // configuration: delegated to the provider's default serialization
+    out.writeFieldName("configuration")
+    serializerProvider.defaultSerializeValue(multiContext.getConfiguration, out)
+
+    // sink: default serialization when present, JSON null otherwise
+    out.writeFieldName("sink")
+    multiContext.getSink.fold(out.writeNull())(sink => serializerProvider.defaultSerializeValue(sink, out))
+
+    // plugins: written as an array of strings
+    out.writeArrayFieldStart("plugins")
+    for (pluginName <- multiContext.getPlugins) out.writeString(pluginName)
+    out.writeEndArray()
+
+    // type marker of the context
+    out.writeStringField("@type", "MultiContext")
+
+    out.writeEndObject()
+  }
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/OperatorDeserializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/OperatorDeserializer.scala
new file mode 100644
index 0000000..174876b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/OperatorDeserializer.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.jsontype.TypeDeserializer
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
+import org.apache.wayang.api.serialization.SerializationUtils.mapper
+import org.apache.wayang.api.serialization.customserializers.OperatorDeserializer.{inputSlotOwnerIdMap, outputSlotOwnerIdMap}
+import org.apache.wayang.basic.data.Record
+import org.apache.wayang.basic.operators._
+import org.apache.wayang.basic.types.RecordType
+import org.apache.wayang.core.api.exception.WayangException
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableIntUnaryOperator, SerializableLongUnaryOperator}
+import org.apache.wayang.core.function._
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
+import org.apache.wayang.core.plan.wayangplan.{LoopHeadOperator, Operator, OperatorBase}
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.jdbc.operators.JdbcTableSource
+import org.apache.wayang.postgres.operators.PostgresTableSource
+import org.apache.wayang.sqlite3.operators.Sqlite3TableSource
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class OperatorDeserializer extends JsonDeserializer[Operator] {
+
+ // Signature shared by every per-operator deserialization function: (parser, operator JSON node) => Operator
+ private type DeserializerFunction = (JsonParser, JsonNode) => Operator
+
+ // Dispatch table used by deserialize: value of an operator node's "@type" field -> deserialization function
+ private val deserializers: Map[String, DeserializerFunction] = Map(
+
+ // Source
+ "TextFileSource" -> deserializeTextFileSource,
+ "ObjectFileSource" -> deserializeObjectFileSource,
+ "CollectionSource" -> deserializeCollectionSource,
+ "Sqlite3TableSource" -> deserializeSqlite3TableSource,
+ "PostgresTableSource" -> deserializePostgresTableSource,
+
+ // Unary
+ "MapOperator" -> deserializeMapOperator,
+ "MapPartitionsOperator" -> deserializeMapPartitionsOperator,
+ "FilterOperator" -> deserializeFilterOperator,
+ "FlatMapOperator" -> deserializeFlatMapOperator,
+ "SampleOperator" -> deserializeSampleOperator,
+ "ReduceByOperator" -> deserializeReduceByOperator,
+ "MaterializedGroupByOperator" -> deserializeMaterializedGroupByOperator,
+ "GlobalReduceOperator" -> deserializeGlobalReduceOperator,
+ "GlobalMaterializedGroupOperator" -> deserializeGlobalMaterializedGroupOperator,
+ "GroupByOperator" -> deserializeGroupByOperator,
+ "ReduceOperator" -> deserializeReduceOperator,
+ "SortOperator" -> deserializeSortOperator,
+ "ZipWithIdOperator" -> deserializeZipWithIdOperator,
+ "DistinctOperator" -> deserializeDistinctOperator,
+ "CountOperator" -> deserializeCountOperator,
+
+ // Binary
+ "CartesianOperator" -> deserializeCartesianOperator,
+ "UnionAllOperator" -> deserializeUnionAllOperator,
+ "IntersectOperator" -> deserializeIntersectOperator,
+ "JoinOperator" -> deserializeJoinOperator,
+ "CoGroupOperator" -> deserializeCoGroupOperator,
+
+ // Loop
+ "DoWhileOperator" -> deserializeDoWhileOperator,
+ "RepeatOperator" -> deserializeRepeatOperator,
+ // Sinks are not registered (no deserializeLocalCallbackSink is defined in this class):
+ /*
+ "LocalCallbackSink" -> deserializeLocalCallbackSink,
+ */
+ )
+
+
+ /**
+  * Deserializes one operator node, or resolves a back-reference to an operator that was read earlier.
+  *
+  * A node without an "@type" field is treated as a bare numeric Jackson id and is looked up in the
+  * per-thread id -> operator map. Any other node is built by the function registered for its "@type",
+  * receives its target platforms and cardinality estimators, is stored under its "@id" and is then
+  * connected to its input operators.
+  *
+  * @param jp   parser positioned at the operator node
+  * @param ctxt Jackson deserialization context (unused)
+  * @return the new operator, or the previously stored one for a back-reference
+  * @throws WayangException if a referenced id is unknown or an operator node lacks "@id"
+  * @throws IllegalArgumentException if no deserialization function exists for the "@type"
+  */
+ override def deserialize(jp: JsonParser, ctxt: DeserializationContext): Operator = {
+   val objectIdMap = OperatorDeserializer.operatorIdMap.get()
+   val jsonNodeOperator: JsonNode = mapper.readTree(jp)
+
+   if (jsonNodeOperator.get("@type") == null) {
+     // Back-reference: this operator was already parsed and stored, so hand out the stored instance
+     val referencedId = jsonNodeOperator.asLong()
+     objectIdMap.getOrElse(referencedId, throw new WayangException(s"Can't deserialize operator with id $referencedId"))
+   } else {
+     val typeName = jsonNodeOperator.get("@type").asText
+     // Fail with a descriptive error instead of a NullPointerException when "@id" is absent
+     val id = Option(jsonNodeOperator.get("@id")).map(_.asLong)
+       .getOrElse(throw new WayangException(s"Can't deserialize operator of type $typeName without @id"))
+     val deserializeFunc = deserializers.getOrElse(typeName, throw new IllegalArgumentException(s"Unknown type: $typeName"))
+
+     // Deserialize operator
+     val operator = deserializeFunc(jp, jsonNodeOperator)
+
+     // Add target platforms. JsonNode.elements() is empty for null/non-container nodes, so neither a
+     // cast to ArrayNode nor a present field is required.
+     Option(jsonNodeOperator.get("targetPlatforms")).foreach { platformsNode =>
+       platformsNode.elements().asScala.foreach { platformNode =>
+         // Custom Platform deserializer gets called here
+         operator.addTargetPlatform(mapper.treeToValue(platformNode, classOf[Platform]))
+       }
+     }
+
+     // Add cardinality estimators (same tolerance for an absent field)
+     Option(jsonNodeOperator.get("cardinalityEstimators")).foreach { estimatorsNode =>
+       estimatorsNode.elements().asScala.foreach { estimatorNode =>
+         val cardinalityEstimator = mapper.treeToValue(estimatorNode, classOf[CardinalityEstimator])
+
+         // TODO: Check hard coded output index 0
+         operator.asInstanceOf[OperatorBase].setCardinalityEstimator(0, cardinalityEstimator)
+       }
+     }
+
+     // Store in map id -> operator
+     objectIdMap.put(id, operator)
+
+     // Connect to input operators and return
+     connectToInputOperatorsAndReturn(jsonNodeOperator, operator)
+   }
+ }
+
+
+ //
+ // Custom deserialization functions for each type
+ //
+ private def deserializeTextFileSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+   // The "inputUrl" field is all that is read to rebuild the source
+   new TextFileSource(rootNode.get("inputUrl").asText)
+ }
+
+ private def deserializeObjectFileSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val sourceUrl = rootNode.get("inputUrl").asText
+   val elementClass = mapper.treeToValue(rootNode.get("tClass"), classOf[Class[AnyRef]])
+   new ObjectFileSource(sourceUrl, elementClass)
+ }
+
+ private def deserializeCollectionSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val elements = mapper.treeToValue(rootNode.get("collection"), classOf[Iterable[AnyRef]])
+   val elementType = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[AnyRef]])
+   new CollectionSource(elements.asJavaCollection, elementType)
+ }
+
+ private def deserializeSqlite3TableSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val table = mapper.treeToValue(rootNode.get("tableName"), classOf[String])
+   val columns = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[Record]]).getDataUnitType.asInstanceOf[RecordType].getFieldNames
+   new Sqlite3TableSource(table, columns: _*)
+ }
+
+ private def deserializePostgresTableSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val table = mapper.treeToValue(rootNode.get("tableName"), classOf[String])
+   val columns = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[Record]]).getDataUnitType.asInstanceOf[RecordType].getFieldNames
+   new PostgresTableSource(table, columns: _*)
+ }
+
+ private def deserializeMapOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val descriptorNode = rootNode.get("functionDescriptor")
+   new MapOperator(mapper.treeToValue(descriptorNode, classOf[TransformationDescriptor[AnyRef, AnyRef]]))
+ }
+
+ private def deserializeMapPartitionsOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val descriptorNode = rootNode.get("functionDescriptor")
+   new MapPartitionsOperator(mapper.treeToValue(descriptorNode, classOf[MapPartitionsDescriptor[AnyRef, AnyRef]]))
+ }
+
+ private def deserializeFilterOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val predicateNode = rootNode.get("predicateDescriptor")
+   new FilterOperator(mapper.treeToValue(predicateNode, classOf[PredicateDescriptor[AnyRef]]))
+ }
+
+ private def deserializeFlatMapOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val descriptorNode = rootNode.get("functionDescriptor")
+   new FlatMapOperator(mapper.treeToValue(descriptorNode, classOf[FlatMapDescriptor[AnyRef, AnyRef]]))
+ }
+
+ private def deserializeSampleOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val sizeFn = mapper.treeToValue(rootNode.get("sampleSizeFunction"), classOf[SerializableIntUnaryOperator])
+   val elementType = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[AnyRef]])
+   val method = mapper.treeToValue(rootNode.get("sampleMethod"), classOf[SampleOperator.Methods])
+   val seedFn = mapper.treeToValue(rootNode.get("seedFunction"), classOf[SerializableLongUnaryOperator])
+   new SampleOperator(sizeFn, elementType, method, seedFn)
+ }
+
+ private def deserializeReduceByOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val keyFn = mapper.treeToValue(rootNode.get("keyDescriptor"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+   val reduceFn = mapper.treeToValue(rootNode.get("reduceDescriptor"), classOf[ReduceDescriptor[AnyRef]])
+   new ReduceByOperator(keyFn, reduceFn)
+ }
+
+ private def deserializeMaterializedGroupByOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val keyNode = rootNode.get("keyDescriptor")
+   new MaterializedGroupByOperator(mapper.treeToValue(keyNode, classOf[TransformationDescriptor[AnyRef, AnyRef]]))
+ }
+
+ private def deserializeGlobalReduceOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val reduceNode = rootNode.get("reduceDescriptor")
+   new GlobalReduceOperator(mapper.treeToValue(reduceNode, classOf[ReduceDescriptor[AnyRef]]))
+ }
+
+ private def deserializeGlobalMaterializedGroupOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val elementType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+   val groupType = mapper.treeToValue(rootNode.get("outputType"), classOf[DataSetType[java.lang.Iterable[AnyRef]]])
+   new GlobalMaterializedGroupOperator(elementType, groupType)
+ }
+
+ private def deserializeGroupByOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val keyNode = rootNode.get("keyDescriptor")
+   new GroupByOperator(mapper.treeToValue(keyNode, classOf[TransformationDescriptor[AnyRef, AnyRef]]))
+ }
+
+ private def deserializeReduceOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val reduceFn = mapper.treeToValue(rootNode.get("reduceDescriptor"), classOf[ReduceDescriptor[AnyRef]])
+   val sourceType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+   val resultType = mapper.treeToValue(rootNode.get("outputType"), classOf[DataSetType[AnyRef]])
+   new ReduceOperator(reduceFn, sourceType, resultType)
+ }
+
+ private def deserializeSortOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val keyNode = rootNode.get("keyDescriptor")
+   new SortOperator(mapper.treeToValue(keyNode, classOf[TransformationDescriptor[AnyRef, AnyRef]]))
+ }
+
+ private def deserializeZipWithIdOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val typeNode = rootNode.get("inputType")
+   new ZipWithIdOperator(mapper.treeToValue(typeNode, classOf[DataSetType[AnyRef]]))
+ }
+
+ private def deserializeDistinctOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val typeNode = rootNode.get("inputType")
+   new DistinctOperator(mapper.treeToValue(typeNode, classOf[DataSetType[AnyRef]]))
+ }
+
+ private def deserializeCountOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val typeNode = rootNode.get("inputType")
+   new CountOperator(mapper.treeToValue(typeNode, classOf[DataSetType[AnyRef]]))
+ }
+
+ private def deserializeCartesianOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val leftType = mapper.treeToValue(rootNode.get("inputType0"), classOf[DataSetType[AnyRef]])
+   val rightType = mapper.treeToValue(rootNode.get("inputType1"), classOf[DataSetType[AnyRef]])
+   new CartesianOperator(leftType, rightType)
+ }
+
+ private def deserializeUnionAllOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   // Only "inputType0" is read; the operator is built from that single type
+   new UnionAllOperator(mapper.treeToValue(rootNode.get("inputType0"), classOf[DataSetType[AnyRef]]))
+ }
+
+ private def deserializeIntersectOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   // Only "inputType0" is read; the operator is built from that single type
+   new IntersectOperator(mapper.treeToValue(rootNode.get("inputType0"), classOf[DataSetType[AnyRef]]))
+ }
+
+ private def deserializeJoinOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val leftKey = mapper.treeToValue(rootNode.get("keyDescriptor0"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+   val rightKey = mapper.treeToValue(rootNode.get("keyDescriptor1"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+   new JoinOperator(leftKey, rightKey)
+ }
+
+ private def deserializeCoGroupOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val leftKey = mapper.treeToValue(rootNode.get("keyDescriptor0"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+   val rightKey = mapper.treeToValue(rootNode.get("keyDescriptor1"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+   new CoGroupOperator(leftKey, rightKey)
+ }
+
+ private def deserializeDoWhileOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val loopInputType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+   val loopConvergenceType = mapper.treeToValue(rootNode.get("convergenceType"), classOf[DataSetType[AnyRef]])
+   val criterion = mapper.treeToValue(rootNode.get("criterionDescriptor"), classOf[PredicateDescriptor[java.util.Collection[AnyRef]]])
+   val expectedIterations = mapper.treeToValue(rootNode.get("numExpectedIterations"), classOf[Integer])
+   new DoWhileOperator(loopInputType, loopConvergenceType, criterion, expectedIterations)
+ }
+
+ private def deserializeRepeatOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+   val iterations = mapper.treeToValue(rootNode.get("numIterations"), classOf[Integer])
+   val elementType = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[AnyRef]])
+   new RepeatOperator(iterations, elementType)
+ }
+
+
+ private def connectToInputOperatorsAndReturn(node: JsonNode, operator: Operator): Operator = {
+   val inputOperators = getInputOperators(node)
+   // Upstream output index: 1 if `node` reads a loop's "finOut" slot, else 0. It depends only on
+   // `node`, so it is evaluated once rather than once per input operator.
+   val upstreamOutputIndex = if (isLoopOutput(node)) 1 else 0
+   for ((inputOperator, inputIndex) <- inputOperators.zipWithIndex) inputOperator.connectTo(upstreamOutputIndex, operator, inputIndex)
+   operator
+ }
+
+ // If the inputSlot->occupant->outputSlot has the "finOut" name,
+ // then it means this node is the output of a loop operator
+ private def isLoopOutput(node: JsonNode): Boolean = {
+   // Only nodes with exactly one entry in "inputSlots" are considered; anything else is not a
+   // loop output.
+   val inputSlots = node.get("inputSlots")
+   if (inputSlots == null || !inputSlots.isArray || inputSlots.size() != 1) {
+     false
+   } else {
+     // Look up inputSlot -> occupant -> name. Written with Option instead of `return` inside a
+     // lambda, which relies on a control-flow exception being thrown through forEachRemaining.
+     val occupantName = for {
+       occupant <- Option(inputSlots.get(0).get("occupant"))
+       name <- Option(occupant.get("name"))
+     } yield name.asText()
+
+     // The slot name "finOut" marks the occupant as the final output of a loop operator
+     occupantName.contains("finOut")
+   }
+ }
+
+ private def getInputOperators(node: JsonNode): List[Operator] = {
+ // Collects, in "inputSlots" order, the operators whose output slots occupy this node's input slots.
+ var inputOperators: List[Operator] = List()
+
+ // Navigate to inputSlots; a missing or non-array field yields an empty result
+ val inputSlots = node.get("inputSlots")
+ if (inputSlots != null && inputSlots.isArray) {
+
+ // For each input slot
+ inputSlots.elements().forEachRemaining { inputSlot =>
+
+ // An "@id" field means the input slot is written out in full (first occurrence)
+ if (inputSlot.get("@id") != null) {
+
+ val inputSlotId = inputSlot.get("@id").asLong()
+ // println(s"Processing input slot with id ${inputSlotId}")
+
+ val outputSlot = inputSlot.get("occupant")
+ // NOTE(review): if "occupant" can be absent, outputSlot is null and the next check throws a NullPointerException - confirm
+ // An "@id" field means the occupant output slot is written out in full (first occurrence)
+ if (outputSlot.get("@id") != null) {
+
+ val outputSlotId = outputSlot.get("@id").asLong
+ // println(s"Processing output slot with id ${outputSlotId}")
+
+ val owner = outputSlot.get("owner")
+
+ // Deserialize the nested owner operator and add it into list to be returned
+ val jsonParser = owner.traverse(mapper)
+ jsonParser.nextToken()
+ val inputOperator = mapper.readValue[Operator](jsonParser, classOf[Operator])
+ inputOperators = inputOperators :+ inputOperator
+
+ // Remember the upstream operator under the input slot id, so later bare-id references resolve
+ inputSlotOwnerIdMap.get().put(inputSlotId, inputOperator)
+ // Remember the upstream operator under the output slot id as well
+ outputSlotOwnerIdMap.get().put(outputSlotId, inputOperator)
+ }
+
+ // If the occupant has no fields and is just a number (a Jackson id of an output slot),
+ // then we have already parsed that output slot and associated it with an operator
+ else {
+ val inputOperator = outputSlotOwnerIdMap.get().get(outputSlot.asLong)
+ inputOperator match {
+ case Some(operator) => inputOperators = inputOperators :+ operator
+ case None => throw new WayangException(s"Can't find output slot ${outputSlot.asLong}")
+ }
+ }
+ }
+
+ // If the input slot has no fields and is just a number (a Jackson id of an input slot),
+ // then we have already parsed that input slot and associated it with an operator
+ else {
+ val inputOperator = inputSlotOwnerIdMap.get().get(inputSlot.asLong)
+ inputOperator match {
+ case Some(operator) => inputOperators = inputOperators :+ operator
+ case None => throw new WayangException(s"Can't find input slot ${inputSlot.asLong}")
+ }
+ }
+ }
+ }
+
+ inputOperators
+ }
+
+
+ // The type deserializer is not consulted; deserialize reads the node's own "@type" field
+ override def deserializeWithType(p: JsonParser, ctxt: DeserializationContext, typeDeserializer: TypeDeserializer): Operator =
+   this.deserialize(p, ctxt)
+
+
+ // Neither the type deserializer nor `intoValue` is consulted; this always delegates to deserialize
+ override def deserializeWithType(p: JsonParser, ctxt: DeserializationContext, typeDeserializer: TypeDeserializer, intoValue: Operator): Operator =
+   this.deserialize(p, ctxt)
+}
+
+
+object OperatorDeserializer {
+
+  // Builds a thread-local, initially empty id table. NOTE(review): nothing in this file clears these tables.
+  private def newIdTable(): ThreadLocal[mutable.Map[Long, Operator]] = ThreadLocal.withInitial(() => mutable.Map[Long, Operator]())
+
+  // operator serialization id -> operator
+  private val operatorIdMap: ThreadLocal[mutable.Map[Long, Operator]] = newIdTable()
+  // input slot serialization id -> operator feeding that slot (the owner of its occupant output slot)
+  private val inputSlotOwnerIdMap: ThreadLocal[mutable.Map[Long, Operator]] = newIdTable()
+  // output slot serialization id -> operator owning that output slot
+  private val outputSlotOwnerIdMap: ThreadLocal[mutable.Map[Long, Operator]] = newIdTable()
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformDeserializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformDeserializer.scala
new file mode 100644
index 0000000..762ca27
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformDeserializer.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer}
+import org.apache.wayang.core.api.exception.WayangException
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.java.Java
+import org.apache.wayang.postgres.Postgres
+import org.apache.wayang.spark.Spark
+
+class PlatformDeserializer extends JsonDeserializer[Platform]{
+
+  /**
+   * Resolves a serialized platform class name to the corresponding platform instance.
+   * Java, Spark and Postgres are recognised (checked in that order); other names are rejected.
+   * NOTE(review): OperatorDeserializer rebuilds Sqlite3TableSource, yet no Sqlite3 platform is handled here - confirm.
+   */
+  override def deserialize(p: JsonParser, ctxt: DeserializationContext): Platform = {
+    // TODO: Add all platforms
+    p.getValueAsString match {
+      case name if name == Java.platform().getClass.getName => Java.platform()
+      case name if name == Spark.platform().getClass.getName => Spark.platform()
+      case name if name == Postgres.platform().getClass.getName => Postgres.platform()
+      case unknown => throw new WayangException(s"Can't deserialize platform: $unknown")
+    }
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformSerializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformSerializer.scala
new file mode 100644
index 0000000..68b203b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformSerializer.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonGenerator
+import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
+import org.apache.wayang.core.platform.Platform
+
+class PlatformSerializer extends JsonSerializer[Platform]{
+  // A platform is written as a single JSON string: the name of its runtime class
+  def serialize(platform: Platform, jsonGenerator: JsonGenerator, serializerProvider: SerializerProvider): Unit = {
+    val platformClassName = platform.getClass.getName
+    jsonGenerator.writeString(platformClassName)
+  }
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ConfigurationAndContextMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ConfigurationAndContextMixIns.scala
new file mode 100644
index 0000000..00eb473
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ConfigurationAndContextMixIns.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonIdentityInfo, JsonIgnore, JsonSubTypes, JsonTypeInfo, ObjectIdGenerators}
+import org.apache.logging.log4j.Logger
+import org.apache.wayang.api.{MultiContext, DataQuanta, PlanBuilder}
+import org.apache.wayang.core.api.configuration.{CollectionProvider, ExplicitCollectionProvider, KeyValueProvider, MapBasedKeyValueProvider, ValueProvider}
+import org.apache.wayang.core.function.FunctionDescriptor
+import org.apache.wayang.core.mapping.Mapping
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
+import org.apache.wayang.core.optimizer.channels.ChannelConversion
+import org.apache.wayang.core.optimizer.costs.{LoadProfileEstimator, LoadProfileToTimeConverter, TimeToCostConverter}
+import org.apache.wayang.core.optimizer.enumeration.PlanEnumerationPruningStrategy
+import org.apache.wayang.core.plan.wayangplan.{ExecutionOperator, OutputSlot}
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.profiling.{CardinalityRepository, InstrumentationStrategy}
+
+import java.util.function.ToDoubleFunction
+
+object ConfigurationAndContextMixIns {
+ // Annotation holder named for WayangContext: "@type" is the type-id property, MultiContext is the
+ // only listed subtype, and the logger / cardinalityRepository fields carry @JsonIgnore.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[MultiContext], name = "MultiContext"),
+ ))
+ abstract class WayangContextMixIn {
+ @JsonIgnore
+ private var logger: Logger = _
+
+ // TODO: Is this okay?
+ @JsonIgnore
+ private var cardinalityRepository: CardinalityRepository = _
+ }
+
+
+ // Annotation holder named for Configuration: object identity is tracked through an integer-sequence
+ // "@id" property, and every provider field declared below carries @JsonIgnore.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ abstract class ConfigurationMixIn {
+ @JsonIgnore
+ private var cardinalityEstimatorProvider: KeyValueProvider[OutputSlot[_], CardinalityEstimator] = _
+
+ @JsonIgnore
+ private var udfSelectivityProvider: KeyValueProvider[FunctionDescriptor, ProbabilisticDoubleInterval] = _
+
+ @JsonIgnore
+ private var operatorLoadProfileEstimatorProvider: KeyValueProvider[ExecutionOperator, LoadProfileEstimator] = _
+
+ @JsonIgnore
+ private var functionLoadProfileEstimatorProvider: KeyValueProvider[FunctionDescriptor, LoadProfileEstimator] = _
+
+ @JsonIgnore
+ private var loadProfileEstimatorCache: MapBasedKeyValueProvider[String, LoadProfileEstimator] = _
+
+ @JsonIgnore
+ private var loadProfileToTimeConverterProvider: KeyValueProvider[Platform, LoadProfileToTimeConverter] = _
+
+ @JsonIgnore
+ private var timeToCostConverterProvider: KeyValueProvider[Platform, TimeToCostConverter] = _
+
+ @JsonIgnore
+ private var costSquasherProvider: ValueProvider[ToDoubleFunction[ProbabilisticDoubleInterval]] = _
+
+ @JsonIgnore
+ private var platformStartUpTimeProvider: KeyValueProvider[Platform, Long] = _
+
+ @JsonIgnore
+ private var platformProvider: ExplicitCollectionProvider[Platform] = _
+
+ @JsonIgnore
+ private var mappingProvider: ExplicitCollectionProvider[Mapping] = _
+
+ @JsonIgnore
+ private var channelConversionProvider: ExplicitCollectionProvider[ChannelConversion] = _
+
+ @JsonIgnore
+ private var pruningStrategyClassProvider: CollectionProvider[Class[PlanEnumerationPruningStrategy]] = _
+
+ @JsonIgnore
+ private var instrumentationStrategyProvider: ValueProvider[InstrumentationStrategy] = _
+ }
+ // "@type" is the type-id property; the listed subtypes are MultiContext.TextFileSink and MultiContext.ObjectFileSink
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[MultiContext.TextFileSink], name = "MultiContextTextFileSink"),
+ new JsonSubTypes.Type(value = classOf[MultiContext.ObjectFileSink], name = "MultiContextObjectFileSink"
+ ))
+ )
+ abstract class MultiContextUnarySinkMixIn {
+ }
+ // Marks the multiContextMap, dataQuantaMap and planBuilderMap fields with @JsonIgnore
+ abstract class MultiContextPlanBuilderMixIn {
+ @JsonIgnore
+ private var multiContextMap: Map[Long, MultiContext] = _
+
+ @JsonIgnore
+ private var dataQuantaMap: Map[Long, DataQuanta[_]] = _
+
+ @JsonIgnore
+ private var planBuilderMap: Map[Long, PlanBuilder] = _
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DataTypeMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DataTypeMixIns.scala
new file mode 100644
index 0000000..2f3be61
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DataTypeMixIns.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty, JsonSubTypes, JsonTypeInfo}
+import org.apache.wayang.basic.types.RecordType
+import org.apache.wayang.core.types.{BasicDataUnitType, DataUnitGroupType, DataUnitType}
+
+object DataTypeMixIns {
+ // Annotation holder named for DataUnitType: "@type" is the type-id property and the listed
+ // subtypes are BasicDataUnitType and DataUnitGroupType.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[BasicDataUnitType[_]], name = "BasicDataUnitType"),
+ new JsonSubTypes.Type(value = classOf[DataUnitGroupType[_]], name = "DataUnitGroupType"),
+ ))
+ abstract class DataUnitTypeMixIn {
+ }
+ // Lists RecordType as a subtype and declares a @JsonCreator constructor taking the "typeClass" property
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[RecordType], name = "RecordType"),
+ ))
+ abstract class BasicDataUnitTypeMixIn[T] {
+ @JsonCreator
+ def this(@JsonProperty("typeClass") typeClass: Class[T]) = {
+ this()
+ }
+ }
+ // Declares a @JsonCreator constructor taking the "fieldNames" property
+ abstract class RecordTypeMixIn {
+ @JsonCreator
+ def this(@JsonProperty("fieldNames") fieldNames: Array[String]) = {
+ this()
+ }
+ }
+ // Declares a @JsonCreator constructor taking the "baseType" property
+ abstract class DataUnitGroupTypeMixIn[T] {
+ @JsonCreator
+ def this(@JsonProperty("baseType") baseType: DataUnitType[_]) = {
+ this()
+ }
+ }
+ // Declares a @JsonCreator constructor taking the "dataUnitType" property
+ abstract class DataSetTypeMixIn[T] {
+ @JsonCreator
+ def this(@JsonProperty("dataUnitType") dataUnitType: DataUnitType[T]) = {
+ this()
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DescriptorMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DescriptorMixIns.scala
new file mode 100644
index 0000000..7f2832d
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DescriptorMixIns.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
+import com.fasterxml.jackson.annotation.{JsonAutoDetect, JsonCreator, JsonProperty, JsonSubTypes, JsonTypeInfo}
+import org.apache.wayang.basic.function.ProjectionDescriptor
+import org.apache.wayang.core.function.FunctionDescriptor.SerializablePredicate
+import org.apache.wayang.core.function.{AggregationDescriptor, ConsumerDescriptor, FlatMapDescriptor, FunctionDescriptor, MapPartitionsDescriptor, PredicateDescriptor, ReduceDescriptor, TransformationDescriptor}
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.core.types.{BasicDataUnitType, DataUnitGroupType}
+
+import java.util
+
+object DescriptorMixIns {
+ // Annotation-only Jackson mix-in declarations for the function-descriptor classes imported above.
+ // Every secondary constructor below just calls this(); it exists to carry @JsonCreator/@JsonProperty on its signature.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[AggregationDescriptor[_, _]], name = "AggregationDescriptor"),
+ new JsonSubTypes.Type(value = classOf[ConsumerDescriptor[_]], name = "ConsumerDescriptor"),
+ new JsonSubTypes.Type(value = classOf[FlatMapDescriptor[_, _]], name = "FlatMapDescriptor"),
+ new JsonSubTypes.Type(value = classOf[MapPartitionsDescriptor[_, _]], name = "MapPartitionsDescriptor"),
+ new JsonSubTypes.Type(value = classOf[PredicateDescriptor[_]], name = "PredicateDescriptor"),
+ new JsonSubTypes.Type(value = classOf[ReduceDescriptor[_]], name = "ReduceDescriptor"),
+ new JsonSubTypes.Type(value = classOf[TransformationDescriptor[_, _]], name = "TransformationDescriptor"),
+ ))
+ abstract class FunctionDescriptorMixIn { // type name is written to an "@type" property; seven subtype names are declared
+ }
+ // Fields of any visibility are auto-detected, getters and is-getters are not; the creator takes four named properties.
+ @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+ abstract class PredicateDescriptorMixIn[Input] {
+ @JsonCreator
+ def this(@JsonProperty("javaImplementation") javaImplementation: SerializablePredicate[Input],
+ @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+ @JsonProperty("selectivity") selectivity: ProbabilisticDoubleInterval,
+ @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+ this()
+ }
+ }
+ // Adds an "@type" name with ProjectionDescriptor as the only named subtype, plus field-only auto-detection.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[ProjectionDescriptor[_, _]], name = "ProjectionDescriptor"),
+ ))
+ @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+ abstract class TransformationDescriptorMixIn[Input, Output] {
+ @JsonCreator def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableFunction[Input, Output],
+ @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+ @JsonProperty("outputType") outputType: BasicDataUnitType[Output],
+ @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+ this()
+ }
+ }
+ // Field-only auto-detection; the creator takes "javaImplementation", "fieldNames" (a java.util.List), "inputType", "outputType".
+ @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+ abstract class ProjectionDescriptorMixIn[Input, Output] {
+ @JsonCreator def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableFunction[Input, Output],
+ @JsonProperty("fieldNames") fieldNames: util.List[String],
+ @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+ @JsonProperty("outputType") outputType: BasicDataUnitType[Output]) = {
+ this()
+ }
+ }
+ // Field-only auto-detection; note the creator's "inputType" is a DataUnitGroupType while "outputType" is a BasicDataUnitType.
+ @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+ abstract class ReduceDescriptorMixIn[Type] {
+ @JsonCreator
+ def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableBinaryOperator[Type],
+ @JsonProperty("inputType") inputType: DataUnitGroupType[Type],
+ @JsonProperty("outputType") outputType: BasicDataUnitType[Type],
+ @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+ this()
+ }
+ }
+ // Field-only auto-detection; the creator takes five named properties, including "selectivity".
+ @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+ abstract class FlatMapDescriptorMixIn[Input, Output] {
+ @JsonCreator
+ def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableFunction[Input, Iterable[Output]],
+ @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+ @JsonProperty("outputType") outputType: BasicDataUnitType[Output],
+ @JsonProperty("selectivity") selectivity: ProbabilisticDoubleInterval,
+ @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+ this()
+ }
+ }
+ // Field-only auto-detection; same five creator properties as FlatMapDescriptorMixIn, with an Iterable-to-Iterable function.
+ @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+ abstract class MapPartitionsDescriptorMixIn[Input, Output] {
+ @JsonCreator
+ def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableFunction[Iterable[Input], Iterable[Output]],
+ @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+ @JsonProperty("outputType") outputType: BasicDataUnitType[Output],
+ @JsonProperty("selectivity") selectivity: ProbabilisticDoubleInterval,
+ @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+ this()
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/EstimatorMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/EstimatorMixIns.scala
new file mode 100644
index 0000000..37e6bd1
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/EstimatorMixIns.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty, JsonSubTypes, JsonTypeInfo, JsonTypeName}
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.core.function.FunctionDescriptor
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableToDoubleBiFunction
+import org.apache.wayang.core.optimizer.cardinality.{AggregatingCardinalityEstimator, CardinalityEstimate, DefaultCardinalityEstimator, FallbackCardinalityEstimator, FixedSizeCardinalityEstimator, SwitchForwardCardinalityEstimator}
+import org.apache.wayang.core.optimizer.costs.{ConstantLoadProfileEstimator, DefaultEstimatableCost, DefaultLoadEstimator, IntervalLoadEstimator, LoadEstimator, NestableLoadProfileEstimator}
+
+object EstimatorMixIns {
+ // Annotation-only Jackson mix-in declarations for the load/cardinality estimator classes imported above.
+ // Every secondary constructor below just calls this(); it exists to carry @JsonCreator/@JsonProperty on its signature.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[DefaultLoadEstimator], name = "DefaultLoadEstimator"),
+ new JsonSubTypes.Type(value = classOf[IntervalLoadEstimator], name = "IntervalLoadEstimator"),
+ ))
+ abstract class LoadEstimatorMixIn { // type name is written to an "@type" property; two subtype names are declared
+ }
+ // Declares a JSON creator taking five named properties.
+ abstract class DefaultLoadEstimatorMixIn {
+ @JsonCreator
+ def this(@JsonProperty("numInputs") numInputs: Int,
+ @JsonProperty("numOutputs") numOutputs: Int,
+ @JsonProperty("correctnessProbability") correctnessProbability: Double,
+ @JsonProperty("nullCardinalityReplacement") nullCardinalityReplacement: CardinalityEstimate,
+ @JsonProperty("singlePointFunction") singlePointFunction: LoadEstimator.SinglePointEstimationFunction) = {
+ this()
+ }
+ }
+ // Declares a JSON creator with Long-valued "lowerEstimate"/"upperEstimate" bounds.
+ abstract class CardinalityEstimateMixIn {
+ @JsonCreator
+ def this(@JsonProperty("lowerEstimate") lowerEstimate: Long,
+ @JsonProperty("upperEstimate") upperEstimate: Long,
+ @JsonProperty("correctnessProb") correctnessProb: Double,
+ @JsonProperty("isOverride") isOverride: Boolean) = {
+ this()
+ }
+ }
+
+ // Same four property names as CardinalityEstimateMixIn, but with Double-valued bounds.
+ abstract class ProbabilisticDoubleIntervalMixIn {
+ @JsonCreator
+ def this(@JsonProperty("lowerEstimate") lowerEstimate: Double,
+ @JsonProperty("upperEstimate") upperEstimate: Double,
+ @JsonProperty("correctnessProb") correctnessProb: Double,
+ @JsonProperty("isOverride") isOverride: Boolean) = {
+ this()
+ }
+ }
+ // Type name is written to an "@type" property; two subtype names are declared.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[ConstantLoadProfileEstimator], name = "ConstantLoadProfileEstimator"),
+ new JsonSubTypes.Type(value = classOf[NestableLoadProfileEstimator], name = "NestableLoadProfileEstimator"),
+ ))
+ abstract class LoadProfileEstimatorMixIn {
+ }
+ // The type name here is kept identical to the one registered in LoadProfileEstimatorMixIn's @JsonSubTypes above.
+ @JsonTypeName("NestableLoadProfileEstimator")
+ abstract class NestableLoadProfileEstimatorMixIn {
+ @JsonCreator
+ def this (@JsonProperty("cpuLoadEstimator") cpuLoadEstimator : LoadEstimator,
+ @JsonProperty("ramLoadEstimator") ramLoadEstimator: LoadEstimator,
+ @JsonProperty("diskLoadEstimator") diskLoadEstimator: LoadEstimator,
+ @JsonProperty("networkLoadEstimator") networkLoadEstimator: LoadEstimator,
+ @JsonProperty("resourceUtilizationEstimator") resourceUtilizationEstimator: SerializableToDoubleBiFunction[Array[Long], Array[Long]],
+ @JsonProperty("overheadMillis") overheadMillis: Long,
+ @JsonProperty("configurationKey") configurationKey: String
+ ) = {
+ this()
+ }
+ }
+ // Type name is written to an "@type" property; five cardinality-estimator subtype names are declared.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[AggregatingCardinalityEstimator], name = "AggregatingCardinalityEstimator"),
+ new JsonSubTypes.Type(value = classOf[DefaultCardinalityEstimator], name = "DefaultCardinalityEstimator"),
+ new JsonSubTypes.Type(value = classOf[FallbackCardinalityEstimator], name = "FallbackCardinalityEstimator"),
+ new JsonSubTypes.Type(value = classOf[FixedSizeCardinalityEstimator], name = "FixedSizeCardinalityEstimator"),
+ new JsonSubTypes.Type(value = classOf[SwitchForwardCardinalityEstimator], name = "SwitchForwardCardinalityEstimator"),
+ ))
+ abstract class CardinalityEstimatorMixIn {
+ }
+ // Declares a JSON creator taking four named properties; only this estimator subtype has a creator mix-in in this object.
+ abstract class DefaultCardinalityEstimatorMixIn {
+ @JsonCreator
+ def this(@JsonProperty("certaintyProb") certaintyProb: Double,
+ @JsonProperty("numInputs") numInputs: Int,
+ @JsonProperty("isAllowMoreInputs") isAllowMoreInputs: Boolean,
+ @JsonProperty("singlePointEstimator") singlePointEstimator: FunctionDescriptor.SerializableToLongBiFunction[Array[Long], Configuration]) = {
+ this()
+ }
+ }
+
+ // TODO: Add more estimator mixins
+ // Type name is written to an "@type" property; DefaultEstimatableCost is the only named subtype.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[DefaultEstimatableCost], name = "DefaultEstimatableCost"),
+ ))
+ abstract class EstimatableCostMixIn {
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/IgnoreLoggerMixIn.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/IgnoreLoggerMixIn.scala
new file mode 100644
index 0000000..ad95291
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/IgnoreLoggerMixIn.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import org.apache.logging.log4j.Logger
+
+abstract class IgnoreLoggerMixIn { // Jackson mix-in: annotation carrier only, declares no behaviour of its own
+ @JsonIgnore // excludes the member named "logger" from JSON processing
+ private var logger: Logger = _ // placeholder; NOTE(review): presumably mirrors a same-named field on the target classes
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/OperatorMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/OperatorMixIns.scala
new file mode 100644
index 0000000..c53003c
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/OperatorMixIns.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonIdentityInfo, JsonIgnore, JsonSubTypes, JsonTypeInfo, ObjectIdGenerators}
+import org.apache.wayang.basic.operators.{CartesianOperator, CoGroupOperator, CollectionSource, CountOperator, DistinctOperator, DoWhileOperator, FilterOperator, FlatMapOperator, GlobalMaterializedGroupOperator, GlobalReduceOperator, GroupByOperator, IntersectOperator, JoinOperator, LocalCallbackSink, MapOperator, MapPartitionsOperator, MaterializedGroupByOperator, ReduceByOperator, ReduceOperator, RepeatOperator, SampleOperator, SortOperator, TextFileSource, UnionAllOperator, ZipWithIdOperator}
+import org.apache.wayang.core.plan.wayangplan.{ActualOperator, BinaryToUnaryOperator, CompositeOperator, ElementaryOperator, ExecutionOperator, LoopHeadOperator, OperatorBase, Subplan, UnarySink, UnarySource, UnaryToUnaryOperator}
+
+object OperatorMixIns {
+ // Annotation-only Jackson mix-ins for the operator hierarchy; "@id" (integer sequence) identities are declared on the root mix-in.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[OperatorBase], name = "OperatorBase"),
+ new JsonSubTypes.Type(value = classOf[ActualOperator], name = "ActualOperator"),
+ new JsonSubTypes.Type(value = classOf[CompositeOperator], name = "CompositeOperator"),
+ new JsonSubTypes.Type(value = classOf[LoopHeadOperator], name = "LoopHeadOperator"),
+ ))
+ abstract class OperatorMixIn {
+ }
+ // Type name is written to an "@type" property; ElementaryOperator and Subplan are the named subtypes.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[ElementaryOperator], name = "ElementaryOperator"),
+ new JsonSubTypes.Type(value = classOf[Subplan], name = "Subplan"),
+ ))
+ abstract class ActualOperatorMixIn {
+ }
+ // Names six operator subtypes and excludes both the getOriginal() accessor and the "original" field from JSON.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[UnarySource[_]], name = "UnarySource"),
+ new JsonSubTypes.Type(value = classOf[UnarySink[_]], name = "UnarySink"),
+ new JsonSubTypes.Type(value = classOf[UnaryToUnaryOperator[_, _]], name = "UnaryToUnaryOperator"),
+ new JsonSubTypes.Type(value = classOf[BinaryToUnaryOperator[_, _, _]], name = "BinaryToUnaryOperator"),
+ new JsonSubTypes.Type(value = classOf[DoWhileOperator[_, _]], name = "DoWhileOperator"),
+ new JsonSubTypes.Type(value = classOf[RepeatOperator[_]], name = "RepeatOperator"),
+ ))
+ abstract class OperatorBaseMixIn {
+ @JsonIgnore
+ def getOriginal(): ExecutionOperator
+ // The field is ignored as well as the accessor above.
+ @JsonIgnore
+ private var original: ExecutionOperator = _
+ }
+ // Declares the same six subtype names as OperatorBaseMixIn, without the @JsonIgnore members.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[UnarySource[_]], name = "UnarySource"),
+ new JsonSubTypes.Type(value = classOf[UnarySink[_]], name = "UnarySink"),
+ new JsonSubTypes.Type(value = classOf[UnaryToUnaryOperator[_, _]], name = "UnaryToUnaryOperator"),
+ new JsonSubTypes.Type(value = classOf[BinaryToUnaryOperator[_, _, _]], name = "BinaryToUnaryOperator"),
+ new JsonSubTypes.Type(value = classOf[DoWhileOperator[_, _]], name = "DoWhileOperator"),
+ new JsonSubTypes.Type(value = classOf[RepeatOperator[_]], name = "RepeatOperator"),
+ ))
+ abstract class ElementaryOperatorMixIn {
+ }
+ // Named source subtypes: TextFileSource and CollectionSource.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[TextFileSource], name = "TextFileSource"),
+ new JsonSubTypes.Type(value = classOf[CollectionSource[_]], name = "CollectionSource"),
+ ))
+ abstract class UnarySourceMixIn[T] {
+ }
+ // Named sink subtype: LocalCallbackSink only.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[LocalCallbackSink[_]], name = "LocalCallbackSink"),
+ ))
+ abstract class UnarySinkMixIn[T] {
+ }
+ // Names the fifteen one-input/one-output operator subtypes listed below.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[MapOperator[_, _]], name = "MapOperator"),
+ new JsonSubTypes.Type(value = classOf[MapPartitionsOperator[_, _]], name = "MapPartitionsOperator"),
+ new JsonSubTypes.Type(value = classOf[FilterOperator[_]], name = "FilterOperator"),
+ new JsonSubTypes.Type(value = classOf[FlatMapOperator[_, _]], name = "FlatMapOperator"),
+ new JsonSubTypes.Type(value = classOf[SampleOperator[_]], name = "SampleOperator"),
+ new JsonSubTypes.Type(value = classOf[ReduceByOperator[_, _]], name = "ReduceByOperator"),
+ new JsonSubTypes.Type(value = classOf[MaterializedGroupByOperator[_, _]], name = "MaterializedGroupByOperator"),
+ new JsonSubTypes.Type(value = classOf[GlobalReduceOperator[_]], name = "GlobalReduceOperator"),
+ new JsonSubTypes.Type(value = classOf[GlobalMaterializedGroupOperator[_]], name = "GlobalMaterializedGroupOperator"),
+ new JsonSubTypes.Type(value = classOf[GroupByOperator[_, _]], name = "GroupByOperator"),
+ new JsonSubTypes.Type(value = classOf[ReduceOperator[_]], name = "ReduceOperator"),
+ new JsonSubTypes.Type(value = classOf[SortOperator[_, _]], name = "SortOperator"),
+ new JsonSubTypes.Type(value = classOf[ZipWithIdOperator[_]], name = "ZipWithIdOperator"),
+ new JsonSubTypes.Type(value = classOf[DistinctOperator[_]], name = "DistinctOperator"),
+ new JsonSubTypes.Type(value = classOf[CountOperator[_]], name = "CountOperator"),
+ ))
+ abstract class UnaryToUnaryOperatorMixIn[InputType, OutputType] {
+ }
+ // Names the five two-input/one-output operator subtypes listed below.
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[CartesianOperator[_, _]], name = "CartesianOperator"),
+ new JsonSubTypes.Type(value = classOf[UnionAllOperator[_]], name = "UnionAllOperator"),
+ new JsonSubTypes.Type(value = classOf[IntersectOperator[_]], name = "IntersectOperator"),
+ new JsonSubTypes.Type(value = classOf[JoinOperator[_, _, _]], name = "JoinOperator"),
+ new JsonSubTypes.Type(value = classOf[CoGroupOperator[_, _, _]], name = "CoGroupOperator"),
+ ))
+ abstract class BinaryToUnaryOperatorMixIn[InputType0, InputType1, OutputType] {
+ }
+ // Named loop-head subtypes: DoWhileOperator and RepeatOperator (both also listed under OperatorBaseMixIn).
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[DoWhileOperator[_, _]], name = "DoWhileOperator"),
+ new JsonSubTypes.Type(value = classOf[RepeatOperator[_]], name = "RepeatOperator"),
+ ))
+ abstract class LoopHeadOperatorMixIn {
+ }
+ // Declares only an "@id" integer-sequence identity; no type info or subtypes.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ abstract class OperatorPatternMixin {
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ProviderMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ProviderMixIns.scala
new file mode 100644
index 0000000..50c0c22
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ProviderMixIns.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonCreator, JsonIdentityInfo, JsonIgnore, JsonProperty, JsonSetter, JsonSubTypes, JsonTypeInfo, ObjectIdGenerators}
+import org.apache.logging.log4j.Logger
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.core.api.configuration.{ExplicitCollectionProvider, FunctionalCollectionProvider, FunctionalKeyValueProvider, KeyValueProvider, MapBasedKeyValueProvider}
+
+import java.util.function.BiFunction
+
+object ProviderMixIns {
+ // Annotation-only Jackson mix-ins for the configuration provider classes; each declares an "@id" integer-sequence identity.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ abstract class FunctionalKeyValueProviderMixIn[Key, Value] {
+ @JsonCreator
+ def this(@JsonProperty("parent") parent: KeyValueProvider[Key, Value],
+ @JsonProperty("configuration") configuration: Configuration,
+ @JsonProperty("providerFunction") providerFunction: BiFunction[Key, KeyValueProvider[Key, Value], Value]) = {
+ this() // only delegates to the implicit no-arg primary constructor; the arguments are not used
+ }
+ }
+ // Declares a "storedValues" setter and a three-property JSON creator.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ abstract class MapBasedKeyValueProviderMixIn[Key, Value] {
+ @JsonSetter("storedValues") // parameter is java.util.Map: mix-in methods are matched by name and parameter types
+ private def setStoredValues(storedValues: java.util.Map[Key, Value]): Unit = {}
+ // NOTE(review): assumes the target class declares setStoredValues(java.util.Map); if not, the setter above has no effect.
+ @JsonCreator
+ def this(@JsonProperty("parent") parent: KeyValueProvider[Key, Value],
+ @JsonProperty("configuration") configuration: Configuration,
+ @JsonProperty("isCaching") isCaching: Boolean) = {
+ this()
+ }
+ }
+ // Declares only the "@id" identity.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ abstract class ConstantValueProviderMixIn {
+ }
+ // Declares the "@id" identity and excludes the member named "logger" from JSON.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ abstract class ExplicitCollectionProviderMixIn {
+ @JsonIgnore
+ private var logger: Logger = _
+ }
+ // Adds an "@type" name with two key-value provider subtypes, and excludes the member named "logger" from JSON.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[FunctionalKeyValueProvider[_, _]], name = "FunctionalKeyValueProvider"),
+ new JsonSubTypes.Type(value = classOf[MapBasedKeyValueProvider[_, _]], name = "MapBasedKeyValueProvider"
+ ))
+ )
+ abstract class KeyValueProviderMixIn {
+ @JsonIgnore
+ private var logger: Logger = _
+ }
+ // Adds an "@type" name with two collection provider subtypes.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[ExplicitCollectionProvider[_]], name = "ExplicitCollectionProvider"),
+ new JsonSubTypes.Type(value = classOf[FunctionalCollectionProvider[_]], name = "FunctionalCollectionProvider"
+ ))
+ )
+ abstract class CollectionProviderMixIn {
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/SlotMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/SlotMixIns.scala
new file mode 100644
index 0000000..66f8895
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/SlotMixIns.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
+import com.fasterxml.jackson.annotation.{JsonAutoDetect, JsonIdentityInfo, JsonIgnore, JsonSubTypes, JsonTypeInfo, ObjectIdGenerators}
+import org.apache.wayang.core.plan.wayangplan.{InputSlot, OutputSlot}
+
+import java.util.List
+
+object SlotMixIns {
+ // Annotation-only Jackson mix-ins for slots: "@id" identity, field-only detection, and an "@type" name for InputSlot/OutputSlot.
+ @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+ @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+ @JsonSubTypes(Array(
+ new JsonSubTypes.Type(value = classOf[InputSlot[_]], name = "InputSlot"),
+ new JsonSubTypes.Type(value = classOf[OutputSlot[_]], name = "OutputSlot"),
+ ))
+ abstract class SlotMixIn[T] {
+ // Intentionally empty: only the class-level annotations above are declared.
+ }
+ // Excludes the "occupiedSlots" member (a java.util.List of InputSlot) from JSON.
+ abstract class OutputSlotMixIn[T] {
+ @JsonIgnore
+ private var occupiedSlots: List[InputSlot[T]] = _
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OperatorSerializationTests.scala b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OperatorSerializationTests.scala
new file mode 100644
index 0000000..6dff434
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OperatorSerializationTests.scala
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization
+
+import org.apache.wayang.api.{MultiContext, PlanBuilder, createPlanBuilder}
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.java.Java
+import org.apache.wayang.postgres.Postgres
+import org.apache.wayang.postgres.operators.PostgresTableSource
+import org.apache.wayang.sqlite3.Sqlite3
+import org.apache.wayang.sqlite3.operators.Sqlite3TableSource
+import org.junit.Test
+
+import java.io.{File, PrintWriter}
+import java.sql.{Connection, Statement}
+import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
+
+
+class OperatorSerializationTests extends SerializationTestBase {
+
+ /** Round-trips a "load collection -> map" plan through serialization and checks the executed output. */
+ @Test
+ def testReadMapCollect(): Unit = {
+   // Set up WayangContext.
+   val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+   // Generate some test data.
+   val inputValues = (1 to 10).toArray
+
+   // Build the Wayang plan; execution happens inside serializeDeserializeExecuteAssert.
+   val dq = wayang
+     .loadCollection(inputValues).withName("Load input values")
+     .map(_ + 2).withName("Add 2")
+
+   // Check the outcome: every input value shifted by 2, rendered as text.
+   val expectedOutputValues = inputValues.map(_ + 2).map(_.toString).toList
+   serializeDeserializeExecuteAssert(dq.operator, wayang, expectedOutputValues)
+ }
+
+ /** Serializes a filter -> distinct -> count plan and checks the count produced after deserialization. */
+ @Test
+ def testFilterDistinctCount(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+
+   // Every number from 1 to 8 appears exactly twice.
+   val numbers = List(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8)
+
+   val atLeastFour = context.loadCollection(numbers).filter(_ >= 4)
+   val distinctCount = atLeastFour.distinct.count
+
+   // 4, 5, 6, 7 and 8 pass the filter, i.e. five distinct values.
+   serializeDeserializeExecuteAssert(distinctCount.operator, context, List("5"))
+ }
+
+ /** Serializes a plan that sums a collection with `reduce` and checks the result after deserialization. */
+ @Test
+ def testReduce(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+
+   val summed = context.loadCollection(numbers).reduce(_ + _)
+
+   // 1 + 2 + ... + 10 = 55
+   serializeDeserializeExecuteAssert(summed.operator, context, List("55"))
+ }
+
+ /** Serializes the classic word-count plan and checks the per-word totals after deserialization. */
+ @Test
+ def testWordCount(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val lines = Array("Big data is big.", "Is data big data?")
+
+   // Tokenize on whitespace, then strip non-word characters and lowercase each token.
+   val words = context
+     .loadCollection(lines).withName("Load input values")
+     .flatMap(line => line.split("\\s+")).withName("Split words")
+     .map(word => word.replaceAll("\\W+", "").toLowerCase).withName("To lowercase")
+
+   // Pair every word with 1 and add the counters up per word.
+   val counts = words
+     .map(word => (word, 1)).withName("Attach counter")
+     .reduceByKey(pair => pair._1, (left, right) => (left._1, left._2 + right._2)).withName("Sum counters")
+
+   val expectedWordCounts = List(("big", 3), ("data", 3), ("is", 2)).map(_.toString())
+   serializeDeserializeExecuteAssert(counts.operator, context, expectedWordCounts)
+ }
+
+ /**
+  * Serializes a plan that groups numbers by parity and maps each group to its median,
+  * then checks the medians after deserialization.
+  */
+ @Test
+ def testGroupBy(): Unit = {
+   val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+   val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+
+   val dq = wayang
+     .loadCollection(inputValues)
+     .groupByKey(_ % 2).withName("group odd and even")
+     .map { group =>
+       // sortBy returns a new collection instead of sorting in place, so the result must be
+       // kept; the median is only meaningful on the sorted values.
+       val buffer = group.toBuffer
+       val sorted = buffer.sortBy(identity)
+       val mid = sorted.size / 2
+       if (sorted.size % 2 == 0) (sorted(mid - 1) + sorted(mid)) / 2
+       else sorted(mid)
+     }.withName("median")
+
+   // Evens 2, 4, 8, 10 -> (4 + 8) / 2 = 6; odds 1, 3, 5, 7, 9 -> 5.
+   val expectedOutputValues = List("6", "5")
+   serializeDeserializeExecuteAssert(dq.operator, wayang, expectedOutputValues)
+ }
+
+ /** Serializes a plan that sorts integers by their own value and checks the ordering after deserialization. */
+ @Test
+ def testSort(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val unsorted = Array(3, 4, 5, 2, 1)
+
+   val sortedQuanta = new PlanBuilder(context)
+     .loadCollection(unsorted)
+     .sort(value => value)
+
+   val expectedValues = List("1", "2", "3", "4", "5")
+   serializeDeserializeExecuteAssert(sortedQuanta.operator, context, expectedValues)
+ }
+
+ /**
+  * Serializes a plan that counts odd and even numbers per partition and merges the
+  * partial counts by kind, then checks the totals after deserialization.
+  */
+ @Test
+ def testMapPartitions(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin())
+
+   val dq = context
+     .loadCollection(Seq(0, 1, 2, 3, 4, 6, 8))
+     .mapPartitions { partition =>
+       // Single pass over the partition; the lowest bit decides between odd and even.
+       val (oddCount, evenCount) = partition.foldLeft((0, 0)) {
+         case ((odds, evens), i) => if ((i & 1) == 0) (odds, evens + 1) else (odds + 1, evens)
+       }
+       Seq(("odd", oddCount), ("even", evenCount))
+     }
+     .reduceByKey(_._1, (left, right) => (left._1, left._2 + right._2))
+
+   val expectedValues = List(("even", 5), ("odd", 2)).map(_.toString())
+   serializeDeserializeExecuteAssert(dq.operator, context, expectedValues)
+ }
+
+ /**
+  * Serializes a plan that attaches ids with `zipWithId`, groups by `field1`, and counts the
+  * distinct `field0` values per group, then checks the aggregate after deserialization.
+  */
+ @Test
+ def testZipWithId(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+
+   // 100 different values, each occurring 42 times.
+   val inputValues = (0 until 100).flatMap(i => Seq.fill(42)(i))
+
+   val dq = context
+     .loadCollection(inputValues)
+     .zipWithId
+     .groupByKey(_.field1)
+     .map(group => (group.map(_.field0).toSet.size, 1))
+     .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2))
+
+   // All 100 groups must report 42 distinct field0 values, collapsing into one (42, 100) tuple.
+   val expectedValues = List((42, 100)).map(_.toString())
+   serializeDeserializeExecuteAssert(dq.operator, context, expectedValues)
+ }
+
+ /**
+  * Creates a temporary ".txt" file and stores the trimmed `content` in it.
+  *
+  * @param content text to write; leading and trailing whitespace is removed first
+  * @param prefix  file name prefix handed to `File.createTempFile`
+  * @return the file that was written; the caller is responsible for deleting it
+  */
+ def createTempFile(content: String, prefix: String): File = {
+   val target = File.createTempFile(prefix, ".txt")
+   val out = new PrintWriter(target)
+   try out.write(content.trim)
+   finally out.close()
+   target
+ }
+
+ /**
+  * Serializes a plan that joins two text files on the drink category and checks the joined
+  * tuples after deserialization. The temp files are removed even when the test fails.
+  */
+ @Test
+ def testJoin(): Unit = {
+   val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+   // Contents for the temp files
+   val content1 =
+     """
+       |Water, 0
+       |Tonic, 5
+       |Juice, 10
+     """.stripMargin
+
+   val content2 =
+     """
+       |Apple juice, Juice
+       |Tap water, Water
+       |Orange juice, Juice
+     """.stripMargin
+
+   // Create temp files
+   val tempFile1 = createTempFile(content1, "testFile1")
+   val tempFile2 = createTempFile(content2, "testFile2")
+
+   try {
+     // Build initial data quanta: (category, number) and (drink, category)
+     val builder = new PlanBuilder(wayang)
+     val dataQuanta1 = builder.readTextFile(s"file://$tempFile1")
+       .map(s => {
+         val values = s.split(", ")
+         (values(0), values(1).toInt)
+       })
+     val dataQuanta2 = builder.readTextFile(s"file://$tempFile2")
+       .map(s => {
+         val values = s.split(", ")
+         (values(0), values(1))
+       })
+
+     // Join on the category and keep (drink, number)
+     val dq = dataQuanta1
+       .join[(String, String), String](_._1, dataQuanta2, _._2)
+       .map(joinTuple => (joinTuple.field1._1, joinTuple.field0._2))
+
+     // Assert output
+     val expectedValues = List(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
+       .map(s => s.toString())
+     serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+   } finally {
+     // Clean up in a finally block so a failing assertion does not leak the files.
+     tempFile1.delete()
+     tempFile2.delete()
+   }
+ }
+
+ /** Serializes a plan that self-keyed-joins two integer collections and checks the matches after deserialization. */
+ @Test
+ def testJoin2(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+
+   val leftValues = Array(1, 3, 2, 4, 5)
+   val rightValues = Array(3, 5)
+
+   val builder = new PlanBuilder(context)
+   val left = builder.loadCollection(leftValues)
+   val right = builder.loadCollection(rightValues)
+
+   // Both sides use the element itself as join key; keep the left-hand element of each match.
+   val matches = left
+     .join[Int, Int](n => n, right, n => n)
+     .map(_.field0)
+
+   serializeDeserializeExecuteAssert(matches.operator, context, List("3", "5"))
+ }
+
+ /**
+  * Serializes a plan that co-groups two text files on the drink category and checks the
+  * grouped output after deserialization. The temp files are removed even when the test fails.
+  */
+ @Test
+ def testCoGroup(): Unit = {
+   val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+   // Contents for the temp files
+   val content1 =
+     """
+       |Water, 0
+       |Cola, 5
+       |Juice, 10
+     """.stripMargin
+
+   val content2 =
+     """
+       |Apple juice, Juice
+       |Tap water, Water
+       |Orange juice, Juice
+     """.stripMargin
+
+   // Create temp files
+   val tempFile1 = createTempFile(content1, "testFile1")
+   val tempFile2 = createTempFile(content2, "testFile2")
+
+   try {
+     // Build initial data quanta: (category, number) and (drink, category)
+     val builder = new PlanBuilder(wayang)
+     val dataQuanta1 = builder.readTextFile(s"file://$tempFile1")
+       .map(s => {
+         val values = s.split(", ")
+         (values(0), values(1).toInt)
+       })
+     val dataQuanta2 = builder.readTextFile(s"file://$tempFile2")
+       .map(s => {
+         val values = s.split(", ")
+         (values(0), values(1))
+       })
+
+     // Co-group on the category
+     val dq = dataQuanta1
+       .coGroup[(String, String), String](_._1, dataQuanta2, _._2)
+
+     // Assert output; "Cola" has no matching drink, hence the empty right-hand group.
+     val expectedValues = List(
+       "([(Water,0)], [(Tap water,Water)])",
+       "([(Juice,10)], [(Apple juice,Juice), (Orange juice,Juice)])",
+       "([(Cola,5)], [])"
+     )
+     serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+   } finally {
+     // Clean up in a finally block so a failing assertion does not leak the files.
+     tempFile1.delete()
+     tempFile2.delete()
+   }
+ }
+
+
+ /** Serializes a plan that unions two integer collections and checks the combined output after deserialization. */
+ @Test
+ def testUnion(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+
+   val firstValues = Array(1, 2, 3, 4)
+   val secondValues = Array(0, 1, 3, 5)
+
+   val builder = new PlanBuilder(context)
+   val first = builder.loadCollection(firstValues)
+   val second = builder.loadCollection(secondValues)
+   val combined = first.union(second)
+
+   // Duplicates are kept: the expected output lists both inputs in full.
+   val unionExpectedValues = List("1", "2", "3", "4", "0", "1", "3", "5")
+   serializeDeserializeExecuteAssert(combined.operator, context, unionExpectedValues)
+ }
+
+ /** Serializes a plan that intersects two integer collections and checks the common values after deserialization. */
+ @Test
+ def testIntersect(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+
+   val firstValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+   val secondValues = Array(9, 0, 2, 3, 3, 4, 5, 7, 8, 11)
+
+   val builder = new PlanBuilder(context)
+   val first = builder.loadCollection(firstValues)
+   val second = builder.loadCollection(secondValues)
+   val common = first.intersect(second)
+
+   val intersectExpectedValues = List("2", "3", "4", "5", "7", "8", "9")
+   serializeDeserializeExecuteAssert(common.operator, context, intersectExpectedValues)
+ }
+
+ /** Serializes a plan with a three-iteration `repeat` loop and checks the loop output after deserialization. */
+ @Test
+ def testRepeat(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val seed = Array(1, 2)
+
+   // Loop body: multiply all values into one product p, then emit p and p + 1.
+   val looped = context
+     .loadCollection(seed)
+     .repeat(3, step => step.reduce(_ * _).flatMap(v => Seq(v, v + 1)))
+
+   // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 -> 3rd: 42,43
+   serializeDeserializeExecuteAssert(looped.operator, context, List("42", "43"))
+ }
+
+ /** Like `testRepeat`, but with a filter attached after the loop so the loop is not the final operator. */
+ @Test
+ def testRepeat2(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val seed = Array(1, 2)
+
+   // Loop body: multiply all values into one product p, then emit p and p + 1.
+   val looped = context
+     .loadCollection(seed)
+     .repeat(3, step => step.reduce(_ * _).flatMap(v => Seq(v, v + 1)))
+   val only43 = looped.filter(_ == 43)
+
+   // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 -> 3rd: 42,43; the filter keeps 43 only.
+   serializeDeserializeExecuteAssert(only43.operator, context, List("43"))
+ }
+
+ /** Serializes a plan whose `repeat` loop increments every value ten times and checks the result after deserialization. */
+ @Test
+ def testRepeat3(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val seed = Array(1, 2, 3, 4, 5)
+
+   val looped = context
+     .loadCollection(seed)
+     .repeat(10, step => step.map(_ + 1))
+
+   // Ten increments turn 1..5 into 11..15.
+   val expectedValues = List(11, 12, 13, 14, 15).map(_.toString)
+   serializeDeserializeExecuteAssert(looped.operator, context, expectedValues)
+ }
+
+ /** Serializes a plan with a `doWhile` loop and checks the accumulated values after deserialization. */
+ @Test
+ def testDoWhile(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val seed = Array(1, 2)
+
+   // Each iteration appends the sum of the current values; that sum is also the input of the
+   // loop condition. Judging by the expected output, looping stops once the sum exceeds 100.
+   val looped = context
+     .loadCollection(seed)
+     .doWhile[Int](sums => sums.max > 100, { current =>
+       val total = current.reduce(_ + _).withName("Sum")
+       (current.union(total), total)
+     })
+
+   val expectedValues = List("1", "2", "3", "6", "12", "24", "48", "96", "192")
+   serializeDeserializeExecuteAssert(looped.operator, context, expectedValues)
+ }
+
+ /** Like `testDoWhile`, but with a filter attached after the loop so the loop is not the final operator. */
+ @Test
+ def testDoWhile2(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val seed = Array(1, 2)
+
+   // Each iteration appends the sum of the current values; that sum also feeds the loop condition.
+   val looped = context
+     .loadCollection(seed)
+     .doWhile[Int](sums => sums.max > 100, { current =>
+       val total = current.reduce(_ + _).withName("Sum")
+       (current.union(total), total)
+     })
+   val aboveFifty = looped.filter(_ > 50)
+
+   // Of 1, 2, 3, 6, 12, 24, 48, 96, 192 only the last two exceed 50.
+   serializeDeserializeExecuteAssert(aboveFifty.operator, context, List("96", "192"))
+ }
+
+ /**
+  * Serializes a plan that samples 10 of 100 values. The sampled values are not fixed here,
+  * so only the number of output lines is checked.
+  */
+ @Test
+ def testSample(): Unit = {
+   val context = new WayangContext().withPlugin(Java.basicPlugin)
+   val population = (0 until 100).map(i => i)
+
+   val sampled = context.loadCollection(population).sample(10)
+
+   val outputFile = serializeDeserializeExecute(sampled.operator, context)
+   SerializationTestBase.assertOutputFileLineCount(outputFile, 10)
+ }
+
+ /**
+  * Serializes a plan that reads from a throw-away SQLite database, filters adults and projects
+  * their names, then checks the names after deserialization.
+  */
+ @Test
+ def testSqlite3(): Unit = {
+   // Point Wayang at a temporary database file.
+   val configuration = new Configuration
+   val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
+   sqlite3dbFile.deleteOnExit()
+   configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
+
+   // Initialize some test data; statement and connection are both closed on every path.
+   val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
+   try {
+     val statement: Statement = connection.createStatement
+     try {
+       statement.addBatch("DROP TABLE IF EXISTS customer;")
+       statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
+       statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
+       statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
+       statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
+       statement.executeBatch()
+     } finally {
+       statement.close()
+     }
+   } finally {
+     connection.close()
+   }
+
+   // Set up WayangContext.
+   val wayang = new MultiContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
+
+   // Build plan
+   val dq = wayang
+     .readTable(new Sqlite3TableSource("customer", "name", "age"))
+     .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18").withTargetPlatforms(Java.platform)
+     .projectRecords(Seq("name"))
+     .map(_.getField(0).asInstanceOf[String])
+
+   // Execute and assert: Timmy (16) is filtered out.
+   val expectedValues = List("John", "Evelyn")
+   serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+ }
+
+// Disabled on purpose: the @Test annotation below is commented out, so JUnit does not run
+// this method. It needs a PostgreSQL server reachable with the JDBC URL and credentials set
+// in the body, containing a "clients" table with "first_name" and "role" columns.
+// @Test
+ def testPostgres(): Unit = {
+ // Initialize some test data.
+ val configuration = new Configuration
+ configuration.setProperty("wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/test_erp")
+ configuration.setProperty("wayang.postgres.jdbc.user", "postgres")
+ configuration.setProperty("wayang.postgres.jdbc.password", "1234")
+
+ // Set up WayangContext.
+ val wayang = new MultiContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Postgres.plugin)
+
+ // Build plan: read the table, keep only the first_name column, and unwrap it to a String.
+ val dq = wayang
+ .readTable(new PostgresTableSource("clients", "first_name", "role"))
+ .projectRecords(Seq("first_name"))
+ .map(_.getField(0).asInstanceOf[String]).withTargetPlatforms(Java.platform)
+
+ // Execute and assert
+ // NOTE(review): the expected names depend on the contents of the external database.
+ val expectedValues = List("John", "Jane")
+ serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+ }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OtherSerializationTests.scala b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OtherSerializationTests.scala
new file mode 100644
index 0000000..1f01b2f
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OtherSerializationTests.scala
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder, PlanBuilder, createPlanBuilder, toCardinalityEstimator, toLoadEstimator}
+import org.apache.wayang.basic.operators.{MapOperator, TextFileSink}
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.optimizer.costs._
+import org.apache.wayang.core.plan.wayangplan.{Operator, WayangPlan}
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.util.ReflectionUtils
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+import org.junit.{Assert, Test}
+
+import java.nio.file.{Files, Paths}
+
+
+class OtherSerializationTests extends SerializationTestBase {
+
+ /**
+  * Round-trips a [[Configuration]] and a [[MultiContext]] through `SerializationUtils` and checks
+  * that properties, sink and platforms survive.
+  */
+ @Test
+ def multiContextSerializationTest(): Unit = {
+   val configuration = new Configuration()
+   configuration.setProperty("spark.master", "random_master_url_1")
+   configuration.setProperty("spark.app.name", "random_app_name_2")
+   val multiContext = new MultiContext(configuration).withPlugin(Spark.basicPlugin()).withTextFileSink("file:///tmp/out11")
+
+   // Configuration on its own. JUnit expects (expected, actual), which keeps failure messages truthful.
+   val serializedConfiguration = SerializationUtils.serialize(configuration)
+   val deserializedConfiguration = SerializationUtils.deserialize[Configuration](serializedConfiguration)
+   Assert.assertEquals("random_master_url_1", deserializedConfiguration.getStringProperty("spark.master"))
+   Assert.assertEquals("random_app_name_2", deserializedConfiguration.getStringProperty("spark.app.name"))
+
+   // The whole context, including its sink and the platforms contributed by the plugin.
+   val serializedMultiContext = SerializationUtils.serialize(multiContext)
+   val deserializedMultiContext = SerializationUtils.deserialize[MultiContext](serializedMultiContext)
+   val restoredConfiguration = deserializedMultiContext.getConfiguration
+   Assert.assertEquals("random_master_url_1", restoredConfiguration.getStringProperty("spark.master"))
+   Assert.assertEquals("random_app_name_2", restoredConfiguration.getStringProperty("spark.app.name"))
+   Assert.assertEquals("file:///tmp/out11", deserializedMultiContext.getSink.get.asInstanceOf[MultiContext.TextFileSink].url)
+   Assert.assertArrayEquals(
+     multiContext.getConfiguration.getPlatformProvider.provideAll().toArray,
+     restoredConfiguration.getPlatformProvider.provideAll().toArray
+   )
+ }
+
+
+ /**
+  * Round-trips a [[PlanBuilder]] through its string serialization and checks that the UDF jars
+  * and the wrapped [[MultiContext]] (configuration and sink) survive.
+  */
+ @Test
+ def planBuilderSerializationTest(): Unit = {
+   val configuration1 = new Configuration()
+   configuration1.setProperty("spark.master", "master1")
+
+   val context1 = new MultiContext(configuration1).withPlugin(Spark.basicPlugin()).withTextFileSink("file:///tmp/out11")
+
+   val planBuilder = new PlanBuilder(context1)
+     .withUdfJarsOf(classOf[OtherSerializationTests])
+     .withUdfJars("Aaa", "Bbb", "Ccc")
+
+   val serialized = SerializationUtils.serializeAsString(planBuilder)
+   val deserialized = SerializationUtils.deserializeFromString[PlanBuilder](serialized)
+   // SerializationTestBase.log(SerializationUtils.serializeAsString(deserialized), testName.getMethodName + ".log.json")
+
+   // JUnit expects (expected, actual), which keeps failure messages truthful.
+   Assert.assertEquals(planBuilder.udfJars, deserialized.udfJars)
+
+   val restoredContext = deserialized.wayangContext.asInstanceOf[MultiContext]
+   Assert.assertEquals("master1", restoredContext.getConfiguration.getStringProperty("spark.master"))
+   Assert.assertEquals(
+     "file:///tmp/out11",
+     restoredContext.getSink.get.asInstanceOf[MultiContext.TextFileSink].url
+   )
+ }
+
+
+ /**
+  * Round-trips a [[MultiContextPlanBuilder]] through its string serialization and checks that the
+  * UDF jars and both contexts (configuration and sink) survive.
+  */
+ @Test
+ def multiContextPlanBuilderSerializationTest(): Unit = {
+   val configuration1 = new Configuration()
+   configuration1.setProperty("spark.master", "master1")
+   val configuration2 = new Configuration()
+   configuration2.setProperty("spark.master", "master2")
+
+   val context1 = new MultiContext(configuration1).withPlugin(Spark.basicPlugin()).withTextFileSink("file:///tmp/out11")
+   val context2 = new MultiContext(configuration2).withPlugin(Spark.basicPlugin()).withObjectFileSink("file:///tmp/out12")
+
+   val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+     .withUdfJarsOf(classOf[OtherSerializationTests])
+     .withUdfJars("Aaa", "Bbb", "Ccc")
+
+   val serialized = SerializationUtils.serializeAsString(multiContextPlanBuilder)
+   val deserialized = SerializationUtils.deserializeFromString[MultiContextPlanBuilder](serialized)
+   // SerializationTestBase.log(SerializationUtils.serializeAsString(deserialized), testName.getMethodName + ".log.json")
+
+   Assert.assertEquals(multiContextPlanBuilder.udfJars, deserialized.udfJars)
+
+   // Inspect the DESERIALIZED builder: asserting on the original object would pass even if
+   // the contexts were lost during the round trip.
+   val restoredContexts = deserialized.multiContexts
+   Assert.assertEquals("master1", restoredContexts(0).getConfiguration.getStringProperty("spark.master"))
+   Assert.assertEquals("master2", restoredContexts(1).getConfiguration.getStringProperty("spark.master"))
+   Assert.assertEquals(
+     "file:///tmp/out11",
+     restoredContexts(0).getSink.get.asInstanceOf[MultiContext.TextFileSink].url
+   )
+   Assert.assertEquals(
+     "file:///tmp/out12",
+     restoredContexts(1).getSink.get.asInstanceOf[MultiContext.ObjectFileSink].url
+   )
+ }
+
+
+ /**
+  * Writes an operator to a temp file via `TempFileUtils`, reads it back, attaches a text file
+  * sink to the restored operator, executes the plan and checks the produced lines.
+  */
+ @Test
+ def serializeToTempFileTest(): Unit = {
+   val wayangContext = new WayangContext(new Configuration()).withPlugin(Java.basicPlugin())
+   val planBuilder = new PlanBuilder(wayangContext).withUdfJarsOf(classOf[OtherSerializationTests])
+
+   // The plan passes through String, tuple and List element types before going back to String.
+   val pipeline = planBuilder
+     .loadCollection(List("12345", "12345678", "1234567890", "1234567890123"))
+     .map(text => text + " Wayang out")
+     .map(text => (text, "AAAA", "BBBB"))
+     .map(triple => List(triple._1, "a", "b", "c"))
+     .filter(items => items.head.length > 20)
+     .map(items => items.head)
+
+   // Round trip through a temp file.
+   val serializedFile = TempFileUtils.writeToTempFileAsString(pipeline.operator)
+   val restored = TempFileUtils.readFromTempFileFromString[Operator](serializedFile)
+
+   // The restored operator has no sink yet, so attach one that writes to a per-test file.
+   val tempFileOut = s"/tmp/${testName.getMethodName}.out"
+   val sink = new TextFileSink[AnyRef](s"file://$tempFileOut", classOf[AnyRef])
+   restored.connectTo(0, sink, 0)
+
+   wayangContext.execute(new WayangPlan(sink), ReflectionUtils.getDeclaringJar(classOf[OtherSerializationTests]))
+
+   // Only the two longest inputs exceed 20 characters once " Wayang out" is appended.
+   val expectedLines = List("1234567890 Wayang out", "1234567890123 Wayang out")
+   SerializationTestBase.assertOutputFile(tempFileOut, expectedLines)
+ }
+
+
+ /**
+  * Runs the same plan on two [[MultiContext]]s, each with its own text file sink, and checks
+  * both output files. The temp files are removed even when the test fails.
+  */
+ @Test
+ def multiDataQuantaExecuteTest(): Unit = {
+   val out1 = Files.createTempFile("out1", "tmp").toString
+   val out2 = Files.createTempFile("out2", "tmp").toString
+
+   try {
+     // Create multi contexts
+     val context1 = new MultiContext(new Configuration()).withPlugin(Java.basicPlugin()).withTextFileSink(s"file://$out1")
+     val context2 = new MultiContext(new Configuration()).withPlugin(Java.basicPlugin()).withTextFileSink(s"file://$out2")
+
+     // Create multiContextPlanBuilder
+     val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+       .withUdfJarsOf(classOf[OtherSerializationTests])
+
+     // Build and execute plan
+     multiContextPlanBuilder
+       .forEach(_.loadCollection(List("aaabbb", "aaabbbccc", "aaabbbcccddd", "aaabbbcccdddeee")))
+       .forEach(_.map(s => s + " Wayang out."))
+       .forEach(_.filter(s => s.length > 20))
+       .execute()
+
+     // Check results: "aaabbb Wayang out." has 18 characters and is filtered out.
+     val expectedLines = List("aaabbbccc Wayang out.", "aaabbbcccddd Wayang out.", "aaabbbcccdddeee Wayang out.")
+     SerializationTestBase.assertOutputFile(out1, expectedLines)
+     SerializationTestBase.assertOutputFile(out2, expectedLines)
+   } finally {
+     // Delete the temp files on success and on failure alike.
+     Files.deleteIfExists(Paths.get(out1))
+     Files.deleteIfExists(Paths.get(out2))
+   }
+ }
+
+
+ /** Round-trips the Java platform through `SerializationUtils` and checks that the concrete class is preserved. */
+ @Test
+ def platformSerializationTest(): Unit = {
+   val serialized = SerializationUtils.serialize(Java.platform())
+   val deserialized = SerializationUtils.deserialize[Platform](serialized)
+
+   // JUnit expects (expected, actual), which keeps failure messages truthful.
+   Assert.assertEquals(Java.platform().getClass.getName, deserialized.getClass.getName)
+ }
+
+
+ /** Checks that an operator restricted to two target platforms keeps both after a serialization round trip. */
+ @Test
+ def targetPlatformsTest(): Unit = {
+   val configuration = new Configuration()
+   val wayangContext = new WayangContext(configuration)
+     .withPlugin(Java.basicPlugin())
+   val planBuilder = new PlanBuilder(wayangContext)
+     .withUdfJarsOf(classOf[OtherSerializationTests])
+
+   val dataQuanta = planBuilder
+     .loadCollection(List("12345", "12345678", "1234567890", "1234567890123"))
+     .map(s => s + " Wayang out").withTargetPlatforms(Spark.platform()).withTargetPlatforms(Java.platform())
+
+   val serialized = SerializationUtils.serializeAsString(dataQuanta.operator)
+   val deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
+
+   // JUnit expects (expected, actual), which keeps failure messages truthful.
+   Assert.assertEquals(2, deserialized.getTargetPlatforms.size())
+
+   // Compare by class name, as the other platform assertions in this file do.
+   val deserializedPlatformNames = deserialized.getTargetPlatforms.toArray.map(p => p.getClass.getName)
+   Assert.assertTrue(deserializedPlatformNames.contains(Spark.platform().getClass.getName))
+   Assert.assertTrue(deserializedPlatformNames.contains(Java.platform().getClass.getName))
+ }
+
+
+ /**
+  * Checks that a single target platform set on the last operator of a longer plan survives
+  * a serialization round trip.
+  */
+ @Test
+ def targetPlatforms2Test(): Unit = {
+   val configuration = new Configuration()
+   val wayangContext = new WayangContext(configuration)
+     .withPlugin(Java.basicPlugin())
+   val planBuilder = new PlanBuilder(wayangContext)
+     .withUdfJarsOf(classOf[OtherSerializationTests])
+
+   val inputValues1 = Array("Big data is big.", "Is data big data?")
+   val dataQuanta = planBuilder
+     .loadCollection(inputValues1)
+     .flatMap(_.split("\\s+"))
+     .map(_.replaceAll("\\W+", "").toLowerCase)
+     .map((_, 1))
+     .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2))
+     .withTargetPlatforms(Spark.platform())
+
+   val serialized = SerializationUtils.serializeAsString(dataQuanta.operator)
+   val deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
+
+   // JUnit expects (expected, actual), which keeps failure messages truthful.
+   Assert.assertEquals(1, deserialized.getTargetPlatforms.size())
+
+   // Exactly one element is present (asserted above), so taking the first one is safe.
+   val onlyPlatform = deserialized.getTargetPlatforms.iterator().next()
+   Assert.assertEquals(Spark.platform().getClass.getName, onlyPlatform.getClass.getName)
+ }
+
+
+ /**
+  * Checks that the load profile estimator attached to a map UDF keeps its configuration keys
+  * and template keys after a serialization round trip.
+  */
+ @Test
+ def testLoadProfileEstimator(): Unit = {
+   val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+   // Create load estimator
+   val loadProfileEstimator: LoadProfileEstimator = new NestableLoadProfileEstimator(
+     (in: Long, _: Long) => 10 * in,
+     (_: Long, _: Long) => 1000L
+   )
+
+   // Create map operator with load profile estimator
+   val dq1 = wayang.loadCollection(List(1, 2, 3))
+     .map(_ + 1, udfLoad = loadProfileEstimator)
+
+   // Serialize and then deserialize the map operator; a failure surfaces as a test error.
+   val serialized = SerializationUtils.serializeAsString(dq1.operator)
+   val deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
+
+   // Digs the estimator out of a map operator's function descriptor.
+   def estimatorOf(operator: Operator): NestableLoadProfileEstimator =
+     operator.asInstanceOf[MapOperator[Any, Any]]
+       .getFunctionDescriptor
+       .getLoadProfileEstimator.get().asInstanceOf[NestableLoadProfileEstimator]
+
+   // Check if the load profile estimators are equal
+   val originalLoadProfileEstimator = estimatorOf(dq1.operator)
+   val deserializedLoadProfileEstimator = estimatorOf(deserialized)
+
+   Assert.assertEquals(originalLoadProfileEstimator.getConfigurationKeys, deserializedLoadProfileEstimator.getConfigurationKeys)
+   Assert.assertEquals(originalLoadProfileEstimator.getTemplateKeys, deserializedLoadProfileEstimator.getTemplateKeys)
+ }
+
+
+ /**
+  * Checks that an operator carrying a custom cardinality estimator can be serialized and
+  * deserialized, and that the restored operator has the same class as the original.
+  */
+ @Test
+ def testCardinalityEstimator(): Unit = {
+   val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+   // Create map operator with cardinality estimator
+   val dq1 = wayang.loadCollection(List(1, 2, 3))
+     .map(_ + 1)
+     .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
+
+   // Serialize and then deserialize the map operator; a failure surfaces as a test error.
+   val serialized = SerializationUtils.serializeAsString(dq1.operator)
+   val deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
+
+   // Without an assertion the test would also pass on a null or wrongly typed result.
+   Assert.assertNotNull(deserialized)
+   Assert.assertEquals(dq1.operator.getClass, deserialized.getClass)
+ }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/SerializationTestBase.scala b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/SerializationTestBase.scala
new file mode 100644
index 0000000..5b83798
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/SerializationTestBase.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization
+
+import org.apache.wayang.basic.operators.TextFileSink
+import org.apache.wayang.core.api.WayangContext
+import org.apache.wayang.core.plan.wayangplan.{LoopHeadOperator, Operator, WayangPlan}
+import org.apache.wayang.core.util.ReflectionUtils
+import org.junit.rules.TestName
+import org.junit.{Assert, Rule}
+
+import java.io.{File, FileWriter}
+import java.nio.file.{Files, Paths}
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters.asScalaBufferConverter
+
+
+trait SerializationTestBase {
+
+  // Backing field and accessors for the JUnit TestName rule, which is used below to obtain the
+  // name of the running test method (recipe from https://stackoverflow.com/a/36152864/5589918)
+  var _testName: TestName = new TestName
+
+  @Rule
+  def testName: TestName = _testName
+
+  def testName_=(aTestName: TestName): Unit = {
+    _testName = aTestName
+  }
+
+  /**
+   * Runs [[serializeDeserializeExecute]] and asserts that its output file holds exactly `expectedLines`.
+   * Failures are printed and rethrown; the output file is deleted afterwards if its path was returned.
+   */
+  def serializeDeserializeExecuteAssert(operator: Operator, wayangContext: WayangContext, expectedLines: List[String], log: Boolean = false): Unit = {
+    var outputPath = Option.empty[String]
+    try {
+      val producedFile = serializeDeserializeExecute(operator, wayangContext, log) // Serialize, deserialize, execute
+      outputPath = Some(producedFile)
+      SerializationTestBase.assertOutputFile(producedFile, expectedLines) // Check results
+    } catch {
+      case failure: Throwable =>
+        failure.printStackTrace()
+        throw failure
+    } finally {
+      // Nothing to clean up when the execution step failed before returning a path
+      outputPath.foreach(path => new java.io.File(path).delete())
+    }
+  }
+
+  /**
+   * Serializes `operator`, deserializes it again, connects the restored operator to a [[TextFileSink]]
+   * and executes the resulting plan on `wayangContext`. Returns the sink path without `file://`.
+   */
+  def serializeDeserializeExecute(operator: Operator, wayangContext: WayangContext, log: Boolean = false): String = {
+    try {
+      val payload = SerializationUtils.serializeAsString(operator)
+      if (log) SerializationTestBase.log(payload, testName.getMethodName + ".log.json")
+      val restored = SerializationUtils.deserializeFromString[Operator](payload)
+
+      // The sink target is named after the running test method
+      val elementClass = restored.getOutput(0).getType.getDataUnitType.getTypeClass
+      val localOutputPath = s"/tmp/${testName.getMethodName}.out"
+      val fileSink = new TextFileSink(s"file://$localOutputPath", elementClass)
+
+      // Loop heads are connected through output 1, all other operators through output 0
+      val outputIndex = restored match {
+        case _: LoopHeadOperator => 1
+        case _ => 0
+      }
+      restored.connectTo(outputIndex, fileSink, 0)
+      val plan = new WayangPlan(fileSink)
+      wayangContext.execute(plan, ReflectionUtils.getDeclaringJar(classOf[OperatorSerializationTests]))
+      localOutputPath
+    } catch {
+      case failure: Throwable =>
+        failure.printStackTrace()
+        throw failure
+    }
+  }
+
+}
+
+object SerializationTestBase {
+
+  /**
+   * Asserts that the file at `outputFilename` holds exactly `expectedLines`, in the same order.
+   * The file is read eagerly with `Files.readAllLines`, so no file handle is left open.
+   */
+  def assertOutputFile(outputFilename: String, expectedLines: List[String]): Unit = {
+    val lines = Files.readAllLines(Paths.get(outputFilename)).asScala
+
+    // Compare the sizes first: the zip below would silently drop surplus lines on either side
+    Assert.assertEquals("Number of lines in the file should match", expectedLines.size, lines.size)
+    lines.zip(expectedLines).foreach { case (actual, expected) =>
+      Assert.assertEquals("Line content should match", expected, actual)
+    }
+  }
+
+  /** Asserts that the file at `outputFilename` consists of `expectedNumberOfLines` lines. */
+  def assertOutputFileLineCount(outputFilename: String, expectedNumberOfLines: Int): Unit = {
+    val lines = Files.readAllLines(Paths.get(outputFilename)).asScala
+
+    Assert.assertEquals("Number of lines in the file should match", expectedNumberOfLines, lines.size)
+  }
+
+  /**
+   * Debug helper: writes `text` to `filename` inside the `Desktop` folder of the current user and
+   * prints the resulting path. An existing file of that name is overwritten.
+   *
+   * @param text     content to write
+   * @param filename name of the file, resolved against `<user.home>/Desktop`
+   */
+  def log(text: String, filename: String = "customLogFile.json"): Unit = {
+    val desktopPath = System.getProperty("user.home") + "/Desktop"
+    val customFile = new File(desktopPath, filename)
+
+    // Create the target directory if it is missing (e.g. no Desktop folder on a build server).
+    // FileWriter creates or truncates the file itself, so no separate createNewFile() is needed.
+    customFile.getParentFile.mkdirs()
+
+    val writer = new FileWriter(customFile)
+    try {
+      writer.write(text)
+    } finally {
+      writer.close()
+    }
+
+    // Report where the file was written
+    println(s"Temp file created at: ${customFile.getAbsolutePath}")
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/async/apps/WordCount.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/async/apps/WordCount.scala
new file mode 100644
index 0000000..1298052
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/async/apps/WordCount.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.async.apps
+
+import org.apache.wayang.api.async.DataQuantaImplicits._
+import org.apache.wayang.api.async.PlanBuilderImplicits._
+import org.apache.wayang.api.{MultiContext, DataQuanta, PlanBuilder}
+import org.apache.wayang.java.Java
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+object WordCount {
+  /**
+   * Demo of the async API: starts five jobs asynchronously. Job 3 consumes the results of
+   * jobs 1 and 2, job 5 consumes the results of jobs 3 and 4; only job 5 is awaited.
+   */
+  def main(args: Array[String]): Unit = {
+    println("WordCount")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    def javaPlanBuilder() = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+    val firstBuilder = javaPlanBuilder()
+    val secondBuilder = javaPlanBuilder()
+    val thirdBuilder = javaPlanBuilder()
+
+    val job1 = firstBuilder.loadCollection(List(1, 2, 3, 4, 5))
+      .map(x => x * 1)
+      .runAsync(tempFileOut = "file:///tmp/out1.temp")
+    val job2 = secondBuilder.loadCollection(List(6, 7, 8, 9, 10))
+      .filter(x => x <= 8)
+      .runAsync(tempFileOut = "file:///tmp/out2.temp")
+
+    // Job 3: union of the outputs of jobs 1 and 2
+    val job1Output: DataQuanta[Int] = firstBuilder.loadAsync(job1)
+    val job2Output: DataQuanta[Int] = firstBuilder.loadAsync(job2)
+    val job3 = job1Output.union(job2Output)
+      .map(x => x * 3)
+      .filter(x => x < 100)
+      .runAsync(tempFileOut = "file:///tmp/out3.temp", job1, job2)
+
+    val job4 = thirdBuilder.loadCollection(List(1, 2, 3, 4, 5))
+      .filter(x => x >= 2)
+      .runAsync(tempFileOut = "file:///tmp/out4.temp")
+
+    // Job 5: intersection of the outputs of jobs 3 and 4, written to the final file
+    val job3Output: DataQuanta[Int] = firstBuilder.loadAsync(job3)
+    val job4Output: DataQuanta[Int] = firstBuilder.loadAsync(job4)
+    val job5 = job3Output.intersect(job4Output)
+      .map(x => x * 4)
+      .writeTextFileAsync(url = "file:///tmp/out5.final", job3, job4)
+
+    Await.result(job5, Duration.Inf)
+    println("DONE!")
+  }
+}
+
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/package.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/package.scala
new file mode 100644
index 0000000..bcef35f
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/package.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.multicontext
+
+import org.apache.wayang.core.api.Configuration
+
+package object apps {
+
+  /** Loads the configurations named by `args(0)` and `args(1)`; with fewer than two arguments, two defaults. */
+  def loadConfig(args: Array[String]): (Configuration, Configuration) =
+    if (args.length >= 2) {
+      println("Loading custom configurations.")
+      val first = loadConfigFromUrl(args(0))
+      val second = loadConfigFromUrl(args(1))
+      (first, second)
+    } else {
+      println("Loading default configurations.")
+      (new Configuration(), new Configuration())
+    }
+
+  /** Loads the configuration at `url`; on any exception reports it and returns a default configuration. */
+  private def loadConfigFromUrl(url: String): Configuration =
+    try new Configuration(url)
+    catch {
+      case loadFailure: Exception =>
+        loadFailure.printStackTrace()
+        println(s"Can't load configuration from $url")
+        new Configuration()
+    }
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/tpch/Query1.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/tpch/Query1.scala
new file mode 100644
index 0000000..5239dcd
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/tpch/Query1.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.tpch
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.apps.tpch.CsvUtils
+import org.apache.wayang.apps.tpch.data.LineItem
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+import org.apache.wayang.apps.tpch.queries.{Query1 => Query1Utils}
+import org.apache.wayang.multicontext.apps.loadConfig
+
+/** Marker class: `classOf[Query1]` is passed to `withUdfJarsOf` below. */
+class Query1 {}
+
+object Query1 {
+  /** Runs TPC-H query 1 over the line item file `args(0)` in two contexts and executes the plans. */
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2 && args.length != 4) {
+      println("Usage: <lineitem-url> <delta> <optional-path-to-config-1> <optional-path-to-config-2>")
+      System.exit(1)
+    }
+
+    println("TPC-H querying #1 in multi context wayang!")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args.drop(2))
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2)).withUdfJarsOf(classOf[Query1])
+
+    // Example structure of lineitem file:
+    // 1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the
+    // 1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold
+    // ...
+    val lineItemFile = args(0)
+    val delta = args(1).toInt
+
+    // Latest ship date to keep; computed once here instead of re-parsing the date per line item
+    val maxShipDate = CsvUtils.parseDate("1998-12-01") - delta
+
+    multiContextPlanBuilder
+      // Load lineitem file
+      .forEach(_.readTextFile(lineItemFile))
+
+      // Parse
+      .forEach(_.map(s => LineItem.parseCsv(s)))
+
+      // Filter line items
+      .forEach(_.filter(t => t.shipDate <= maxShipDate))
+
+      // Project line items
+      .forEach(_.map(t => (t.returnFlag, t.lineStatus, t.quantity, t.extendedPrice, t.discount, t.tax)))
+
+      // Calculate result fields (the avg_* fields carry plain values/sums until post-processing divides them by count_order)
+      .forEach(_.map { case (returnFlag, lineStatus, quantity, extendedPrice, discount, tax) =>
+        Query1Utils.Result(
+          returnFlag.toString,
+          lineStatus.toString,
+          quantity,
+          extendedPrice,
+          extendedPrice * (1 - discount),
+          extendedPrice * (1 - discount) * (1 + tax),
+          quantity,
+          extendedPrice,
+          discount,
+          1
+        )
+      })
+
+      // Aggregate line items
+      .forEach(_.reduceByKey(
+        result => (result.l_returnflag, result.l_linestatus),
+        (r1, r2) => Query1Utils.Result(
+          r1.l_returnflag,
+          r1.l_linestatus,
+          r1.sum_qty + r2.sum_qty,
+          r1.sum_base_price + r2.sum_base_price,
+          r1.sum_disc_price + r2.sum_disc_price,
+          r1.sum_charge + r2.sum_charge,
+          r1.avg_qty + r2.avg_qty,
+          r1.avg_price + r2.avg_price,
+          r1.avg_disc + r2.avg_disc,
+          r1.count_order + r2.count_order
+        )
+      ))
+
+      // Post-process line items aggregates
+      .forEach(_.map(result => Query1Utils.Result(
+        result.l_returnflag,
+        result.l_linestatus,
+        result.sum_qty,
+        result.sum_base_price,
+        result.sum_disc_price,
+        result.sum_charge,
+        result.avg_qty / result.count_order,
+        result.avg_price / result.count_order,
+        result.avg_disc / result.count_order,
+        result.count_order
+      )))
+
+      // Execute
+      .execute()
+  }
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCount.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCount.scala
new file mode 100644
index 0000000..c87145a
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCount.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.wordcount
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.multicontext.apps.loadConfig
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+class WordCount {}
+
+object WordCount {
+
+  /** Counts the words of a small in-memory data set in two contexts that both use the Spark plugin. */
+  def main(args: Array[String]): Unit = {
+    println("WordCount")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args)
+
+    // Each context gets its own text file sink
+    val sparkContext1 = new MultiContext(configuration1)
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val sparkContext2 = new MultiContext(configuration2)
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(sparkContext1, sparkContext2)).withUdfJarsOf(this.getClass)
+
+    // Generate some test data
+    val inputValues = Array("Big data is big.", "Is data big data?")
+
+    // Build the word count pipeline in a single forEach call and execute it
+    multiContextPlanBuilder.forEach(planBuilder =>
+      planBuilder
+        .loadCollection(inputValues)
+        .flatMap(line => line.split("\\s+"))
+        .map(token => token.replaceAll("\\W+", "").toLowerCase)
+        .map(word => (word, 1))
+        .reduceByKey(pair => pair._1, (left, right) => (left._1, left._2 + right._2))
+    ).execute()
+
+    // Step-by-step forEach formulation, left commented out in the original:
+    /*.forEach(_.loadCollection(inputValues))
+      .forEach(_.flatMap(_.split("\\s+")))
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map((_, 1)))
+      .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+      .execute()*/
+  }
+}
+
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountCombineEach.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountCombineEach.scala
new file mode 100644
index 0000000..850ceba
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountCombineEach.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.wordcount
+
+import org.apache.wayang.api.{MultiContext, DataQuanta, MultiContextPlanBuilder}
+import org.apache.wayang.java.Java
+import org.apache.wayang.multicontext.apps.loadConfig
+
+object WordCountCombineEach {
+
+  /**
+   * Builds two word-count style pipelines in each of two Java contexts, combines them pairwise
+   * with a union and executes the result.
+   */
+  def main(args: Array[String]): Unit = {
+    println("WordCountCombineEach")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args)
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    val inputValues = Array("Big data is big.", "Is data big data?")
+
+    // Shared front part of both pipelines: one (word, 1) pair per lower-cased word
+    def wordOccurrences() = multiContextPlanBuilder
+      .forEach(_.loadCollection(inputValues))
+      .forEach(_.flatMap(line => line.split("\\s+")))
+      .forEach(_.map(token => token.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map(word => (word, 1)))
+
+    // The first pipeline sums the counts per word; the second puts 100 wherever entries of a key are merged
+    val summedCounts = wordOccurrences().forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+    val fixedCounts = wordOccurrences().forEach(_.reduceByKey(_._1, (a, _) => (a._1, 100)))
+
+    summedCounts
+      .combineEach(fixedCounts, (left: DataQuanta[(String, Int)], right: DataQuanta[(String, Int)]) => left.union(right))
+      .forEach(_.map(t => (t._1 + " wayang out", t._2)))
+      .execute()
+  }
+
+}
+
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithMerge.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithMerge.scala
new file mode 100644
index 0000000..a391e94
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithMerge.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.wordcount
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.java.Java
+import org.apache.wayang.multicontext.apps.loadConfig
+import org.apache.wayang.spark.Spark
+
+object WordCountWithMerge {
+
+  /**
+   * Counts words separately in two Java contexts, merges both results with a union in a third
+   * context and finishes the aggregation there.
+   */
+  def main(args: Array[String]): Unit = {
+    println("WordCountWithMerge")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args)
+
+    // The merge context will read the outputs of contexts 1 and 2 from their merge file sinks
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withMergeFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withMergeFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // To be used after merging the two contexts above
+    val mergeContext = new WayangContext(new Configuration())
+      .withPlugin(Java.basicPlugin())
+
+    val inputValues1 = Array("Big data is big.", "Is data big data?")
+    val inputValues2 = Array("Big big data is big big.", "Is data big data big?")
+
+    // Build a word count in 2 different contexts, each on its own input
+    val perContextCounts = multiContextPlanBuilder
+      .loadCollection(context1, inputValues1)
+      .loadCollection(context2, inputValues2)
+      .forEach(_.flatMap(line => line.split("\\s+")))
+      .forEach(_.map(token => token.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map(word => (word, 1)))
+      .forEach(_.reduceByKey(pair => pair._1, (left, right) => (left._1, left._2 + right._2)))
+
+    // Merge contexts with union operator
+    val mergedCounts = perContextCounts.mergeUnion(mergeContext)
+
+    // Keep per-context counts >= 3 and add them up per word; for the inputs above this writes:
+    //    (big,9)
+    //    (data,6)
+    mergedCounts
+      .filter(pair => pair._2 >= 3)
+      .reduceByKey(pair => pair._1, (t1, t2) => (t1._1, t1._2 + t2._2))
+      .writeTextFile("file:///tmp/out1.merged", pair => pair.toString())
+  }
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithTargetPlatforms.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithTargetPlatforms.scala
new file mode 100644
index 0000000..5e7d185
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithTargetPlatforms.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.wordcount
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.java.Java
+import org.apache.wayang.multicontext.apps.loadConfig
+import org.apache.wayang.spark.Spark
+
+object WordCountWithTargetPlatforms {
+  /** Word count in two contexts where `withTargetPlatforms` is called after individual steps. */
+  def main(args: Array[String]): Unit = {
+    println("WordCountWithTargetPlatforms")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args)
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    val inputValues1 = Array("Big data is big.", "Is data big data?")
+    val inputValues2 = Array("Big big data is big big.", "Is data big data big?")
+
+    // Tokenize; Spark is named for context1 and Java for context2 (presumably for the preceding step)
+    val tokens = multiContextPlanBuilder
+      .loadCollection(context1, inputValues1)
+      .loadCollection(context2, inputValues2)
+      .forEach(_.flatMap(line => line.split("\\s+")))
+      .withTargetPlatforms(context1, Spark.platform())
+      .withTargetPlatforms(context2, Java.platform())
+    // Normalize the tokens; Java is named without a specific context
+    val words = tokens
+      .forEach(_.map(token => token.replaceAll("\\W+", "").toLowerCase))
+      .withTargetPlatforms(Java.platform())
+    // Count per word; a platform (Spark) is named for context1 only, then everything is executed
+    words
+      .forEach(_.map(word => (word, 1)))
+      .forEach(_.reduceByKey(pair => pair._1, (left, right) => (left._1, left._2 + right._2)))
+      .withTargetPlatforms(context1, Spark.platform())
+      .execute()
+  }
+}
\ No newline at end of file
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SampleOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SampleOperator.java
index 50db0d7..c5a9c14 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SampleOperator.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SampleOperator.java
@@ -20,6 +20,7 @@
import org.apache.commons.lang3.Validate;
import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
import org.apache.wayang.core.optimizer.cardinality.FixedSizeCardinalityEstimator;
@@ -29,8 +30,6 @@
import org.apache.logging.log4j.Logger;
import java.util.Optional;
-import java.util.function.IntUnaryOperator;
-import java.util.function.LongUnaryOperator;
/**
* A random sample operator randomly selects its inputs from the input slot and pushes that element to the output slot.
@@ -78,12 +77,12 @@
/**
* This function determines the sample size by the number of iterations.
*/
- protected IntUnaryOperator sampleSizeFunction;
+ protected FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction;
/**
* This function optionally determines the seed by the number of iterations.
*/
- protected LongUnaryOperator seedFunction;
+ protected FunctionDescriptor.SerializableLongUnaryOperator seedFunction;
/**
* Size of the dataset to be sampled or {@value #UNKNOWN_DATASET_SIZE} if a dataset size is not known.
@@ -108,7 +107,7 @@
* @param sampleSizeFunction user-specified size of the sample in dependence of the current iteration number
* @param type {@link DataSetType} of the sampled dataset
*/
- public SampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type) {
+ public SampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type) {
this(sampleSizeFunction, type, Methods.ANY, iterationNumber -> randomSeed());
}
@@ -122,21 +121,21 @@
/**
* Creates a new instance given the sample size and the method.
*/
- public SampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod) {
+ public SampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod) {
this(sampleSizeFunction, type, sampleMethod, iterationNumber -> randomSeed());
}
/**
* Creates a new instance given a user-defined sample size.
*/
- public SampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod, long seed) {
+ public SampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod, long seed) {
this(sampleSizeFunction, type, sampleMethod, iterationNumber -> seed);
}
/**
* Creates a new instance given user-defined sample size and seed methods.
*/
- public SampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod, LongUnaryOperator seedFunction) {
+ public SampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
super(type, type, true);
this.sampleSizeFunction = sampleSizeFunction;
this.sampleMethod = sampleMethod;
@@ -186,7 +185,7 @@
this.sampleMethod = sampleMethod;
}
- public void setSeedFunction(LongUnaryOperator seedFunction) {
+ public void setSeedFunction(FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
this.seedFunction = seedFunction;
}
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java
index 18d13e0..c78d004 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java
@@ -25,11 +25,7 @@
import java.io.Serializable;
import java.util.Optional;
-import java.util.function.BiFunction;
-import java.util.function.BinaryOperator;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
+import java.util.function.*;
/**
* A function operates on single data units or collections of those.
@@ -140,4 +136,29 @@
public interface ExtendedSerializableConsumer<T> extends SerializableConsumer<T>, ExtendedFunction{
}
+
+ /** An {@link IntUnaryOperator} that is also {@link Serializable}. */
+ @FunctionalInterface
+ public interface SerializableIntUnaryOperator extends IntUnaryOperator, Serializable {
+ }
+
+ /** A {@link LongUnaryOperator} that is also {@link Serializable}. */
+ @FunctionalInterface
+ public interface SerializableLongUnaryOperator extends LongUnaryOperator, Serializable {
+ }
+
+ /** A {@link ToLongBiFunction} that is also {@link Serializable}. */
+ @FunctionalInterface
+ public interface SerializableToLongBiFunction<T, U> extends ToLongBiFunction<T, U>, Serializable {
+ }
+
+ /** A {@link ToDoubleBiFunction} that is also {@link Serializable}. */
+ @FunctionalInterface
+ public interface SerializableToDoubleBiFunction<T, U> extends ToDoubleBiFunction<T, U>, Serializable {
+ }
+
+ /** A {@link ToLongFunction} that is also {@link Serializable}. */
+ @FunctionalInterface
+ public interface SerializableToLongFunction<T> extends ToLongFunction<T>, Serializable {
+ }
}
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimator.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimator.java
index 7582261..ebddcc1 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimator.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimator.java
@@ -19,11 +19,10 @@
package org.apache.wayang.core.optimizer.cardinality;
import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import java.util.Arrays;
-import java.util.function.ToLongBiFunction;
-import java.util.function.ToLongFunction;
/**
* Default implementation of the {@link CardinalityEstimator}. Generalizes a single-point estimation function.
@@ -34,7 +33,7 @@
private final int numInputs;
- private final ToLongBiFunction<long[], Configuration> singlePointEstimator;
+ private final FunctionDescriptor.SerializableToLongBiFunction<long[], Configuration> singlePointEstimator;
/**
* If {@code true}, receiving more than {@link #numInputs} is also fine.
@@ -44,7 +43,7 @@
public DefaultCardinalityEstimator(double certaintyProb,
int numInputs,
boolean isAllowMoreInputs,
- ToLongFunction<long[]> singlePointEstimator) {
+ FunctionDescriptor.SerializableToLongFunction<long[]> singlePointEstimator) {
this(certaintyProb,
numInputs,
isAllowMoreInputs,
@@ -54,7 +53,7 @@
public DefaultCardinalityEstimator(double certaintyProb,
int numInputs,
boolean isAllowMoreInputs,
- ToLongBiFunction<long[], Configuration> singlePointEstimator) {
+ FunctionDescriptor.SerializableToLongBiFunction<long[], Configuration> singlePointEstimator) {
this.certaintyProb = certaintyProb;
this.numInputs = numInputs;
this.singlePointEstimator = singlePointEstimator;
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/DefaultLoadEstimator.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/DefaultLoadEstimator.java
index 775129e..9d9a4ea 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/DefaultLoadEstimator.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/DefaultLoadEstimator.java
@@ -19,6 +19,7 @@
package org.apache.wayang.core.optimizer.costs;
import org.apache.commons.lang3.Validate;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
@@ -41,7 +42,7 @@
public DefaultLoadEstimator(int numInputs,
int numOutputs,
double correctnessProbability,
- ToLongBiFunction<long[], long[]> singlePointFunction) {
+ FunctionDescriptor.SerializableToLongBiFunction<long[], long[]> singlePointFunction) {
// Convenience overload: delegates to the five-argument constructor, passing null as its fourth argument.
// NOTE(review): that fourth argument is presumably nullCardinalityReplacement - confirm against the overload.
this(numInputs, numOutputs, correctnessProbability, null, singlePointFunction);
}
@@ -49,7 +50,7 @@
int numOutputs,
double correctnessProbability,
CardinalityEstimate nullCardinalityReplacement,
- ToLongBiFunction<long[], long[]> singlePointFunction) {
+ FunctionDescriptor.SerializableToLongBiFunction<long[], long[]> singlePointFunction) {
this(
numInputs, numOutputs, correctnessProbability, nullCardinalityReplacement,
(context, inputEstimates, outputEstimates) -> singlePointFunction.applyAsLong(inputEstimates, outputEstimates)
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadEstimator.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadEstimator.java
index 0f07067..4892343 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadEstimator.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadEstimator.java
@@ -22,6 +22,7 @@
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import java.io.Serializable;
import java.util.Arrays;
/**
@@ -36,7 +37,7 @@
* Functional interface for lambda expressions to express single-point load estimation functions.
*/
@FunctionalInterface
- public interface SinglePointEstimationFunction {
+ public interface SinglePointEstimationFunction extends Serializable {
/**
* Estimate the load for the given artifact, input, and output estimates.
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadProfileEstimators.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadProfileEstimators.java
index d4093b9..be42d6c 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadProfileEstimators.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadProfileEstimators.java
@@ -198,7 +198,7 @@
);
long overhead = spec.has("overhead") ? spec.getLong("overhead") : 0L;
- ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ?
+ FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ?
parseResourceUsageJuel(spec.getString("ru"), numInputs, numOutputs) :
DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR;
return new NestableLoadProfileEstimator(
@@ -273,7 +273,7 @@
);
long overhead = spec.has("overhead") ? spec.getLong("overhead") : 0L;
- ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ?
+ FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ?
compileResourceUsage(spec.getString("ru")) :
DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR;
return new NestableLoadProfileEstimator(
@@ -318,7 +318,7 @@
* @param numOutputs the number of outputs of the estimated operator, reflected as JUEL variables {@code out0}, {@code out1}, ...
* @return a {@link FunctionDescriptor.SerializableToDoubleBiFunction} wrapping the JUEL expression
*/
- private static ToDoubleBiFunction<long[], long[]> parseResourceUsageJuel(String juel, int numInputs, int numOutputs) {
+ private static FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> parseResourceUsageJuel(String juel, int numInputs, int numOutputs) {
final Map<String, Class<?>> parameterClasses = createJuelParameterClasses(numInputs, numOutputs);
final JuelUtils.JuelFunction<Double> juelFunction = new JuelUtils.JuelFunction<>(juel, Double.class, parameterClasses);
return (inCards, outCards) -> applyJuelFunction(juelFunction, null, inCards, outCards, Collections.emptyList());
@@ -391,7 +391,7 @@
* @param expression a mathematical expression
* @return a {@link FunctionDescriptor.SerializableToDoubleBiFunction} wrapping the expression
*/
- private static ToDoubleBiFunction<long[], long[]> compileResourceUsage(String expression) {
+ private static FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> compileResourceUsage(String expression) {
final Expression expr = ExpressionBuilder.parse(expression).specify(baseContext);
return (inCards, outCards) -> {
Context mathContext = createMathContext(null, inCards, outCards);
@@ -484,6 +484,6 @@
}
/** Resource utilization estimator that ignores both of its arguments and always yields {@code 1d}. */
- private static final ToDoubleBiFunction<long[], long[]> DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR = (in, out) -> 1d;
+ private static final FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR = (in, out) -> 1d;
}
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/NestableLoadProfileEstimator.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/NestableLoadProfileEstimator.java
index 7bd64c2..4510c01 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/NestableLoadProfileEstimator.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/NestableLoadProfileEstimator.java
@@ -20,6 +20,7 @@
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import java.util.Collection;
@@ -40,7 +41,7 @@
/**
* The degree to which the load profile can utilize available resources.
*/
- private final ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator;
+ private final FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator;
/**
* Milliseconds overhead that this load profile incurs.
@@ -98,7 +99,7 @@
LoadEstimator ramLoadEstimator,
LoadEstimator diskLoadEstimator,
LoadEstimator networkLoadEstimator,
- ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator,
+ FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator,
long overheadMillis) {
this(
cpuLoadEstimator, ramLoadEstimator, diskLoadEstimator, networkLoadEstimator,
@@ -121,7 +122,7 @@
LoadEstimator ramLoadEstimator,
LoadEstimator diskLoadEstimator,
LoadEstimator networkLoadEstimator,
- ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator,
+ FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator,
long overheadMillis,
String configurationKey) {
this.cpuLoadEstimator = cpuLoadEstimator;
diff --git a/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimatorTest.java b/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimatorTest.java
index 5ffc3ab..44584a3 100644
--- a/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimatorTest.java
+++ b/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimatorTest.java
@@ -18,12 +18,11 @@
package org.apache.wayang.core.optimizer.cardinality;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.function.FunctionDescriptor;
+import org.apache.wayang.core.optimizer.OptimizationContext;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.wayang.core.api.Configuration;
-import org.apache.wayang.core.optimizer.OptimizationContext;
-
-import java.util.function.ToLongFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -42,7 +41,7 @@
CardinalityEstimate inputEstimate1 = new CardinalityEstimate(50, 60, 0.8);
CardinalityEstimate inputEstimate2 = new CardinalityEstimate(10, 100, 0.4);
- final ToLongFunction<long[]> singlePointEstimator =
+ final FunctionDescriptor.SerializableToLongFunction<long[]> singlePointEstimator =
inputEstimates -> (long) Math.ceil(0.8 * inputEstimates[0] * inputEstimates[1]);
CardinalityEstimator estimator = new DefaultCardinalityEstimator(
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkSampleOperator.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkSampleOperator.java
index 86b0613..fb4e50b 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkSampleOperator.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkSampleOperator.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.wayang.basic.operators.SampleOperator;
import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -52,7 +53,7 @@
/**
* Creates a new instance configured with {@code Methods.RANDOM}. All arguments are
* forwarded unchanged to the superclass constructor.
*
* @param sampleSizeFunction serializable function passed on as the sample size function
* @param type               the {@code DataSetType} passed on to the superclass
* @param seedFunction       serializable function passed on as the seed function
*/
- public FlinkSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+ public FlinkSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
super(sampleSizeFunction, type, Methods.RANDOM, seedFunction);
}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaRandomSampleOperator.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaRandomSampleOperator.java
index 528f716..bf48176 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaRandomSampleOperator.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaRandomSampleOperator.java
@@ -19,6 +19,7 @@
package org.apache.wayang.java.operators;
import org.apache.wayang.basic.operators.SampleOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -55,7 +56,7 @@
*
* @param sampleSizeFunction udf-based size of sample
*/
- public JavaRandomSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+ public JavaRandomSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
// Forwards every argument unchanged to the superclass constructor and fixes the method to Methods.RANDOM.
super(sampleSizeFunction, type, Methods.RANDOM, seedFunction);
}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaReservoirSampleOperator.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaReservoirSampleOperator.java
index f6946e6..66730af 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaReservoirSampleOperator.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaReservoirSampleOperator.java
@@ -19,6 +19,7 @@
package org.apache.wayang.java.operators;
import org.apache.wayang.basic.operators.SampleOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -54,7 +55,7 @@
/**
* Creates a new instance configured with {@code Methods.RESERVOIR}. All arguments are
* forwarded unchanged to the superclass constructor.
*
* @param sampleSizeFunction serializable function passed on as the sample size function
* @param type               the {@code DataSetType} passed on to the superclass
* @param seed               serializable function passed on as the seed function
*/
- public JavaReservoirSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seed) {
+ public JavaReservoirSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seed) {
super(sampleSizeFunction, type, Methods.RESERVOIR, seed);
}
diff --git a/wayang-platforms/wayang-jdbc-template/pom.xml b/wayang-platforms/wayang-jdbc-template/pom.xml
index 2a64798..e2f655b 100644
--- a/wayang-platforms/wayang-jdbc-template/pom.xml
+++ b/wayang-platforms/wayang-jdbc-template/pom.xml
@@ -68,6 +68,11 @@
<version>0.7.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-spark</artifactId>
+ <version>0.7.1</version>
+ </dependency>
+ <dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>${hsqldb.version}</version>
diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java
index 08672ca..48cff71 100644
--- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java
+++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java
@@ -36,10 +36,7 @@
import org.apache.wayang.spark.operators.SparkExecutionOperator;
import java.sql.Connection;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
diff --git a/wayang-platforms/wayang-postgres/pom.xml b/wayang-platforms/wayang-postgres/pom.xml
index 5bc9ce3..ee75b36 100644
--- a/wayang-platforms/wayang-postgres/pom.xml
+++ b/wayang-platforms/wayang-postgres/pom.xml
@@ -55,6 +55,11 @@
<artifactId>wayang-jdbc-template</artifactId>
<version>0.7.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-spark</artifactId>
+ <version>0.7.1</version>
+ </dependency>
</dependencies>
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkBernoulliSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkBernoulliSampleOperator.java
index be041ae..8ec074f 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkBernoulliSampleOperator.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkBernoulliSampleOperator.java
@@ -20,6 +20,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.basic.operators.SampleOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -49,7 +50,7 @@
/**
* Creates a new instance configured with {@code Methods.BERNOULLI}. All arguments are
* forwarded unchanged to the superclass constructor.
*
* @param sampleSizeFunction serializable function passed on as the sample size function
* @param type               the {@code DataSetType} passed on to the superclass
* @param seedFunction       serializable function passed on as the seed function
*/
- public SparkBernoulliSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+ public SparkBernoulliSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
super(sampleSizeFunction, type, Methods.BERNOULLI, seedFunction);
}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java
index 8333a17..25212be 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java
@@ -22,6 +22,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.basic.operators.SampleOperator;
import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -70,7 +71,7 @@
/**
* Creates a new instance configured with {@code Methods.RANDOM}. All arguments are
* forwarded unchanged to the superclass constructor.
*
* @param sampleSizeFunction serializable function passed on as the sample size function
* @param type               the {@code DataSetType} passed on to the superclass
* @param seedFunction       serializable function passed on as the seed function
*/
- public SparkRandomPartitionSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+ public SparkRandomPartitionSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
super(sampleSizeFunction, type, Methods.RANDOM, seedFunction);
}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java
index ca1c36b..3169da7 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java
@@ -22,6 +22,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.wayang.basic.operators.SampleOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -65,7 +66,7 @@
/**
* Creates a new instance configured with {@code Methods.SHUFFLE_PARTITION_FIRST}. All
* arguments are forwarded unchanged to the superclass constructor.
*
* @param sampleSizeFunction serializable function passed on as the sample size function
* @param type               the {@code DataSetType} passed on to the superclass
* @param seedFunction       serializable function passed on as the seed function
*/
- public SparkShufflePartitionSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+ public SparkShufflePartitionSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
super(sampleSizeFunction, type, Methods.SHUFFLE_PARTITION_FIRST, seedFunction);
}