Merge pull request #429 from damik3/main

Multicontext features
diff --git a/wayang-api/wayang-api-scala-java/README.md b/wayang-api/wayang-api-scala-java/README.md
new file mode 100644
index 0000000..085741b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/README.md
@@ -0,0 +1,364 @@
+# Wayang scala-java API
+
+Wayang provides two options for running multiple jobs asynchronously: the `async` API and the `multicontext` API.
+
+The examples below can be found in the `async.apps` and `multicontext.apps` packages in wayang-benchmark.
+
+## `async` API
+The following example shows how to use the `async` API with a program containing several asynchronous jobs:
+
+```scala
+import org.apache.wayang.api.async.DataQuantaImplicits._
+import org.apache.wayang.api.async.PlanBuilderImplicits._
+import org.apache.wayang.api.{MultiContext, DataQuanta, PlanBuilder}
+import org.apache.wayang.java.Java
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+object Example {
+
+  def main(args: Array[String]): Unit = {
+    val planBuilder1 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+    val planBuilder2 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+    val planBuilder3 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+
+    // Async job 1
+    val result1 = planBuilder1
+      .loadCollection(List(1, 2, 3, 4, 5))
+      .map(_ * 1)
+      .runAsync(tempFileOut = "file:///tmp/out1.temp")
+
+    // Async job 2
+    val result2 = planBuilder2
+      .loadCollection(List(6, 7, 8, 9, 10))
+      .filter(_ <= 8)
+      .runAsync(tempFileOut = "file:///tmp/out2.temp")
+
+    // Async job 3 which merges 1 and 2
+    val dq1: DataQuanta[Int] = planBuilder1.loadAsync(result1)
+    val dq2: DataQuanta[Int] = planBuilder1.loadAsync(result2)
+    val result3 = dq1.union(dq2)
+      .map(_ * 3)
+      .filter(_ < 100)
+      .runAsync(tempFileOut = "file:///tmp/out3.temp", result1, result2)
+
+    // Async job 4 which runs independently from 1, 2 and 3
+    val result4 = planBuilder3
+      .loadCollection(List(1, 2, 3, 4, 5))
+      .filter(_ >= 2)
+      .runAsync(tempFileOut = "file:///tmp/out4.temp")
+
+    // Async job 5 which merges 3 and 4
+    val dq3: DataQuanta[Int] = planBuilder1.loadAsync(result3)
+    val dq4: DataQuanta[Int] = planBuilder1.loadAsync(result4)
+    val result5 = dq3.intersect(dq4)
+      .map(_ * 4)
+      .writeTextFileAsync(url = "file:///tmp/out5.final", result3, result4)
+
+    Await.result(result5, Duration.Inf)
+  }
+
+}
+```
+
+**Key Points:**
+- Jobs 1, 2, and 4 are executed concurrently, as there are no dependencies between them.
+- The output of each of these jobs is written to the provided path (`.runAsync(tempFileOut = "file:///tmp/out1.temp")`); see the sketch below.
+- Job 3 waits for jobs 1 and 2 to finish before starting (`.runAsync(tempFileOut = "file:///tmp/out3.temp", result1, result2)`). It reads their results from the aforementioned paths, unites them, and processes them further.
+- Job 5 waits for jobs 3 and 4 to complete before starting, as indicated by `.writeTextFileAsync("file:///tmp/out5.final", result3, result4)`. Instead of `runAsync`, we use `writeTextFileAsync` to finish the execution.
+
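+Each `runAsync` call returns a `DataQuantaAsyncResult`, whose `future` can also be awaited directly. A minimal sketch, reusing `planBuilder1` from the example above:
+
+```scala
+val result1 = planBuilder1
+  .loadCollection(List(1, 2, 3))
+  .runAsync(tempFileOut = "file:///tmp/out1.temp")
+
+// Block until job 1 has written its output
+Await.result(result1.future, Duration.Inf)
+```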
+
+## `multicontext` API
+
+The examples below demonstrate the capabilities of the `multicontext` API.
+
+### Basic usage
+
+```scala
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+class WordCount {}
+
+object WordCount {
+
+  def main(args: Array[String]): Unit = {
+    println("WordCount")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val configuration1 = new Configuration()
+    val configuration2 = new Configuration()
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // Generate some test data
+    val inputValues = Array("Big data is big.", "Is data big data?")
+
+    // Build and execute a word count
+    multiContextPlanBuilder.forEach(_
+      .loadCollection(inputValues)
+      .flatMap(_.split("\\s+"))
+      .map(_.replaceAll("\\W+", "").toLowerCase)
+      .map((_, 1))
+      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2))
+    ).execute()
+
+  }
+}
+```
+
+The above program executes the same plan concurrently in two contexts: `context1`, which runs on Java and writes its output to the text file `file:///tmp/out11`, and `context2`, which runs on Spark (and can be configured through `configuration2`) and writes to `file:///tmp/out12`.
+
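+Spark-specific settings can be supplied through the per-context configuration. A minimal sketch (assuming the Spark master is set via the `spark.master` property):
+
+```scala
+val configuration2 = new Configuration()
+configuration2.setProperty("spark.master", "local[4]") // run Spark locally with 4 threads
+
+val context2 = new MultiContext(configuration2)
+  .withPlugin(Spark.basicPlugin())
+  .withTextFileSink("file:///tmp/out12")
+```
+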
+The same job can also be written like this:
+
+```scala
+multiContextPlanBuilder
+    .forEach(_.loadCollection(inputValues))
+    .forEach(_.flatMap(_.split("\\s+")))
+    .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+    .forEach(_.map((_, 1)))
+    .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+    .execute()
+```
+
+### Target platforms
+
+```scala
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+object WordCountWithTargetPlatforms {
+
+  def main(args: Array[String]): Unit = {
+    val configuration1 = new Configuration()
+    val configuration2 = new Configuration()
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // Generate some test data
+    val inputValues1 = Array("Big data is big.", "Is data big data?")
+    val inputValues2 = Array("Big big data is big big.", "Is data big data big?")
+
+    multiContextPlanBuilder
+      .loadCollection(context1, inputValues1)
+      .loadCollection(context2, inputValues2)
+
+      .forEach(_.flatMap(_.split("\\s+")))
+      .withTargetPlatforms(context1, Spark.platform())
+      .withTargetPlatforms(context2, Java.platform())
+
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .withTargetPlatforms(Java.platform())
+
+      .forEach(_.map((_, 1)))
+
+      .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+      .withTargetPlatforms(context1, Spark.platform())
+      .execute()
+  }
+}
+```
+
+Here we add the ability to execute an operation on a different platform for each context.
+
+In the snippet below
+
+```scala
+      .forEach(_.flatMap(_.split("\\s+")))
+      .withTargetPlatforms(context1, Spark.platform())
+      .withTargetPlatforms(context2, Java.platform())
+```
+
+the `flatMap` operator will be executed on Spark for `context1` and on Java for `context2`.
+
+In the snippet below
+
+```scala
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .withTargetPlatforms(Java.platform())
+```
+
+the `map` operator gets executed on the Java platform for all contexts, since no specific context is stated.
+
+In the snippet below
+
+```scala
+      .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+      .withTargetPlatforms(context1, Spark.platform())
+```
+
+the `reduceByKey` operator will be executed on Spark for `context1` and on the default platform (decided by the optimizer) for `context2`.
+
+### Merge
+
+```scala
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+object WordCountWithMerge {
+
+  def main(args: Array[String]): Unit = {
+    val configuration1 = new Configuration()
+    val configuration2 = new Configuration()
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withMergeFileSink("file:///tmp/out11") // The mergeContext will read the output of context 1 from here
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withMergeFileSink("file:///tmp/out12") // The mergeContext will read the output of context 2 from here
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // To be used after merging the previous two
+    val mergeContext = new WayangContext(new Configuration())
+      .withPlugin(Java.basicPlugin())
+
+    // Generate some test data
+    val inputValues1 = Array("Big data is big.", "Is data big data?")
+    val inputValues2 = Array("Big big data is big big.", "Is data big data big?")
+
+    // Build and execute a word count in 2 different contexts
+    multiContextPlanBuilder
+      .loadCollection(context1, inputValues1)
+      .loadCollection(context2, inputValues2)
+      .forEach(_.flatMap(_.split("\\s+")))
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map((_, 1)))
+      .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+
+      // Merge contexts with union operator
+      .mergeUnion(mergeContext)
+
+      // Continue processing merged DataQuanta
+      .filter(_._2 >= 3)
+      .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
+
+      // Write out
+      // Writes:
+      //    (big,9)
+      //    (data,6)
+      .writeTextFile("file:///tmp/out1.merged", s => s.toString())
+
+  }
+
+}
+```
+
+Here the two contexts start executing concurrently, as in the above examples, but they are merged here:
+
+```scala
+      .mergeUnion(mergeContext)
+```
+
+Merging happens at the merge context, which can be one of the contexts above or an entirely new one.
+
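+For instance, to run the merge phase on Spark instead, a sketch (assuming the imports from the example above):
+
+```scala
+// A context used only for the merge phase
+val mergeContext = new WayangContext(new Configuration())
+  .withPlugin(Spark.basicPlugin())
+```
+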
+Note that the merge context reads the data to merge from the paths specified for each context here:
+
+```scala
+      .withMergeFileSink("file:///tmp/out11")   // The mergeContext will read the output of context 1 from here
+      ...
+      .withMergeFileSink("file:///tmp/out12")   // The mergeContext will read the output of context 2 from here
+      ...
+```
+
+The rest of the execution happens at the merge context and now runs as a single job.
+
+### Combine each
+
+```scala
+import org.apache.wayang.api.{MultiContext, DataQuanta, MultiContextPlanBuilder}
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.java.Java
+
+object WordCountCombineEach {
+
+  def main(args: Array[String]): Unit = {
+    val configuration1 = new Configuration()
+    val configuration2 = new Configuration()
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // Generate some test data
+    val inputValues = Array("Big data is big.", "Is data big data?")
+
+    // Build and execute a word count
+    val dq1 = multiContextPlanBuilder
+      .forEach(_.loadCollection(inputValues))
+      .forEach(_.flatMap(_.split("\\s+")))
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map((_, 1)))
+      .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+
+    val dq2 = multiContextPlanBuilder
+      .forEach(_.loadCollection(inputValues))
+      .forEach(_.flatMap(_.split("\\s+")))
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map((_, 1)))
+      .forEach(_.reduceByKey(_._1, (a, _) => (a._1, 100)))
+
+    dq1.combineEach(dq2, (dq1: DataQuanta[(String, Int)], dq2: DataQuanta[(String, Int)]) => dq1.union(dq2))
+      .forEach(_.map(t => (t._1 + " wayang out", t._2)))
+      .execute()
+  }
+
+}
+```
+
+Here we add the capability of applying a binary operator to the corresponding `DataQuanta` of each context.
+
+With this line
+
+```scala
+    dq1.combineEach(dq2, (dq1: DataQuanta[(String, Int)], dq2: DataQuanta[(String, Int)]) => dq1.union(dq2))
+```
+
+the `dq1.union(dq2)` operation is applied for each context.
+
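+Any binary `DataQuanta` operator can be supplied in the same way. For instance, a sketch using `intersect` instead of `union`:
+
+```scala
+dq1.combineEach(dq2, (dq1: DataQuanta[(String, Int)], dq2: DataQuanta[(String, Int)]) => dq1.intersect(dq2))
+```
+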
+The rest of the execution
+
+```scala
+      .forEach(_.map(t => (t._1 + " wayang out", t._2)))
+      .execute()
+```
+
+continues as a multicontext job.
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/pom.xml b/wayang-api/wayang-api-scala-java/pom.xml
index 750f3b0..bdee691 100644
--- a/wayang-api/wayang-api-scala-java/pom.xml
+++ b/wayang-api/wayang-api-scala-java/pom.xml
@@ -61,11 +61,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.wayang</groupId>
-            <artifactId>wayang-api-scala-java_2.11</artifactId>
-            <version>0.7.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.wayang</groupId>
             <artifactId>wayang-utils-profile-db</artifactId>
             <version>0.7.1</version>
         </dependency>
@@ -78,13 +73,16 @@
             <groupId>org.apache.wayang</groupId>
             <artifactId>wayang-java</artifactId>
             <version>0.7.1</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.wayang</groupId>
             <artifactId>wayang-sqlite3</artifactId>
             <version>0.7.1</version>
-            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-postgres</artifactId>
+            <version>0.7.1</version>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -100,7 +98,6 @@
           <groupId>org.apache.wayang</groupId>
           <artifactId>wayang-spark</artifactId>
           <version>0.7.1</version>
-          <scope>test</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
@@ -130,5 +127,13 @@
           <artifactId>netty-all</artifactId>
           <version>4.1.45.Final</version>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_2.12</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
index 4582269..c5ff0e1 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
@@ -28,7 +28,7 @@
 import org.apache.commons.lang3.Validate
 import org.apache.wayang.basic.function.ProjectionDescriptor
 import org.apache.wayang.basic.operators._
-import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializableIntUnaryOperator, SerializablePredicate}
 import org.apache.wayang.core.function._
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
@@ -239,7 +239,7 @@
                     seed: Option[Long] = None,
                     sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] =
     this.sampleDynamicJava(
-      new IntUnaryOperator {
+      new SerializableIntUnaryOperator {
         override def applyAsInt(operand: Int): Int = sampleSizeFunction(operand)
       },
       datasetSize,
@@ -256,7 +256,7 @@
     * @param sampleMethod       the [[SampleOperator.Methods]] to use for sampling
     * @return a new instance representing the [[FlatMapOperator]]'s output
     */
-  def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator,
+  def sampleDynamicJava(sampleSizeFunction: SerializableIntUnaryOperator,
                         datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
                         seed: Option[Long] = None,
                         sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] = {
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
index ece3a50..6c55dd0 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
@@ -29,7 +29,7 @@
 import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
 import org.apache.wayang.basic.operators.{GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator}
 import org.apache.wayang.commons.util.profiledb.model.Experiment
-import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializableIntUnaryOperator, SerializablePredicate}
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
 import org.apache.wayang.core.optimizer.costs.{LoadEstimator, LoadProfile, LoadProfileEstimator}
@@ -187,7 +187,7 @@
     * @param sampleSize the absolute size of the sample
     * @return a [[SampleDataQuantaBuilder]]
     */
-  def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new IntUnaryOperator {
+  def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new SerializableIntUnaryOperator {
     override def applyAsInt(operand: Int): Int = sampleSize
   })
 
@@ -198,7 +198,7 @@
     * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
     * @return a [[SampleDataQuantaBuilder]]
     */
-  def sample(sampleSizeFunction: IntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)
+  def sample(sampleSizeFunction: SerializableIntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)
 
   /**
     * Annotates a key to this instance.
@@ -936,7 +936,7 @@
   * @param inputDataQuanta    [[DataQuantaBuilder]] for the input [[DataQuanta]]
   * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
   */
-class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: IntUnaryOperator)
+class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: SerializableIntUnaryOperator)
                                 (implicit javaPlanBuilder: JavaPlanBuilder)
   extends BasicDataQuantaBuilder[SampleDataQuantaBuilder[T], T] {
 
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContext.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContext.scala
new file mode 100644
index 0000000..1f1697b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContext.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api
+
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.plugin.Plugin
+
+class MultiContext(configuration: Configuration) extends WayangContext(configuration) {
+
+  val id: Long = MultiContext.nextId()
+
+  private var sink: Option[MultiContext.UnarySink] = None
+  private var plugins: List[String] = List()
+
+  def this() = {
+    this(new Configuration())
+  }
+
+  override def withPlugin(plugin: Plugin): MultiContext = {
+    this.plugins = this.plugins :+ plugin.getClass.getName
+    super.withPlugin(plugin)
+    this
+  }
+
+  def withTextFileSink(url: String): MultiContext = {
+    this.sink = Some(MultiContext.TextFileSink(url))
+    this
+  }
+
+  def withObjectFileSink(url: String): MultiContext = {
+    this.sink = Some(MultiContext.ObjectFileSink(url))
+    this
+  }
+
+  def withMergeFileSink(url: String): MultiContext = {
+    this.sink = Some(MultiContext.MergeFileSink(url))
+    this
+  }
+
+  def getSink: Option[MultiContext.UnarySink] = sink
+  def getPlugins: List[String] = plugins
+}
+
+object MultiContext {
+
+  private var lastId = 0L
+
+  private def nextId(): Long = {
+    lastId += 1
+    lastId
+  }
+
+  private[api] trait UnarySink
+  private[api] case class TextFileSink(url: String) extends UnarySink
+  private[api] case class ObjectFileSink(url: String) extends UnarySink
+  private[api] case class MergeFileSink(url: String) extends UnarySink
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextDataQuanta.scala
new file mode 100644
index 0000000..3826b2a
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextDataQuanta.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api
+
+import org.apache.wayang.api.async.DataQuantaImplicits._
+import org.apache.wayang.api.async.PlanBuilderImplicits._
+import org.apache.wayang.core.api.WayangContext
+import org.apache.wayang.core.api.exception.WayangException
+import org.apache.wayang.core.plan.wayangplan.Operator
+import org.apache.wayang.core.platform.Platform
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.reflect.ClassTag
+
+class MultiContextDataQuanta[Out: ClassTag](private val dataQuantaMap: Map[Long, DataQuanta[Out]])(private val multiContextPlanBuilder: MultiContextPlanBuilder) {
+
+  /**
+   * Apply the specified function to each [[DataQuanta]].
+   *
+   * @tparam NewOut the type of elements in the resulting [[MultiContextDataQuanta]]
+   * @param f the function to apply to each element
+   * @return a new [[MultiContextDataQuanta]] with elements transformed by the function
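+   *
+   * A usage sketch (assuming `Int` data; names are illustrative):
+   * {{{
+   *   multiContextDataQuanta.forEach(_.map(_ * 2))
+   * }}}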
+   */
+  def forEach[NewOut: ClassTag](f: DataQuanta[Out] => DataQuanta[NewOut]): MultiContextDataQuanta[NewOut] =
+    new MultiContextDataQuanta[NewOut](dataQuantaMap.mapValues(f))(this.multiContextPlanBuilder)
+
+
+  /**
+   * Merges this and `that` [[MultiContextDataQuanta]], by using the `f` function to merge the corresponding [[DataQuanta]].
+   * Returns a new [[MultiContextDataQuanta]] containing the results.
+   *
+   * @tparam ThatOut the type of data contained in `that` [[MultiContextDataQuanta]]
+   * @tparam NewOut  the type of data contained in the resulting [[MultiContextDataQuanta]]
+   * @param that the [[MultiContextDataQuanta]] to apply the function on
+   * @param f    the function to apply on the corresponding data quanta
+   * @return a new [[MultiContextDataQuanta]] containing the results of applying the function
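+   *
+   * A usage sketch (assuming `Int` data; names are illustrative):
+   * {{{
+   *   thisMcdq.combineEach(thatMcdq, (a: DataQuanta[Int], b: DataQuanta[Int]) => a.union(b))
+   * }}}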
+   */
+  def combineEach[ThatOut: ClassTag, NewOut: ClassTag](that: MultiContextDataQuanta[ThatOut],
+                                                       f: (DataQuanta[Out], DataQuanta[ThatOut]) => DataQuanta[NewOut]): MultiContextDataQuanta[NewOut] =
+    new MultiContextDataQuanta[NewOut](this.dataQuantaMap.map { case (key, thisDataQuanta) =>
+      val thatDataQuanta = that.dataQuantaMap(key)
+      key -> f(thisDataQuanta, thatDataQuanta)
+    })(this.multiContextPlanBuilder)
+
+
+  /**
+   * Restrict the [[Operator]] of each [[MultiContext]] to run on certain [[Platform]]s.
+   *
+   * @param platforms on which the [[Operator]] may be executed
+   * @return this instance
+   */
+  def withTargetPlatforms(platforms: Platform*): MultiContextDataQuanta[Out] = {
+    new MultiContextDataQuanta[Out](this.dataQuantaMap.mapValues(_.withTargetPlatforms(platforms: _*)))(multiContextPlanBuilder)
+  }
+
+
+  /**
+   * Restrict the [[Operator]] of the specified [[MultiContext]] to run on certain [[Platform]]s.
+   *
+   * @param multiContext the [[MultiContext]] to restrict
+   * @param platforms    on which the [[Operator]] may be executed
+   * @return this instance
+   */
+  def withTargetPlatforms(multiContext: MultiContext, platforms: Platform*): MultiContextDataQuanta[Out] = {
+    val updatedDataQuanta = dataQuantaMap(multiContext.id).withTargetPlatforms(platforms: _*)
+    val updatedDataQuantaMap = dataQuantaMap.updated(multiContext.id, updatedDataQuanta)
+    new MultiContextDataQuanta[Out](updatedDataQuantaMap)(this.multiContextPlanBuilder)
+  }
+
+
+  /**
+   * Executes the plan asynchronously.
+   *
+   * @param timeout the maximum time to wait for the execution to complete.
+   *                If not specified, the execution will block indefinitely.
+   */
+  def execute(timeout: Duration = Duration.Inf): Unit = {
+
+    val asyncResults = multiContextPlanBuilder.multiContexts.map(multiContext => {
+
+      // For each multiContext get its corresponding dataQuanta
+      val dataQuanta = dataQuantaMap(multiContext.id)
+
+      multiContext.getSink match {
+
+        // Execute plan asynchronously
+        case Some(textFileSink: MultiContext.TextFileSink) =>
+          dataQuanta.writeTextFileAsync(textFileSink.url)
+
+        // Execute plan asynchronously
+        case Some(objectFileSink: MultiContext.ObjectFileSink) =>
+          dataQuanta.writeObjectFileAsync(objectFileSink.url)
+
+        case None =>
+          throw new WayangException("All contexts must be attached to an output sink.")
+
+        case _ =>
+          throw new WayangException("Invalid sink.")
+      }
+    })
+
+    // Block until all futures finish, up to `timeout`
+    val aggregateFuture: Future[List[Any]] = Future.sequence(asyncResults)
+    Await.result(aggregateFuture, timeout)
+
+  }
+
+
+  /**
+   * Merge the underlying [[DataQuanta]]s asynchronously using the [[DataQuanta.union]] operator.
+   *
+   * @param mergeContext The Wayang context for merging the [[DataQuanta]]s.
+   * @param timeout      The maximum time to wait for all futures to finish. Default is [[Duration.Inf]] (indefinitely).
+   * @return A [[DataQuanta]] representing the result of merging and unioning each of the [[DataQuanta]].
+   * @throws WayangException if any of the contexts are not attached to a merge sink or if the sink is invalid.
+   */
+  def mergeUnion(mergeContext: WayangContext, timeout: Duration = Duration.Inf): DataQuanta[Out] = {
+
+    // Execute plans asynchronously
+    val asyncResults = multiContextPlanBuilder.multiContexts.map(multiContext => {
+
+      // For each multiContext get its corresponding dataQuanta
+      val dataQuanta = dataQuantaMap(multiContext.id)
+
+      // Get the sink of the multiContext (it should be a merge sink)
+      multiContext.getSink match {
+
+        // And execute plan asynchronously
+        case Some(mergeFileSink: MultiContext.MergeFileSink) =>
+          dataQuanta.runAsync(mergeFileSink.url)
+
+        case None =>
+          throw new WayangException("All contexts must be attached to a merge sink.")
+
+        case _ =>
+          throw new WayangException("Invalid sink.")
+      }
+    })
+
+    // Get futures
+    val futures: List[Future[Any]] = asyncResults.map(_.future)
+
+    // Block until all futures finish, up to `timeout`
+    val aggregateFuture: Future[List[Any]] = Future.sequence(futures)
+    Await.result(aggregateFuture, timeout)
+
+    // Create plan builder for the new merge context
+    val planBuilder = new PlanBuilder(mergeContext).withUdfJarsOf(this.getClass)
+
+    // Sources to merge
+    var sources: List[DataQuanta[Out]] = List()
+
+    // Create sources by loading each one using the merge context
+    asyncResults.foreach(dataQuantaAsyncResult2 => sources = sources :+ planBuilder.loadAsync(dataQuantaAsyncResult2))
+
+    // Merge sources with union and return
+    sources.reduce((dq1, dq2) => dq1.union(dq2))
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextPlanBuilder.scala
new file mode 100644
index 0000000..a49a908
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/MultiContextPlanBuilder.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api
+
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+class MultiContextPlanBuilder(private[api] val multiContexts: List[MultiContext]) {
+
+  private[api] var udfJars = scala.collection.mutable.Set[String]()
+
+  private val multiContextMap: Map[Long, MultiContext] = multiContexts.map(context => context.id -> context).toMap
+
+  private var dataQuantaMap: Map[Long, DataQuanta[_]] = Map()
+
+  private var planBuilderMap: Map[Long, PlanBuilder] = multiContexts.map(context => context.id -> new PlanBuilder(context)).toMap
+
+  def this(multiContexts: java.util.List[MultiContext]) =
+    this(multiContexts.asScala.toList)
+
+
+  /**
+   * Defines user-code JAR files that might be needed to transfer to execution platforms.
+   *
+   * @param paths paths to JAR files that should be transferred
+   * @return this instance
+   */
+  def withUdfJars(paths: String*): MultiContextPlanBuilder = {
+    // For each planBuilder in planBuilderMap,
+    // call its `withUdfJars` method with the given paths and update the map with the new PlanBuilder
+    planBuilderMap = planBuilderMap.mapValues(_.withUdfJars(paths.toSeq: _*))
+    this
+  }
+
+  /**
+   * Defines user-code JAR files that might be needed to transfer to execution platforms.
+   *
+   * @param classes whose JAR files should be transferred
+   * @return this instance
+   */
+  def withUdfJarsOf(classes: Class[_]*): MultiContextPlanBuilder = {
+    withUdfJars(classes.map(ReflectionUtils.getDeclaringJar).filterNot(_ == null): _*)
+    this
+  }
+
+  /**
+   * Applies a function `f` to each [[PlanBuilder]] in the contexts.
+   * Returns a [[MultiContextDataQuanta]] containing the [[DataQuanta]].
+   *
+   * @param f the function to apply to each [[PlanBuilder]]
+   * @tparam Out the type parameter for the output of the `f` function
+   * @return a [[MultiContextDataQuanta]] containing the results of applying `f` to each [[PlanBuilder]]
+   */
+  def forEach[Out: ClassTag](f: PlanBuilder => DataQuanta[Out]): MultiContextDataQuanta[Out] = {
+    val dataQuantaMap = multiContexts.map(context => context.id -> f(planBuilderMap(context.id))).toMap
+    new MultiContextDataQuanta[Out](dataQuantaMap)(this)
+  }
+
+  /**
+   * Same as [[PlanBuilder.readTextFile]], but for the specified `multiContext`.
+   *
+   * @param multiContext   The multi context.
+   * @param url            The URL of the text file to be read.
+   * @return The ReadTextFileMultiContextPlanBuilder with the added data quanta.
+   */
+  def readTextFile(multiContext: MultiContext, url: String): ReadTextFileMultiContextPlanBuilder = {
+    dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).readTextFile(url))
+    new ReadTextFileMultiContextPlanBuilder(this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[String]]])
+  }
+
+  /**
+   * Same as [[PlanBuilder.readObjectFile()]], but for the specified `multiContext`.
+   *
+   * @param multiContext   The multi context.
+   * @param url            The URL of the object file to be read.
+   * @return The ReadObjectFileMultiContextPlanBuilder with the added data quanta.
+   */
+  def readObjectFile[T: ClassTag](multiContext: MultiContext, url: String): ReadObjectFileMultiContextPlanBuilder[T] = {
+    dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).readObjectFile(url))
+    new ReadObjectFileMultiContextPlanBuilder[T](this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[T]]])
+  }
+
+  /**
+   * Same as [[PlanBuilder.loadCollection]], but for the specified `multiContext`.
+   *
+   * @param multiContext   The multi context.
+   * @param iterable       The collection to be loaded.
+   * @return The LoadCollectionMultiContextPlanBuilder with the added data quanta.
+   */
+  def loadCollection[T: ClassTag](multiContext: MultiContext, iterable: Iterable[T]): LoadCollectionMultiContextPlanBuilder[T] = {
+    dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).loadCollection(iterable))
+    new LoadCollectionMultiContextPlanBuilder[T](this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[T]]])
+  }
+
+}
+
+
+class ReadTextFileMultiContextPlanBuilder(private val multiContextPlanBuilder: MultiContextPlanBuilder,
+                                          private val multiContextMap: Map[Long, MultiContext],
+                                          private var dataQuantaMap: Map[Long, DataQuanta[String]] = Map()) {
+
+  /**
+   * Same as [[PlanBuilder.readTextFile]], but for the specified `multiContext`.
+   *
+   * @param multiContext   The multi context.
+   * @param url            The URL of the text file to be read.
+   * @return The ReadTextFileMultiContextPlanBuilder with the added data quanta.
+   */
+  def readTextFile(multiContext: MultiContext, url: String): ReadTextFileMultiContextPlanBuilder = {
+    dataQuantaMap += (multiContext.id -> multiContextMap(multiContext.id).readTextFile(url))
+    this
+  }
+}
+
+object ReadTextFileMultiContextPlanBuilder {
+  implicit def toMultiContextDataQuanta(builder: ReadTextFileMultiContextPlanBuilder): MultiContextDataQuanta[String] = {
+    new MultiContextDataQuanta[String](builder.dataQuantaMap)(builder.multiContextPlanBuilder)
+  }
+}
+
+
+class ReadObjectFileMultiContextPlanBuilder[T: ClassTag](private val multiContextPlanBuilder: MultiContextPlanBuilder,
+                                                         private val multiContextMap: Map[Long, MultiContext],
+                                                         private var dataQuantaMap: Map[Long, DataQuanta[T]] = Map()) {
+
+  /**
+   * Same as [[PlanBuilder.readObjectFile()]], but for the specified `multiContext`.
+   *
+   * @param multiContext The multi context.
+   * @param url            The URL of the object file to be read.
+   * @return The ReadObjectFileMultiContextPlanBuilder with the added data quanta.
+   */
+  def readObjectFile(multiContext: MultiContext, url: String): ReadObjectFileMultiContextPlanBuilder[T] = {
+    dataQuantaMap += (multiContext.id -> multiContextMap(multiContext.id).readObjectFile(url))
+    this
+  }
+}
+
+object ReadObjectFileMultiContextPlanBuilder {
+  implicit def toMultiContextDataQuanta[T: ClassTag](builder: ReadObjectFileMultiContextPlanBuilder[T]): MultiContextDataQuanta[T] = {
+    new MultiContextDataQuanta[T](builder.dataQuantaMap)(builder.multiContextPlanBuilder)
+  }
+}
+
+
+class LoadCollectionMultiContextPlanBuilder[T: ClassTag](private val multiContextPlanBuilder: MultiContextPlanBuilder,
+                                                         private val multiContextMap: Map[Long, MultiContext],
+                                                         private var dataQuantaMap: Map[Long, DataQuanta[T]] = Map()) {
+
+  /**
+   * Same as [[PlanBuilder.loadCollection]], but for the specified `multiContext`.
+   *
+   * @param multiContext The multi context.
+   * @param iterable       The collection to be loaded.
+   * @return The LoadCollectionMultiContextPlanBuilder with the added data quanta.
+   */
+  def loadCollection(multiContext: MultiContext, iterable: Iterable[T]): LoadCollectionMultiContextPlanBuilder[T] = {
+    dataQuantaMap += (multiContext.id -> multiContextMap(multiContext.id).loadCollection(iterable))
+    this
+  }
+}
+
+object LoadCollectionMultiContextPlanBuilder {
+  implicit def toMultiContextDataQuanta[T: ClassTag](builder: LoadCollectionMultiContextPlanBuilder[T]): MultiContextDataQuanta[T] = {
+    new MultiContextDataQuanta[T](builder.dataQuantaMap)(builder.multiContextPlanBuilder)
+  }
+}
+
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 1578421..a00d3f5 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -38,11 +38,11 @@
 /**
   * Utility to build [[WayangPlan]]s.
   */
-class PlanBuilder(wayangContext: WayangContext, private var jobName: String = null) {
+class PlanBuilder(private[api] val wayangContext: WayangContext, private var jobName: String = null) {
 
   private[api] val sinks = ListBuffer[Operator]()
 
-  private val udfJars = scala.collection.mutable.Set[String]()
+  private[api] val udfJars = scala.collection.mutable.Set[String]()
 
   private var experiment: Experiment = _
 
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaAsyncResult.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaAsyncResult.scala
new file mode 100644
index 0000000..1090bf6
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaAsyncResult.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.async
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+/**
+ * Represents an asynchronous result of data quanta processing.
+ *
+ * The `DataQuantaAsyncResult` class is a case class used to encapsulate the result of asynchronous data quanta processing.
+ * It contains the temporary output file path, the class tag of the output type, and the future representing the completion
+ * of the processing.
+ *
+ * @param tempFileOut The temporary output file path.
+ * @param classTag    The class tag of the output type.
+ * @param future      The future representing the completion of the processing.
+ * @tparam Out The type of the output.
+ */
+case class DataQuantaAsyncResult[Out: ClassTag](tempFileOut: String, classTag: ClassTag[Out], future: Future[_])
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaImplicits.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaImplicits.scala
new file mode 100644
index 0000000..9aa28b3
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/DataQuantaImplicits.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.async
+
+import org.apache.wayang.api
+import org.apache.wayang.api.{MultiContext, DataQuanta, async}
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+/**
+ * Implicit conversions and utility methods for asynchronous operations on `DataQuanta`.
+ */
+object DataQuantaImplicits {
+
+  implicit class DataQuantaRunAsyncImplicits[T: ClassTag](dataQuanta: DataQuanta[T]) {
+
+    /**
+     * Asynchronously runs a Wayang plan and writes out to `tempFileOut`.
+     * This method schedules a new future to run after the previous futures in `dataQuantaAsyncResults` have completed.
+     * The result of the execution is represented by an instance of `DataQuantaAsyncResult`.
+     *
+     * @param tempFileOut            the temporary file output path
+     * @param dataQuantaAsyncResults a variable number of previous asynchronous results
+     * @return a [[DataQuantaAsyncResult]] representing the asynchronous result of running the data quanta
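+     *
+     * A usage sketch (paths and names are illustrative):
+     * {{{
+     *   val r1 = dq1.runAsync(tempFileOut = "file:///tmp/a.temp")
+     *   val r2 = dq2.runAsync(tempFileOut = "file:///tmp/b.temp", r1) // starts after r1 completes
+     * }}}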
+     */
+    def runAsync(tempFileOut: String, dataQuantaAsyncResults: DataQuantaAsyncResult[_]*): DataQuantaAsyncResult[T] = {
+      // Schedule new future to run after the previous futures in `dataQuantaAsyncResults` have completed
+      val resultFuture: Future[_] = getFutureSequence(dataQuantaAsyncResults: _*).flatMap { _ =>
+        async.runAsyncWithTempFileOut(dataQuanta, tempFileOut)
+      }
+      DataQuantaAsyncResult(tempFileOut, implicitly[ClassTag[T]], resultFuture)
+    }
+
+    /**
+     * Similar to [[DataQuanta.writeTextFile]] but for asynchronous execution.
+     *
+     * @param url                    The URL where the text file should be written.
+     * @param dataQuantaAsyncResults The asynchronous results of data quanta operations.
+     *                               Variable arguments of type 'DataQuantaAsyncResult[_]'.
+     *                               Multiple results can be provided in any order.
+     *                               The method will wait for all the provided futures to complete
+     *                               before executing the file writing operation.
+     * @return A future unit indicating the completion of the text file writing operation.
+     * @throws RuntimeException if any of the provided asynchronous results
+     *                          fails with an exception during execution.
+     */
+    def writeTextFileAsync(url: String, dataQuantaAsyncResults: DataQuantaAsyncResult[_]*): Future[Unit] = {
+      // Schedule new future to run after the previous futures in `dataQuantaAsyncResults` have completed
+      getFutureSequence(dataQuantaAsyncResults: _*).flatMap { _ =>
+        async.runAsyncWithTextFileOut(dataQuanta, url)
+      }
+    }
+
+    /**
+     * Similar to [[DataQuanta.writeObjectFile]] but for asynchronous execution.
+     *
+     * @param url                    The URL of the object file sink.
+     * @param dataQuantaAsyncResults The asynchronous results of data quanta operations.
+     *                               Variable arguments of type 'DataQuantaAsyncResult[_]'.
+     *                               Multiple results can be provided in any order.
+     *                               The method will wait for all the provided futures to complete
+     *                               before executing the file writing operation.
+     * @return A `Future` that completes when the data quanta have been written successfully.
+     */
+    def writeObjectFileAsync(url: String, dataQuantaAsyncResults: DataQuantaAsyncResult[_]*): Future[Unit] = {
+      // Schedule new future to run after the previous futures in `dataQuantaAsyncResults` have completed
+      getFutureSequence(dataQuantaAsyncResults: _*).flatMap { _ =>
+        async.runAsyncWithObjectFileOut(dataQuanta, url)
+      }
+    }
+
+    // Extract futures
+    private def getFutureSequence(dataQuantaAsyncResult2: DataQuantaAsyncResult[_]*): Future[Seq[Any]] = {
+      // Extract the futures
+      val futures: Seq[Future[Any]] = dataQuantaAsyncResult2.map(_.future)
+
+      // Create a Future of Seq[Any]
+      Future.sequence(futures)
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/Main.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/Main.scala
new file mode 100644
index 0000000..8be4f4b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/Main.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.async
+
+import org.apache.wayang.api.serialization.TempFileUtils
+import org.apache.wayang.api.{MultiContext, PlanBuilder}
+import org.apache.wayang.basic.operators.{ObjectFileSink, TextFileSink}
+import org.apache.wayang.core.api.exception.WayangException
+import org.apache.wayang.core.plan.wayangplan.{Operator, WayangPlan}
+
+import java.nio.file.Path
+
+object Main {
+
+  /**
+   * Main method for executing a Wayang plan.
+   * Used as the entry point of the process spawned for async operations and the MultiContextPlanBuilder API.
+   *
+   * @param args Array of command line arguments. Two arguments are expected: paths to the serialized operator and context.
+   */
+  def main(args: Array[String]): Unit = {
+    println("org.apache.wayang.api.async.Main")
+    println(args.mkString("Array(", ", ", ")"))
+
+    if (args.length != 2) {
+      println("Expected two arguments: paths to the serialized operator and context.")
+      System.exit(1)
+    }
+
+    // Parse file paths
+    val operatorPath = Path.of(args(0))
+    val planBuilderPath = Path.of(args(1))
+
+    // Parse operator and multiContextPlanBuilder
+    val operator = TempFileUtils.readFromTempFileFromString[Operator](operatorPath)
+    val planBuilder = TempFileUtils.readFromTempFileFromString[PlanBuilder](planBuilderPath)
+
+    // Get context
+    val context = planBuilder.wayangContext.asInstanceOf[MultiContext]
+
+    // Get udf jars
+    val udfJars = planBuilder.udfJars
+    println(s"udfJars: $udfJars")
+
+    // Get out output type to create sink with
+    val outType = operator.getOutput(0).getType.getDataUnitType.getTypeClass
+
+    // Connect to sink and execute plan
+    context.getSink match {
+      case Some(textFileSink: MultiContext.TextFileSink) =>
+        connectToSinkAndExecutePlan(new TextFileSink(textFileSink.url, outType))
+
+      case Some(objectFileSink: MultiContext.ObjectFileSink) =>
+        connectToSinkAndExecutePlan(new ObjectFileSink(objectFileSink.url, outType))
+
+      case None =>
+        throw new WayangException("All contexts must be attached to an output sink.")
+
+      case _ =>
+        throw new WayangException("Invalid sink..")
+    }
+
+
+    def connectToSinkAndExecutePlan(sink: Operator): Unit = {
+      operator.connectTo(0, sink, 0)
+      context.execute(new WayangPlan(sink), udfJars.toSeq: _*)
+    }
+  }
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/PlanBuilderImplicits.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/PlanBuilderImplicits.scala
new file mode 100644
index 0000000..c6d841e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/PlanBuilderImplicits.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.async
+
+import org.apache.wayang.api.{DataQuanta, PlanBuilder, async}
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+
+object PlanBuilderImplicits {
+
+  /**
+   * An implicit class that adds additional functionality to the PlanBuilder class
+   * for loading results of previous asynchronous operations.
+   *
+   * @param planBuilder The PlanBuilder instance to extend.
+   * @tparam Out The type of data to be loaded.
+   */
+  implicit class PlanBuilderLoadAsyncImplicit[Out: ClassTag](planBuilder: PlanBuilder) {
+    /**
+     * Loads the data from the result of a previous asynchronous operation.
+     *
+     * @param asyncResult The asynchronous result object containing the temporary file path of the data to be loaded.
+     * @return A DataQuanta representing the loaded data.
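+     *
+     * A usage sketch (names are illustrative):
+     * {{{
+     *   val dq: DataQuanta[Int] = planBuilder.loadAsync(previousResult)
+     * }}}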
+     */
+    def loadAsync(asyncResult: DataQuantaAsyncResult[Out]): DataQuanta[Out] = {
+      planBuilder.readObjectFile[Out](asyncResult.tempFileOut)
+    }
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/package.scala
new file mode 100644
index 0000000..771e618
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/async/package.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api
+
+import org.apache.logging.log4j.{LogManager, Logger}
+import org.apache.wayang.api.serialization.TempFileUtils
+import org.apache.wayang.core.api.exception.WayangException
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+package object async {
+
+  val logger: Logger = LogManager.getLogger(getClass)
+
+  /**
+   * Runs the given data quanta asynchronously with a temporary file as the output.
+   *
+   * @param dataQuanta  The data quanta to be executed.
+   * @param tempFileOut The path to the temporary output file.
+   * @tparam Out The type parameter of the data quanta and the output.
+   * @return a future representing the completion of the execution
+   */
+  def runAsyncWithTempFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], tempFileOut: String): Future[Unit] = {
+    runAsyncWithObjectFileOut(dataQuanta, tempFileOut)
+  }
+
+
+  /**
+   * Runs the given DataQuanta asynchronously and writes the output to a text file.
+   *
+   * @param dataQuanta the DataQuanta to be executed
+   * @param url        the URL of the text file to write the output to
+   * @tparam Out the type of the output DataQuanta
+   * @return a Future representing the completion of the execution
+   * @throws WayangException if the WayangContext is not of type MultiContext
+   */
+  def runAsyncWithTextFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], url: String): Future[Unit] = {
+    // Add sink to multi context and then pass to runAsyncBody
+    val wayangContext = dataQuanta.planBuilder.wayangContext
+    wayangContext match {
+      case context: MultiContext =>
+        val updatedContext = context.withTextFileSink(url)
+        runAsyncBody(dataQuanta, updatedContext)
+      case _ =>
+        throw new WayangException("WayangContext is not of type MultiContext")
+    }
+  }
+
+
+  /**
+   * Runs the given DataQuanta asynchronously and writes the output to an object file specified by the URL.
+   *
+   * @param dataQuanta the DataQuanta to be executed
+   * @param url        the URL of the object file to write the output to
+   * @tparam Out the type parameter for the output DataQuanta
+   * @return a Future that represents the execution of the DataQuanta
+   * @throws WayangException if the WayangContext is not of type MultiContext
+   */
+  def runAsyncWithObjectFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], url: String): Future[Unit] = {
+    // Add sink to multi context and then pass to runAsyncBody
+    val wayangContext = dataQuanta.planBuilder.wayangContext
+    wayangContext match {
+      case context: MultiContext =>
+        val updatedContext = context.withObjectFileSink(url)
+        runAsyncBody(dataQuanta, updatedContext)
+      case _ =>
+        throw new WayangException("WayangContext is not of type MultiContext")
+    }
+  }
+
+
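+  /**
+   * Runs the given DataQuanta in a separate JVM process.
+   *
+   * The plan builder and the sink operator are serialized to temporary files, a child JVM
+   * running `org.apache.wayang.api.async.Main` is spawned with the two file paths as
+   * arguments, and the returned future completes once that process exits.
+   */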
+  def runAsyncBody[Out: ClassTag](dataQuanta: DataQuanta[Out], multiContext: MultiContext): Future[Unit] = Future {
+
+    import scala.concurrent.blocking
+
+    // Write plan builder to temp file
+    val planBuilderPath = TempFileUtils.writeToTempFileAsString(dataQuanta.planBuilder.withUdfJarsOf(this.getClass))
+
+    // Write operator to temp file
+    val operatorPath = TempFileUtils.writeToTempFileAsString(dataQuanta.operator)
+
+    var process: Process = null
+
+    try {
+      val mainClass = "org.apache.wayang.api.async.Main"
+      val classpath = System.getProperty("java.class.path") // get classpath from parent JVM
+
+      // Child process
+      val processBuilder = new ProcessBuilder(
+        "java",
+        "-cp",
+        classpath,
+        mainClass,
+        operatorPath.toString,
+        planBuilderPath.toString
+      )
+
+      // Redirect children output to parent output
+      processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT)
+      processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT)
+
+      // Start child process
+      process = processBuilder.start()
+
+      // And block this future while waiting for it
+      blocking {
+        process.waitFor()
+      }
+    }
+
+    finally {
+      Files.deleteIfExists(planBuilderPath)
+      Files.deleteIfExists(operatorPath)
+    }
+
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
index 715741c..2514f09 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
@@ -20,10 +20,10 @@
 
 import _root_.java.lang.{Class => JavaClass, Iterable => JavaIterable}
 import _root_.java.util.function.{Consumer, ToLongBiFunction, ToLongFunction}
-
 import org.apache.wayang.basic.data.{Record, Tuple2 => WayangTuple2}
 import org.apache.wayang.core.api.WayangContext
-import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.FunctionDescriptor
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate, SerializableToLongBiFunction}
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 import org.apache.wayang.core.optimizer.cardinality.{CardinalityEstimate, CardinalityEstimator, DefaultCardinalityEstimator, FixedSizeCardinalityEstimator}
 import org.apache.wayang.core.optimizer.costs.{DefaultLoadEstimator, LoadEstimator, LoadProfileEstimator, NestableLoadProfileEstimator}
@@ -103,12 +103,12 @@
     new FixedSizeCardinalityEstimator(fixCardinality, true)
 
   implicit def toCardinalityEstimator(f: Long => Long): CardinalityEstimator =
-    new DefaultCardinalityEstimator(.99d, 1, true, new ToLongFunction[Array[Long]] {
+    new DefaultCardinalityEstimator(.99d, 1, true, new FunctionDescriptor.SerializableToLongFunction[Array[Long]] {
       override def applyAsLong(inCards: Array[Long]): Long = f.apply(inCards(0))
     })
 
   implicit def toCardinalityEstimator(f: (Long, Long) => Long): CardinalityEstimator =
-    new DefaultCardinalityEstimator(.99d, 1, true, new ToLongFunction[Array[Long]] {
+    new DefaultCardinalityEstimator(.99d, 1, true, new FunctionDescriptor.SerializableToLongFunction[Array[Long]] {
       override def applyAsLong(inCards: Array[Long]): Long = f.apply(inCards(0), inCards(1))
     })
 
@@ -118,7 +118,7 @@
       1,
       .99d,
       CardinalityEstimate.EMPTY_ESTIMATE,
-      new ToLongBiFunction[Array[Long], Array[Long]] {
+      new SerializableToLongBiFunction[Array[Long], Array[Long]] {
         override def applyAsLong(t: Array[Long], u: Array[Long]): Long = f.apply(t(0), u(0))
       }
     )
@@ -129,7 +129,7 @@
       1,
       .99d,
       CardinalityEstimate.EMPTY_ESTIMATE,
-      new ToLongBiFunction[Array[Long], Array[Long]] {
+      new SerializableToLongBiFunction[Array[Long], Array[Long]] {
         override def applyAsLong(t: Array[Long], u: Array[Long]): Long = f.apply(t(0), t(1), u(0))
       }
     )
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/SerializationUtils.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/SerializationUtils.scala
new file mode 100644
index 0000000..541d13d
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/SerializationUtils.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
+import com.fasterxml.jackson.annotation._
+import com.fasterxml.jackson.databind._
+import com.fasterxml.jackson.databind.module.SimpleModule
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.wayang.api.serialization.customserializers._
+import org.apache.wayang.api.serialization.mixins.ConfigurationAndContextMixIns._
+import org.apache.wayang.api.serialization.mixins.DataTypeMixIns._
+import org.apache.wayang.api.serialization.mixins.DescriptorMixIns._
+import org.apache.wayang.api.serialization.mixins.EstimatorMixIns._
+import org.apache.wayang.api.serialization.mixins.IgnoreLoggerMixIn
+import org.apache.wayang.api.serialization.mixins.OperatorMixIns._
+import org.apache.wayang.api.serialization.mixins.ProviderMixIns._
+import org.apache.wayang.api.serialization.mixins.SlotMixIns._
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.basic.function.ProjectionDescriptor
+import org.apache.wayang.basic.operators._
+import org.apache.wayang.basic.types.RecordType
+import org.apache.wayang.core.api.configuration._
+import org.apache.wayang.core.api.{Configuration, Job, WayangContext}
+import org.apache.wayang.core.function.FunctionDescriptor._
+import org.apache.wayang.core.function._
+import org.apache.wayang.core.mapping.{OperatorPattern, PlanTransformation}
+import org.apache.wayang.core.optimizer.cardinality.{CardinalityEstimate, CardinalityEstimator, CardinalityEstimatorManager, CardinalityPusher, DefaultCardinalityEstimator}
+import org.apache.wayang.core.optimizer.channels.ChannelConversionGraph
+import org.apache.wayang.core.optimizer.costs._
+import org.apache.wayang.core.optimizer.enumeration._
+import org.apache.wayang.core.optimizer.{OptimizationContext, OptimizationUtils, ProbabilisticDoubleInterval, SanityChecker}
+import org.apache.wayang.core.plan.executionplan.{Channel, ExecutionPlan}
+import org.apache.wayang.core.plan.wayangplan._
+import org.apache.wayang.core.plan.wayangplan.traversal.AbstractTopologicalTraversal
+import org.apache.wayang.core.platform._
+import org.apache.wayang.core.profiling.{CardinalityRepository, ExecutionLog}
+import org.apache.wayang.core.types.{BasicDataUnitType, DataSetType, DataUnitGroupType, DataUnitType}
+import org.apache.wayang.core.util.fs.{FileSystems, HadoopFileSystem, LocalFileSystem}
+import org.apache.wayang.core.util.{AbstractReferenceCountable, ReflectionUtils}
+
+import scala.reflect.ClassTag
+
+object SerializationUtils {
+
+  val mapper: ObjectMapper = {
+    val mapper = new ObjectMapper()
+      .setVisibility(PropertyAccessor.FIELD, Visibility.ANY)
+      .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
+      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+      .enable(SerializationFeature.INDENT_OUTPUT)
+      .registerModule(DefaultScalaModule)
+
+      // Custom serializers
+      .registerModule(new SimpleModule().addSerializer(classOf[MultiContext], new MultiContextSerializer()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[MultiContext], new MultiContextDeserializer()))
+      .registerModule(new SimpleModule().addSerializer(classOf[Platform], new PlatformSerializer()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[Platform], new PlatformDeserializer()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[Operator], new OperatorDeserializer()))
+
+      // Custom serializers for UDFs
+      .registerModule(new SimpleModule().addSerializer(classOf[SerializablePredicate[_]], new GenericSerializableSerializer[SerializablePredicate[_]]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[SerializablePredicate[_]], new GenericSerializableDeserializer[SerializablePredicate[_]]()))
+      .registerModule(new SimpleModule().addSerializer(classOf[SerializableFunction[_, _]], new GenericSerializableSerializer[FunctionDescriptor.SerializableFunction[_, _]]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[SerializableFunction[_, _]], new GenericSerializableDeserializer[FunctionDescriptor.SerializableFunction[_, _]]()))
+      .registerModule(new SimpleModule().addSerializer(classOf[SerializableBinaryOperator[_]], new GenericSerializableSerializer[SerializableBinaryOperator[_]]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[SerializableBinaryOperator[_]], new GenericSerializableDeserializer[SerializableBinaryOperator[_]]()))
+      .registerModule(new SimpleModule().addSerializer(classOf[SerializableConsumer[_]], new GenericSerializableSerializer[SerializableConsumer[_]]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[SerializableConsumer[_]], new GenericSerializableDeserializer[SerializableConsumer[_]]()))
+      .registerModule(new SimpleModule().addSerializer(classOf[SerializableIntUnaryOperator], new GenericSerializableSerializer[SerializableIntUnaryOperator]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[SerializableIntUnaryOperator], new GenericSerializableDeserializer[SerializableIntUnaryOperator]()))
+      .registerModule(new SimpleModule().addSerializer(classOf[SerializableLongUnaryOperator], new GenericSerializableSerializer[SerializableLongUnaryOperator]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[SerializableLongUnaryOperator], new GenericSerializableDeserializer[SerializableLongUnaryOperator]()))
+      .registerModule(new SimpleModule().addSerializer(classOf[LoadEstimator.SinglePointEstimationFunction], new GenericSerializableSerializer[LoadEstimator.SinglePointEstimationFunction]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[LoadEstimator.SinglePointEstimationFunction], new GenericSerializableDeserializer[LoadEstimator.SinglePointEstimationFunction]()))
+      .registerModule(new SimpleModule().addSerializer(classOf[SerializableToLongBiFunction[_, _]], new GenericSerializableSerializer[SerializableToLongBiFunction[_, _]]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[SerializableToLongBiFunction[_, _]], new GenericSerializableDeserializer[SerializableToLongBiFunction[_, _]]()))
+      .registerModule(new SimpleModule().addSerializer(classOf[SerializableToDoubleBiFunction[_, _]], new GenericSerializableSerializer[SerializableToDoubleBiFunction[_, _]]()))
+      .registerModule(new SimpleModule().addDeserializer(classOf[SerializableToDoubleBiFunction[_, _]], new GenericSerializableDeserializer[SerializableToDoubleBiFunction[_, _]]()))
+
+    // Register mix-ins
+    mapper
+      .addMixIn(classOf[MultiContextPlanBuilder], classOf[MultiContextPlanBuilderMixIn])
+      .addMixIn(classOf[WayangContext], classOf[WayangContextMixIn])
+      .addMixIn(classOf[Configuration], classOf[ConfigurationMixIn])
+      .addMixIn(classOf[CardinalityRepository], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[KeyValueProvider[_, _]], classOf[KeyValueProviderMixIn])
+      .addMixIn(classOf[ValueProvider[_]], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[CollectionProvider[_]], classOf[CollectionProviderMixIn])
+      .addMixIn(classOf[ExplicitCollectionProvider[_]], classOf[ExplicitCollectionProviderMixIn])
+      .addMixIn(classOf[FunctionalKeyValueProvider[_, _]], classOf[FunctionalKeyValueProviderMixIn[_, _]])
+      .addMixIn(classOf[MapBasedKeyValueProvider[_, _]], classOf[MapBasedKeyValueProviderMixIn[_, _]])
+      .addMixIn(classOf[ConstantValueProvider[_]], classOf[ConstantValueProviderMixIn])
+      .addMixIn(classOf[PlanTransformation], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[OperatorPattern[_]], classOf[OperatorPatternMixin])
+      .addMixIn(classOf[Slot[_]], classOf[SlotMixIn[_]])
+      .addMixIn(classOf[OutputSlot[_]], classOf[OutputSlotMixIn[_]])
+      .addMixIn(classOf[OperatorBase], classOf[OperatorBaseMixIn])
+      .addMixIn(classOf[MultiContext.UnarySink], classOf[MultiContextUnarySinkMixIn])
+      .addMixIn(classOf[ElementaryOperator], classOf[ElementaryOperatorMixIn])
+      .addMixIn(classOf[ActualOperator], classOf[ActualOperatorMixIn])
+      .addMixIn(classOf[Operator], classOf[OperatorMixIn])
+      .addMixIn(classOf[PredicateDescriptor[_]], classOf[PredicateDescriptorMixIn[_]])
+      .addMixIn(classOf[TransformationDescriptor[_, _]], classOf[TransformationDescriptorMixIn[_, _]])
+      .addMixIn(classOf[ProjectionDescriptor[_, _]], classOf[ProjectionDescriptorMixIn[_, _]])
+      .addMixIn(classOf[ReduceDescriptor[_]], classOf[ReduceDescriptorMixIn[_]])
+      .addMixIn(classOf[FlatMapDescriptor[_, _]], classOf[FlatMapDescriptorMixIn[_, _]])
+      .addMixIn(classOf[MapPartitionsDescriptor[_, _]], classOf[MapPartitionsDescriptorMixIn[_, _]])
+      .addMixIn(classOf[BasicDataUnitType[_]], classOf[BasicDataUnitTypeMixIn[_]])
+      .addMixIn(classOf[RecordType], classOf[RecordTypeMixIn])
+      .addMixIn(classOf[DataUnitGroupType[_]], classOf[DataUnitGroupTypeMixIn[_]])
+      .addMixIn(classOf[ProbabilisticDoubleInterval], classOf[ProbabilisticDoubleIntervalMixIn])
+      .addMixIn(classOf[LoadProfileEstimator], classOf[LoadProfileEstimatorMixIn])
+      .addMixIn(classOf[FunctionDescriptor], classOf[FunctionDescriptorMixIn])
+      .addMixIn(classOf[NestableLoadProfileEstimator], classOf[NestableLoadProfileEstimatorMixIn])
+      .addMixIn(classOf[LoadEstimator], classOf[LoadEstimatorMixIn])
+      .addMixIn(classOf[DefaultLoadEstimator], classOf[DefaultLoadEstimatorMixIn])
+      .addMixIn(classOf[CardinalityEstimate], classOf[CardinalityEstimateMixIn])
+      .addMixIn(classOf[DataSetType[_]], classOf[DataSetTypeMixIn[_]])
+      .addMixIn(classOf[DataUnitType[_]], classOf[DataUnitTypeMixIn])
+      .addMixIn(classOf[TextFileSource], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[UnarySource[_]], classOf[UnarySourceMixIn[_]])
+      .addMixIn(classOf[UnarySink[_]], classOf[UnarySinkMixIn[_]])
+      .addMixIn(classOf[UnaryToUnaryOperator[_, _]], classOf[UnaryToUnaryOperatorMixIn[_, _]])
+      .addMixIn(classOf[BinaryToUnaryOperator[_, _, _]], classOf[BinaryToUnaryOperatorMixIn[_, _, _]])
+      .addMixIn(classOf[LoopHeadOperator], classOf[LoopHeadOperatorMixIn])
+      .addMixIn(classOf[SampleOperator[_]], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[CardinalityEstimator], classOf[CardinalityEstimatorMixIn])
+      .addMixIn(classOf[DefaultCardinalityEstimator], classOf[DefaultCardinalityEstimatorMixIn])
+      .addMixIn(classOf[EstimatableCost], classOf[EstimatableCostMixIn])
+
+
+    // Ignore loggers
+      .addMixIn(classOf[Job], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[OptimizationContext], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[OptimizationUtils], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[SanityChecker], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[CardinalityEstimatorManager], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[CardinalityPusher], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[ChannelConversionGraph], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[LoadProfileEstimators], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[LatentOperatorPruningStrategy], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[PlanEnumeration], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[PlanEnumerator], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[PlanImplementation], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[StageAssignmentTraversal], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[Channel], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[ExecutionPlan], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[PlanTraversal], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[SlotMapping], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[WayangPlan], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[AbstractTopologicalTraversal[_, _]], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[CardinalityBreakpoint], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[CrossPlatformExecutor], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[ExecutorTemplate], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[Junction], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[ExecutionLog], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[AbstractReferenceCountable], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[ReflectionUtils], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[FileSystems], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[HadoopFileSystem], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[LocalFileSystem], classOf[IgnoreLoggerMixIn])
+      .addMixIn(classOf[ObjectFileSource[_]], classOf[IgnoreLoggerMixIn])
+
+    mapper
+  }
+
+
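+  // Illustrative round trip using the helpers below (variable names are hypothetical):
+  //   val json    = SerializationUtils.serializeAsString(multiContextPlanBuilder)
+  //   val rebuilt = SerializationUtils.deserializeFromString[MultiContextPlanBuilder](json)
+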
+  def serialize(obj: AnyRef): Array[Byte] = {
+    mapper.writeValueAsBytes(obj)
+  }
+
+  def serializeAsString(obj: AnyRef): String = {
+    mapper.writeValueAsString(obj)
+  }
+
+  def deserialize[T: ClassTag](bytes: Array[Byte]): T = {
+    val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+    mapper.readValue(bytes, clazz)
+  }
+
+  def deserializeFromString[T: ClassTag](string: String): T = {
+    val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+    mapper.readValue(string, clazz)
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/TempFileUtils.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/TempFileUtils.scala
new file mode 100644
index 0000000..82c37bd
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/TempFileUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization
+
+import java.io.{FileInputStream, FileOutputStream}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path}
+import scala.reflect.ClassTag
+
+object TempFileUtils {
+
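+  // Note: the corresponding read* methods below delete the temp file after reading,
+  // so every file written here is intended to be consumed exactly once.
+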
+  def writeToTempFileAsBinary(obj: AnyRef): Path = {
+    val tempFile = Files.createTempFile("serialized", ".tmp")
+    val fos = new FileOutputStream(tempFile.toFile)
+    try {
+      fos.write(SerializationUtils.serialize(obj))
+    } finally {
+      fos.close()
+    }
+    tempFile
+  }
+
+
+  def readFromTempFileFromBinary[T : ClassTag](path: Path): T = {
+    val fis = new FileInputStream(path.toFile)
+    try {
+      SerializationUtils.deserialize[T](fis.readAllBytes())
+    } finally {
+      fis.close()
+      Files.deleteIfExists(path)
+    }
+  }
+
+
+  def writeToTempFileAsString(obj: AnyRef): Path = {
+    val tempFile = Files.createTempFile("serialized", ".tmp")
+    val serializedString = SerializationUtils.serializeAsString(obj)
+    Files.writeString(tempFile, serializedString, StandardCharsets.UTF_8)
+    tempFile
+  }
+
+
+  def readFromTempFileFromString[T: ClassTag](path: Path): T = {
+    val serializedString = Files.readString(path, StandardCharsets.UTF_8)
+    val deserializedObject = SerializationUtils.deserializeFromString[T](serializedString)
+    Files.deleteIfExists(path)
+    deserializedObject
+  }
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableDeserializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableDeserializer.scala
new file mode 100644
index 0000000..5514aa7
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableDeserializer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer}
+
+import java.io.{ByteArrayInputStream, ObjectInputStream}
+
+class GenericSerializableDeserializer[T] extends JsonDeserializer[T] {
+
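+  // Reads the binary payload (base64-encoded in JSON) written by
+  // GenericSerializableSerializer and reconstructs the original object
+  // via standard Java deserialization.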
+  override def deserialize(p: JsonParser, ctxt: DeserializationContext): T = {
+    val value = p.getBinaryValue
+    val byteArrayInputStream: ByteArrayInputStream = new ByteArrayInputStream(value)
+    val inputStream: ObjectInputStream = new ObjectInputStream(byteArrayInputStream)
+    inputStream.readObject().asInstanceOf[T]
+  }
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableSerializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableSerializer.scala
new file mode 100644
index 0000000..991fcdf
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/GenericSerializableSerializer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonGenerator
+import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
+
+import java.io.{ByteArrayOutputStream, ObjectOutputStream}
+
+class GenericSerializableSerializer[T] extends JsonSerializer[T] {
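+  // Java-serializes the (java.io.Serializable) value, e.g. a UDF lambda, and embeds
+  // the resulting bytes as a binary field (base64-encoded in JSON).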
+  override def serialize(value: T, gen: JsonGenerator, serializers: SerializerProvider): Unit = {
+    val byteArrayOutputStream: ByteArrayOutputStream = new ByteArrayOutputStream()
+    val outputStream: ObjectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
+    outputStream.writeObject(value)
+    gen.writeBinary(byteArrayOutputStream.toByteArray)
+  }
+}
+
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala
new file mode 100644
index 0000000..e98fb8e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.{JsonParser, JsonProcessingException}
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
+import com.fasterxml.jackson.databind.jsontype.TypeDeserializer
+import org.apache.wayang.api.MultiContext
+import org.apache.wayang.api.serialization.SerializationUtils.mapper
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.java.Java
+import org.apache.wayang.postgres.Postgres
+import org.apache.wayang.spark.Spark
+import org.apache.wayang.sqlite3.Sqlite3
+
+import java.io.IOException
+
+class MultiContextDeserializer extends JsonDeserializer[MultiContext] {
+
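+  // A MultiContext is rebuilt field by field: the configuration and the optional sink go
+  // through the regular (mix-in driven) deserialization, while plugins are restored by
+  // matching the serialized class names against the known plugin singletons.
+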
+  override def deserializeWithType(p: JsonParser, ctxt: DeserializationContext, typeDeserializer: TypeDeserializer): AnyRef = {
+    this.deserialize(p, ctxt)
+  }
+
+  @throws[IOException]
+  @throws[JsonProcessingException]
+  override def deserialize(jp: JsonParser, ctxt: DeserializationContext): MultiContext = {
+
+    // Deserialize each field of MultiContext separately
+    val node: JsonNode = jp.getCodec.readTree(jp)
+
+    val configurationParser: JsonParser = node.get("configuration").traverse(jp.getCodec)
+    val configuration: Configuration = mapper.readValue(configurationParser, classOf[Configuration])
+
+    val sinkParser: JsonParser = node.get("sink").traverse(jp.getCodec)
+    val sink: Option[MultiContext.UnarySink] = mapper.readValue(sinkParser, new TypeReference[Option[MultiContext.UnarySink]]() {})
+
+    val pluginsParser: JsonParser = node.get("plugins").traverse(jp.getCodec)
+    val plugins: List[String] = mapper.readValue(pluginsParser, new TypeReference[List[String]]() {})
+
+    //
+    // Create the whole deserialized multi context
+    //
+    // 1. Add configuration
+    val multiContext = new MultiContext(configuration)
+
+    // 2. Add sink
+    sink match {
+      case Some(MultiContext.TextFileSink(url)) =>
+        println(s"It's a TextFileSink with url: $url")
+        multiContext.withTextFileSink(url)
+      case Some(MultiContext.ObjectFileSink(url)) =>
+        println(s"It's an ObjectFileSink with url: $url")
+        multiContext.withObjectFileSink(url)
+      case None =>
+        println("No sink defined")
+      case _ =>
+        println("Unknown sink type")
+    }
+
+    // TODO: Add all plugins
+    // 3. Add plugins
+    val javaPluginName = Java.basicPlugin.getClass.getName
+    val sparkPluginName = Spark.basicPlugin.getClass.getName
+    val postgresPluginName = Postgres.plugin().getClass.getName
+    // val flinkPluginName = Flink.basicPlugin().getClass.getName
+    val sqlite3PluginName = Sqlite3.plugin().getClass.getName
+
+    plugins.foreach {
+      case pluginName if pluginName == javaPluginName => multiContext.register(Java.basicPlugin())
+      case pluginName if pluginName == sparkPluginName => multiContext.register(Spark.basicPlugin())
+      case pluginName if pluginName == postgresPluginName => multiContext.register(Postgres.plugin())
+      // case pluginName if pluginName == flinkPluginName => multiContext.register(Flink.basicPlugin())
+      case pluginName if pluginName == sqlite3PluginName => multiContext.register(Sqlite3.plugin())
+      case _ => println("Unknown plugin detected")
+    }
+
+    multiContext
+  }
+}
+
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextSerializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextSerializer.scala
new file mode 100644
index 0000000..1b49299
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextSerializer.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.{JsonGenerator, JsonProcessingException}
+import com.fasterxml.jackson.databind.SerializerProvider
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer
+import com.fasterxml.jackson.databind.ser.std.StdSerializer
+import org.apache.wayang.api.MultiContext
+
+import java.io.IOException
+
+
+class MultiContextSerializer extends StdSerializer[MultiContext](classOf[MultiContext]) {
+
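+  // Counterpart to MultiContextDeserializer: writes the configuration and sink with
+  // default serialization, the plugins as an array of class names, and an explicit
+  // "@type" tag identifying the context type.
+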
+  override def serializeWithType(value: MultiContext, gen: JsonGenerator, serializers: SerializerProvider, typeSer: TypeSerializer): Unit = {
+    this.serialize(value, gen, serializers)
+  }
+
+  @throws[IOException]
+  @throws[JsonProcessingException]
+  def serialize(multiContext: MultiContext, jsonGenerator: JsonGenerator, serializerProvider: SerializerProvider): Unit = {
+    jsonGenerator.writeStartObject()
+
+    // Use default serialization for the 'configuration' field
+    jsonGenerator.writeFieldName("configuration")
+    serializerProvider.defaultSerializeValue(multiContext.getConfiguration, jsonGenerator)
+
+    // Use default serialization for the 'sink' field
+    jsonGenerator.writeFieldName("sink")
+    multiContext.getSink match {
+      case Some(sink) => serializerProvider.defaultSerializeValue(sink, jsonGenerator)
+      case None => jsonGenerator.writeNull()
+    }
+
+    // Serialize the plugins list as an array of strings
+    jsonGenerator.writeArrayFieldStart("plugins")
+    multiContext.getPlugins.foreach(plugin => jsonGenerator.writeString(plugin))
+    jsonGenerator.writeEndArray()
+
+    // Write the type of the context
+    jsonGenerator.writeStringField("@type", "MultiContext")
+
+    jsonGenerator.writeEndObject()
+  }
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/OperatorDeserializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/OperatorDeserializer.scala
new file mode 100644
index 0000000..174876b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/OperatorDeserializer.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.jsontype.TypeDeserializer
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
+import org.apache.wayang.api.serialization.SerializationUtils.mapper
+import org.apache.wayang.api.serialization.customserializers.OperatorDeserializer.{inputSlotOwnerIdMap, outputSlotOwnerIdMap}
+import org.apache.wayang.basic.data.Record
+import org.apache.wayang.basic.operators._
+import org.apache.wayang.basic.types.RecordType
+import org.apache.wayang.core.api.exception.WayangException
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableIntUnaryOperator, SerializableLongUnaryOperator}
+import org.apache.wayang.core.function._
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
+import org.apache.wayang.core.plan.wayangplan.{LoopHeadOperator, Operator, OperatorBase}
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.jdbc.operators.JdbcTableSource
+import org.apache.wayang.postgres.operators.PostgresTableSource
+import org.apache.wayang.sqlite3.operators.Sqlite3TableSource
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class OperatorDeserializer extends JsonDeserializer[Operator] {
+
+  // Define a type for the deserialization function
+  private type DeserializerFunction = (JsonParser, JsonNode) => Operator
+
+  // Map type names to deserialization functions
+  private val deserializers: Map[String, DeserializerFunction] = Map(
+
+    // Source
+    "TextFileSource" -> deserializeTextFileSource,
+    "ObjectFileSource" -> deserializeObjectFileSource,
+    "CollectionSource" -> deserializeCollectionSource,
+    "Sqlite3TableSource" -> deserializeSqlite3TableSource,
+    "PostgresTableSource" -> deserializePostgresTableSource,
+
+    // Unary
+    "MapOperator" -> deserializeMapOperator,
+    "MapPartitionsOperator" -> deserializeMapPartitionsOperator,
+    "FilterOperator" -> deserializeFilterOperator,
+    "FlatMapOperator" -> deserializeFlatMapOperator,
+    "SampleOperator" -> deserializeSampleOperator,
+    "ReduceByOperator" -> deserializeReduceByOperator,
+    "MaterializedGroupByOperator" -> deserializeMaterializedGroupByOperator,
+    "GlobalReduceOperator" -> deserializeGlobalReduceOperator,
+    "GlobalMaterializedGroupOperator" -> deserializeGlobalMaterializedGroupOperator,
+    "GroupByOperator" -> deserializeGroupByOperator,
+    "ReduceOperator" -> deserializeReduceOperator,
+    "SortOperator" -> deserializeSortOperator,
+    "ZipWithIdOperator" -> deserializeZipWithIdOperator,
+    "DistinctOperator" -> deserializeDistinctOperator,
+    "CountOperator" -> deserializeCountOperator,
+
+    // Binary
+    "CartesianOperator" -> deserializeCartesianOperator,
+    "UnionAllOperator" -> deserializeUnionAllOperator,
+    "IntersectOperator" -> deserializeIntersectOperator,
+    "JoinOperator" -> deserializeJoinOperator,
+    "CoGroupOperator" -> deserializeCoGroupOperator,
+
+    // Loop
+    "DoWhileOperator" -> deserializeDoWhileOperator,
+    "RepeatOperator" -> deserializeRepeatOperator,
+
+    /*
+        "LocalCallbackSink" -> deserializeLocalCallbackSink,
+     */
+  )
+
+
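+  // The serialized operator tree consumed here looks roughly like this
+  // (illustrative; the field set varies per operator type):
+  //   { "@type": "MapOperator", "@id": 42,
+  //     "functionDescriptor": { ... },
+  //     "inputSlots": [ ... ], "targetPlatforms": [ ... ], "cardinalityEstimators": [ ... ] }
+  // A later reference to an already-seen operator or slot is emitted as its bare @id number.
+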
+  override def deserialize(jp: JsonParser, ctxt: DeserializationContext): Operator = {
+    val objectIdMap = OperatorDeserializer.operatorIdMap.get()
+    val jsonNodeOperator: JsonNode = mapper.readTree(jp)
+
+    // If the operator node has no fields (equivalently, no standard @type field)
+    // and is just a number (the Jackson @id of an operator),
+    // then we have already parsed and stored that operator,
+    // so return the stored one.
+    if (jsonNodeOperator.get("@type") == null) {
+      objectIdMap.get(jsonNodeOperator.asLong()) match {
+        case Some(operator) => return operator
+        case None => throw new WayangException(s"Can't deserialize operator with id ${jsonNodeOperator.asLong()}")
+      }
+    }
+
+    val typeName = jsonNodeOperator.get("@type").asText
+    val id = jsonNodeOperator.get("@id").asLong
+    // println(s"Processing operator $typeName")
+
+    deserializers.get(typeName) match {
+
+      case Some(deserializeFunc) =>
+
+        // Deserialize operator
+        val operator = deserializeFunc(jp, jsonNodeOperator)
+
+        // Add target platforms
+        val targetPlatformsNode: JsonNode = jsonNodeOperator.get("targetPlatforms")
+        targetPlatformsNode.asInstanceOf[ArrayNode].elements().asScala.foreach(   // Iterate over json array
+          platformStringNode => {
+            val platform = mapper.treeToValue(platformStringNode, classOf[Platform])  // Custom Platform deserializer gets called here
+            operator.addTargetPlatform(platform)  // Add to operator
+          }
+        )
+
+        // Add cardinality estimators
+        val cardinalityEstimatorsNode: JsonNode = jsonNodeOperator.get("cardinalityEstimators")
+        cardinalityEstimatorsNode.asInstanceOf[ArrayNode].elements().asScala.foreach(   // Iterate over json array
+          cardinalityEstimatorNode => {
+            val cardinalityEstimator = mapper.treeToValue(cardinalityEstimatorNode, classOf[CardinalityEstimator])  // Deserialized via the CardinalityEstimator mix-ins
+
+            // TODO: Check hard-coded output index 0
+            operator.asInstanceOf[OperatorBase].setCardinalityEstimator(0, cardinalityEstimator)  // Add to operator
+          }
+        )
+
+        // Store in map id -> operator
+        objectIdMap.put(id, operator)
+        // println(s"\tStoring $typeName with id ${id}")
+
+        // Connect to input operators and return
+        connectToInputOperatorsAndReturn(jsonNodeOperator, operator)
+
+      // If no deserialization function is matched, throw error
+      case None =>
+        throw new IllegalArgumentException(s"Unknown type: $typeName")
+    }
+  }
+
+
+  //
+  // Custom deserialization functions for each type
+  //
+  private def deserializeTextFileSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputUrl = rootNode.get("inputUrl").asText
+    new TextFileSource(inputUrl)
+  }
+
+  private def deserializeObjectFileSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputUrl = rootNode.get("inputUrl").asText
+    val tClass = mapper.treeToValue(rootNode.get("tClass"), classOf[Class[AnyRef]])
+    new ObjectFileSource(inputUrl, tClass)
+  }
+
+  private def deserializeCollectionSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val collection = mapper.treeToValue(rootNode.get("collection"), classOf[Iterable[AnyRef]])
+    val t = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[AnyRef]])
+    new CollectionSource(collection.asJavaCollection, t)
+  }
+
+  private def deserializeSqlite3TableSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val tableName = mapper.treeToValue(rootNode.get("tableName"), classOf[String])
+    val t = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[Record]])
+    new Sqlite3TableSource(tableName, t.getDataUnitType.asInstanceOf[RecordType].getFieldNames: _*)
+  }
+
+  private def deserializePostgresTableSource(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val tableName = mapper.treeToValue(rootNode.get("tableName"), classOf[String])
+    val t = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[Record]])
+    new PostgresTableSource(tableName, t.getDataUnitType.asInstanceOf[RecordType].getFieldNames: _*)
+  }
+
+  private def deserializeMapOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val functionDescriptor = mapper.treeToValue(rootNode.get("functionDescriptor"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    new MapOperator(functionDescriptor)
+  }
+
+  private def deserializeMapPartitionsOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val functionDescriptor = mapper.treeToValue(rootNode.get("functionDescriptor"), classOf[MapPartitionsDescriptor[AnyRef, AnyRef]])
+    new MapPartitionsOperator(functionDescriptor)
+  }
+
+  private def deserializeFilterOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val predicateDescriptor = mapper.treeToValue(rootNode.get("predicateDescriptor"), classOf[PredicateDescriptor[AnyRef]])
+    new FilterOperator(predicateDescriptor)
+  }
+
+  private def deserializeFlatMapOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val functionDescriptor = mapper.treeToValue(rootNode.get("functionDescriptor"), classOf[FlatMapDescriptor[AnyRef, AnyRef]])
+    new FlatMapOperator(functionDescriptor)
+  }
+
+  private def deserializeSampleOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val sampleSizeFunction = mapper.treeToValue(rootNode.get("sampleSizeFunction"), classOf[SerializableIntUnaryOperator])
+    val typeValue = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[AnyRef]])
+    val sampleMethod = mapper.treeToValue(rootNode.get("sampleMethod"), classOf[SampleOperator.Methods])
+    val seedFunction = mapper.treeToValue(rootNode.get("seedFunction"), classOf[SerializableLongUnaryOperator])
+    new SampleOperator(sampleSizeFunction, typeValue, sampleMethod, seedFunction)
+  }
+
+  private def deserializeReduceByOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val keyDescriptor = mapper.treeToValue(rootNode.get("keyDescriptor"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    val reduceDescriptor = mapper.treeToValue(rootNode.get("reduceDescriptor"), classOf[ReduceDescriptor[AnyRef]])
+    new ReduceByOperator(keyDescriptor, reduceDescriptor)
+  }
+
+  private def deserializeMaterializedGroupByOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val keyDescriptor = mapper.treeToValue(rootNode.get("keyDescriptor"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    new MaterializedGroupByOperator(keyDescriptor)
+  }
+
+  private def deserializeGlobalReduceOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val reduceDescriptor = mapper.treeToValue(rootNode.get("reduceDescriptor"), classOf[ReduceDescriptor[AnyRef]])
+    new GlobalReduceOperator(reduceDescriptor)
+  }
+
+  private def deserializeGlobalMaterializedGroupOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+    val outputType = mapper.treeToValue(rootNode.get("outputType"), classOf[DataSetType[java.lang.Iterable[AnyRef]]])
+    new GlobalMaterializedGroupOperator(inputType, outputType)
+  }
+
+  private def deserializeGroupByOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val keyDescriptor = mapper.treeToValue(rootNode.get("keyDescriptor"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    new GroupByOperator(keyDescriptor)
+  }
+
+  private def deserializeReduceOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val reduceDescriptor = mapper.treeToValue(rootNode.get("reduceDescriptor"), classOf[ReduceDescriptor[AnyRef]])
+    val inputType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+    val outputType = mapper.treeToValue(rootNode.get("outputType"), classOf[DataSetType[AnyRef]])
+    new ReduceOperator(reduceDescriptor, inputType, outputType)
+  }
+
+  private def deserializeSortOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val keyDescriptor = mapper.treeToValue(rootNode.get("keyDescriptor"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    new SortOperator(keyDescriptor)
+  }
+
+  private def deserializeZipWithIdOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+    new ZipWithIdOperator(inputType)
+  }
+
+  private def deserializeDistinctOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+    new DistinctOperator(inputType)
+  }
+
+  private def deserializeCountOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+    new CountOperator(inputType)
+  }
+
+  private def deserializeCartesianOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputType0 = mapper.treeToValue(rootNode.get("inputType0"), classOf[DataSetType[AnyRef]])
+    val inputType1 = mapper.treeToValue(rootNode.get("inputType1"), classOf[DataSetType[AnyRef]])
+    new CartesianOperator(inputType0, inputType1)
+  }
+
+  private def deserializeUnionAllOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputType0 = mapper.treeToValue(rootNode.get("inputType0"), classOf[DataSetType[AnyRef]])
+    new UnionAllOperator(inputType0)
+  }
+
+  private def deserializeIntersectOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputType0 = mapper.treeToValue(rootNode.get("inputType0"), classOf[DataSetType[AnyRef]])
+    new IntersectOperator(inputType0)
+  }
+
+  private def deserializeJoinOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val keyDescriptor0 = mapper.treeToValue(rootNode.get("keyDescriptor0"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    val keyDescriptor1 = mapper.treeToValue(rootNode.get("keyDescriptor1"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    new JoinOperator(keyDescriptor0, keyDescriptor1)
+  }
+
+  private def deserializeCoGroupOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val keyDescriptor0 = mapper.treeToValue(rootNode.get("keyDescriptor0"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    val keyDescriptor1 = mapper.treeToValue(rootNode.get("keyDescriptor1"), classOf[TransformationDescriptor[AnyRef, AnyRef]])
+    new CoGroupOperator(keyDescriptor0, keyDescriptor1)
+  }
+
+  private def deserializeDoWhileOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val inputType = mapper.treeToValue(rootNode.get("inputType"), classOf[DataSetType[AnyRef]])
+    val convergenceType = mapper.treeToValue(rootNode.get("convergenceType"), classOf[DataSetType[AnyRef]])
+    val criterionDescriptor = mapper.treeToValue(rootNode.get("criterionDescriptor"), classOf[PredicateDescriptor[java.util.Collection[AnyRef]]])
+    val numExpectedIterations = mapper.treeToValue(rootNode.get("numExpectedIterations"), classOf[Integer])
+    new DoWhileOperator(inputType, convergenceType, criterionDescriptor, numExpectedIterations)
+  }
+
+  private def deserializeRepeatOperator(jp: JsonParser, rootNode: JsonNode): Operator = {
+    val numIterations = mapper.treeToValue(rootNode.get("numIterations"), classOf[Integer])
+    val typeValue = mapper.treeToValue(rootNode.get("type"), classOf[DataSetType[AnyRef]])
+    new RepeatOperator(numIterations, typeValue)
+  }
+
+
+  private def connectToInputOperatorsAndReturn(node: JsonNode, operator: Operator): Operator = {
+    val inputOperators = getInputOperators(node)
+    for ((inputOperator, index) <- inputOperators.zipWithIndex) {
+      val thisOutputIndex = if (isLoopOutput(node)) 1 else 0
+      inputOperator.connectTo(thisOutputIndex, operator, index)
+    }
+    operator
+  }
+
+  // If the occupant output slot of this node's single input slot is named "finOut",
+  // then this node consumes the final output of a loop operator.
+  private def isLoopOutput(node: JsonNode): Boolean = {
+    val inputSlots = node.get("inputSlots")
+    if (inputSlots != null && inputSlots.isArray && inputSlots.size() == 1) {
+      // For each input slot
+      inputSlots.elements().forEachRemaining { inputSlot =>
+        // Access occupant
+        if (inputSlot.get("occupant") != null) {
+          val outputSlot = inputSlot.get("occupant")
+          // Access owner
+          if (outputSlot.get("name") != null) {
+            val name = outputSlot.get("name").asText()
+            return name == "finOut"
+          }
+        }
+      }
+    }
+    return false
+  }
+
+  private def getInputOperators(node: JsonNode): List[Operator] = {
+
+    var inputOperators: List[Operator] = List()
+
+    // Navigate to inputSlots
+    val inputSlots = node.get("inputSlots")
+    if (inputSlots != null && inputSlots.isArray) {
+
+      // For each input slot
+      inputSlots.elements().forEachRemaining { inputSlot =>
+
+        // Access occupant
+        if (inputSlot.get("@id") != null) {
+
+          val inputSlotId = inputSlot.get("@id").asLong()
+          // println(s"Processing input slot with id ${inputSlotId}")
+
+          val outputSlot = inputSlot.get("occupant")
+
+          // Access owner
+          if (outputSlot.get("@id") != null) {
+
+            val outputSlotId = outputSlot.get("@id").asLong
+            // println(s"Processing output slot with id ${outputSlotId}")
+
+            val owner = outputSlot.get("owner")
+
+            // Deserialize the nested owner operator and add it into list to be returned
+            val jsonParser = owner.traverse(mapper)
+            jsonParser.nextToken()
+            val inputOperator = mapper.readValue[Operator](jsonParser, classOf[Operator])
+            inputOperators = inputOperators :+ inputOperator
+
+            // println(s"\tStoring input slot with id ${inputSlotId}")
+            inputSlotOwnerIdMap.get().put(inputSlotId, inputOperator)
+            // println(s"\tStoring output slot with id ${outputSlotId}")
+            outputSlotOwnerIdMap.get().put(outputSlotId, inputOperator)
+          }
+
+          // If the output slot has no fields and is just a number (a Jackson id of an
+          // output slot), then we have already parsed it and associated it with an operator
+          else {
+            val inputOperator = outputSlotOwnerIdMap.get().get(outputSlot.asLong)
+            inputOperator match {
+              case Some(operator) => inputOperators = inputOperators :+ operator
+              case None => throw new WayangException(s"Can't find output slot ${outputSlot.asLong}")
+            }
+          }
+        }
+
+        // If the input slot has no fields and is just a number (a Jackson id of an
+        // input slot), then we have already parsed it and associated it with an operator
+        else {
+          val inputOperator = inputSlotOwnerIdMap.get().get(inputSlot.asLong)
+          inputOperator match {
+            case Some(operator) => inputOperators = inputOperators :+ operator
+            case None => throw new WayangException(s"Can't find input slot ${inputSlot.asLong}")
+          }
+        }
+      }
+    }
+
+    inputOperators
+  }
+
+
+  override def deserializeWithType(p: JsonParser, ctxt: DeserializationContext, typeDeserializer: TypeDeserializer): Operator = {
+    deserialize(p, ctxt)
+  }
+
+
+  override def deserializeWithType(p: JsonParser, ctxt: DeserializationContext, typeDeserializer: TypeDeserializer, intoValue: Operator): Operator = {
+    deserialize(p, ctxt)
+  }
+}
+
+
+object OperatorDeserializer {
+
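+  // Operator deserialization is recursive (each operator pulls in its input operators),
+  // so these ThreadLocal maps resolve Jackson @id references back to instances already
+  // materialized on the current thread.
+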
+  // operator serialization id -> operator
+  private val operatorIdMap: ThreadLocal[mutable.Map[Long, Operator]] = ThreadLocal.withInitial(() => mutable.Map[Long, Operator]())
+
+  // input slot serialization id -> input slot owner
+  private val inputSlotOwnerIdMap: ThreadLocal[mutable.Map[Long, Operator]] = ThreadLocal.withInitial(() => mutable.Map[Long, Operator]())
+
+  // output slot serialization id -> output slot owner
+  private val outputSlotOwnerIdMap: ThreadLocal[mutable.Map[Long, Operator]] = ThreadLocal.withInitial(() => mutable.Map[Long, Operator]())
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformDeserializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformDeserializer.scala
new file mode 100644
index 0000000..762ca27
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformDeserializer.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer}
+import org.apache.wayang.core.api.exception.WayangException
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.java.Java
+import org.apache.wayang.postgres.Postgres
+import org.apache.wayang.spark.Spark
+
+class PlatformDeserializer extends JsonDeserializer[Platform] {
+
+  override def deserialize(p: JsonParser, ctxt: DeserializationContext): Platform = {
+    val className = p.getValueAsString
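+    // PlatformSerializer writes a platform as its fully qualified class name (a plain
+    // JSON string), so we map that name back to the corresponding platform singleton.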
+
+    // TODO: Add all platforms
+    if (className == Java.platform().getClass.getName) {
+      Java.platform()
+    } else if (className == Spark.platform().getClass.getName) {
+      Spark.platform()
+    } else if (className == Postgres.platform().getClass.getName) {
+      Postgres.platform()
+    } else {
+      throw new WayangException(s"Can't deserialize platform: $className")
+    }
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformSerializer.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformSerializer.scala
new file mode 100644
index 0000000..68b203b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/PlatformSerializer.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization.customserializers
+
+import com.fasterxml.jackson.core.JsonGenerator
+import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
+import org.apache.wayang.core.platform.Platform
+
+class PlatformSerializer extends JsonSerializer[Platform] {
+
+  override def serialize(platform: Platform, jsonGenerator: JsonGenerator, serializerProvider: SerializerProvider): Unit = {
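+    // Platforms are obtained through singleton accessors (e.g. Java.platform()), so the
+    // class name alone identifies one; PlatformDeserializer maps it back to the instance.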
+    jsonGenerator.writeString(platform.getClass.getName)
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ConfigurationAndContextMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ConfigurationAndContextMixIns.scala
new file mode 100644
index 0000000..00eb473
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ConfigurationAndContextMixIns.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonIdentityInfo, JsonIgnore, JsonSubTypes, JsonTypeInfo, ObjectIdGenerators}
+import org.apache.logging.log4j.Logger
+import org.apache.wayang.api.{MultiContext, DataQuanta, PlanBuilder}
+import org.apache.wayang.core.api.configuration.{CollectionProvider, ExplicitCollectionProvider, KeyValueProvider, MapBasedKeyValueProvider, ValueProvider}
+import org.apache.wayang.core.function.FunctionDescriptor
+import org.apache.wayang.core.mapping.Mapping
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
+import org.apache.wayang.core.optimizer.channels.ChannelConversion
+import org.apache.wayang.core.optimizer.costs.{LoadProfileEstimator, LoadProfileToTimeConverter, TimeToCostConverter}
+import org.apache.wayang.core.optimizer.enumeration.PlanEnumerationPruningStrategy
+import org.apache.wayang.core.plan.wayangplan.{ExecutionOperator, OutputSlot}
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.profiling.{CardinalityRepository, InstrumentationStrategy}
+
+import java.util.function.ToDoubleFunction
+
+object ConfigurationAndContextMixIns {
+
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[MultiContext], name = "MultiContext"),
+  ))
+  abstract class WayangContextMixIn {
+    @JsonIgnore
+    private var logger: Logger = _
+
+    // TODO: Is this okay?
+    @JsonIgnore
+    private var cardinalityRepository: CardinalityRepository = _
+  }
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  abstract class ConfigurationMixIn {
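+    // All provider fields below are @JsonIgnore'd: they hold function objects and
+    // platform-specific state that Jackson cannot meaningfully serialize, so they are
+    // expected to be re-created when the configuration is loaded on the other side.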
+    @JsonIgnore
+    private var cardinalityEstimatorProvider: KeyValueProvider[OutputSlot[_], CardinalityEstimator] = _
+
+    @JsonIgnore
+    private var udfSelectivityProvider: KeyValueProvider[FunctionDescriptor, ProbabilisticDoubleInterval] = _
+
+    @JsonIgnore
+    private var operatorLoadProfileEstimatorProvider: KeyValueProvider[ExecutionOperator, LoadProfileEstimator] = _
+
+    @JsonIgnore
+    private var functionLoadProfileEstimatorProvider: KeyValueProvider[FunctionDescriptor, LoadProfileEstimator] = _
+
+    @JsonIgnore
+    private var loadProfileEstimatorCache: MapBasedKeyValueProvider[String, LoadProfileEstimator] = _
+
+    @JsonIgnore
+    private var loadProfileToTimeConverterProvider: KeyValueProvider[Platform, LoadProfileToTimeConverter] = _
+
+    @JsonIgnore
+    private var timeToCostConverterProvider: KeyValueProvider[Platform, TimeToCostConverter] = _
+
+    @JsonIgnore
+    private var costSquasherProvider: ValueProvider[ToDoubleFunction[ProbabilisticDoubleInterval]] = _
+
+    @JsonIgnore
+    private var platformStartUpTimeProvider: KeyValueProvider[Platform, Long] = _
+
+    @JsonIgnore
+    private var platformProvider: ExplicitCollectionProvider[Platform] = _
+
+    @JsonIgnore
+    private var mappingProvider: ExplicitCollectionProvider[Mapping] = _
+
+    @JsonIgnore
+    private var channelConversionProvider: ExplicitCollectionProvider[ChannelConversion] = _
+
+    @JsonIgnore
+    private var pruningStrategyClassProvider: CollectionProvider[Class[PlanEnumerationPruningStrategy]] = _
+
+    @JsonIgnore
+    private var instrumentationStrategyProvider: ValueProvider[InstrumentationStrategy] = _
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[MultiContext.TextFileSink], name = "MultiContextTextFileSink"),
+    new JsonSubTypes.Type(value = classOf[MultiContext.ObjectFileSink], name = "MultiContextObjectFileSink"),
+  ))
+  abstract class MultiContextUnarySinkMixIn {
+  }
+
+  abstract class MultiContextPlanBuilderMixIn {
+    @JsonIgnore
+    private var multiContextMap: Map[Long, MultiContext] = _
+
+    @JsonIgnore
+    private var dataQuantaMap: Map[Long, DataQuanta[_]] = _
+
+    @JsonIgnore
+    private var planBuilderMap: Map[Long, PlanBuilder] = _
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DataTypeMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DataTypeMixIns.scala
new file mode 100644
index 0000000..2f3be61
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DataTypeMixIns.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty, JsonSubTypes, JsonTypeInfo}
+import org.apache.wayang.basic.types.RecordType
+import org.apache.wayang.core.types.{BasicDataUnitType, DataUnitGroupType, DataUnitType}
+
+object DataTypeMixIns {
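+  // Each mixin below re-declares a constructor of its target class and annotates it
+  // with @JsonCreator/@JsonProperty; Jackson matches it to the real constructor, so
+  // the Wayang classes themselves stay free of serialization annotations.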
+
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[BasicDataUnitType[_]], name = "BasicDataUnitType"),
+    new JsonSubTypes.Type(value = classOf[DataUnitGroupType[_]], name = "DataUnitGroupType"),
+  ))
+  abstract class DataUnitTypeMixIn {
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[RecordType], name = "RecordType"),
+  ))
+  abstract class BasicDataUnitTypeMixIn[T] {
+    @JsonCreator
+    def this(@JsonProperty("typeClass") typeClass: Class[T]) = {
+      this()
+    }
+  }
+
+  abstract class RecordTypeMixIn {
+    @JsonCreator
+    def this(@JsonProperty("fieldNames") fieldNames: Array[String]) = {
+      this()
+    }
+  }
+
+  abstract class DataUnitGroupTypeMixIn[T] {
+    @JsonCreator
+    def this(@JsonProperty("baseType") baseType: DataUnitType[_]) = {
+      this()
+    }
+  }
+
+  abstract class DataSetTypeMixIn[T] {
+    @JsonCreator
+    def this(@JsonProperty("dataUnitType") dataUnitType: DataUnitType[T]) = {
+      this()
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DescriptorMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DescriptorMixIns.scala
new file mode 100644
index 0000000..7f2832d
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/DescriptorMixIns.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
+import com.fasterxml.jackson.annotation.{JsonAutoDetect, JsonCreator, JsonProperty, JsonSubTypes, JsonTypeInfo}
+import org.apache.wayang.basic.function.ProjectionDescriptor
+import org.apache.wayang.core.function.FunctionDescriptor.SerializablePredicate
+import org.apache.wayang.core.function.{AggregationDescriptor, ConsumerDescriptor, FlatMapDescriptor, FunctionDescriptor, MapPartitionsDescriptor, PredicateDescriptor, ReduceDescriptor, TransformationDescriptor}
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.core.types.{BasicDataUnitType, DataUnitGroupType}
+
+import java.util
+
+object DescriptorMixIns {
+
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[AggregationDescriptor[_, _]], name = "AggregationDescriptor"),
+    new JsonSubTypes.Type(value = classOf[ConsumerDescriptor[_]], name = "ConsumerDescriptor"),
+    new JsonSubTypes.Type(value = classOf[FlatMapDescriptor[_, _]], name = "FlatMapDescriptor"),
+    new JsonSubTypes.Type(value = classOf[MapPartitionsDescriptor[_, _]], name = "MapPartitionsDescriptor"),
+    new JsonSubTypes.Type(value = classOf[PredicateDescriptor[_]], name = "PredicateDescriptor"),
+    new JsonSubTypes.Type(value = classOf[ReduceDescriptor[_]], name = "ReduceDescriptor"),
+    new JsonSubTypes.Type(value = classOf[TransformationDescriptor[_, _]], name = "TransformationDescriptor"),
+  ))
+  abstract class FunctionDescriptorMixIn {
+  }
+
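+  // fieldVisibility = ANY below makes Jackson (de)serialize the descriptors' private
+  // fields directly rather than via getters.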
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+  abstract class PredicateDescriptorMixIn[Input] {
+    @JsonCreator
+    def this(@JsonProperty("javaImplementation") javaImplementation: SerializablePredicate[Input],
+             @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+             @JsonProperty("selectivity") selectivity: ProbabilisticDoubleInterval,
+             @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+      this()
+    }
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[ProjectionDescriptor[_, _]], name = "ProjectionDescriptor"),
+  ))
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+  abstract class TransformationDescriptorMixIn[Input, Output] {
+    @JsonCreator def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableFunction[Input, Output],
+                          @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+                          @JsonProperty("outputType") outputType: BasicDataUnitType[Output],
+                          @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+      this()
+    }
+  }
+
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+  abstract class ProjectionDescriptorMixIn[Input, Output] {
+    @JsonCreator def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableFunction[Input, Output],
+                          @JsonProperty("fieldNames") fieldNames: util.List[String],
+                          @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+                          @JsonProperty("outputType") outputType: BasicDataUnitType[Output]) = {
+      this()
+    }
+  }
+
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+  abstract class ReduceDescriptorMixIn[Type] {
+    @JsonCreator
+    def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableBinaryOperator[Type],
+             @JsonProperty("inputType") inputType: DataUnitGroupType[Type],
+             @JsonProperty("outputType") outputType: BasicDataUnitType[Type],
+             @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+      this()
+    }
+  }
+
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+  abstract class FlatMapDescriptorMixIn[Input, Output] {
+    @JsonCreator
+    def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableFunction[Input, Iterable[Output]],
+             @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+             @JsonProperty("outputType") outputType: BasicDataUnitType[Output],
+             @JsonProperty("selectivity") selectivity: ProbabilisticDoubleInterval,
+             @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+      this()
+    }
+  }
+
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE)
+  abstract class MapPartitionsDescriptorMixIn[Input, Output] {
+    @JsonCreator
+    def this(@JsonProperty("javaImplementation") javaImplementation: FunctionDescriptor.SerializableFunction[Iterable[Input], Iterable[Output]],
+             @JsonProperty("inputType") inputType: BasicDataUnitType[Input],
+             @JsonProperty("outputType") outputType: BasicDataUnitType[Output],
+             @JsonProperty("selectivity") selectivity: ProbabilisticDoubleInterval,
+             @JsonProperty("loadProfileEstimator") loadProfileEstimator: LoadProfileEstimator) = {
+      this()
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/EstimatorMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/EstimatorMixIns.scala
new file mode 100644
index 0000000..37e6bd1
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/EstimatorMixIns.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty, JsonSubTypes, JsonTypeInfo, JsonTypeName}
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.core.function.FunctionDescriptor
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableToDoubleBiFunction
+import org.apache.wayang.core.optimizer.cardinality.{AggregatingCardinalityEstimator, CardinalityEstimate, DefaultCardinalityEstimator, FallbackCardinalityEstimator, FixedSizeCardinalityEstimator, SwitchForwardCardinalityEstimator}
+import org.apache.wayang.core.optimizer.costs.{ConstantLoadProfileEstimator, DefaultEstimatableCost, DefaultLoadEstimator, IntervalLoadEstimator, LoadEstimator, NestableLoadProfileEstimator}
+
+object EstimatorMixIns {
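+
+  // Mixins exposing constructors and polymorphic subtype names for Wayang's load and
+  // cardinality estimators, so that estimator trees can be rebuilt on deserialization.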
+
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[DefaultLoadEstimator], name = "DefaultLoadEstimator"),
+    new JsonSubTypes.Type(value = classOf[IntervalLoadEstimator], name = "IntervalLoadEstimator"),
+  ))
+  abstract class LoadEstimatorMixIn {
+  }
+
+  abstract class DefaultLoadEstimatorMixIn {
+    @JsonCreator
+    def this(@JsonProperty("numInputs") numInputs: Int,
+             @JsonProperty("numOutputs") numOutputs: Int,
+             @JsonProperty("correctnessProbability") correctnessProbability: Double,
+             @JsonProperty("nullCardinalityReplacement") nullCardinalityReplacement: CardinalityEstimate,
+             @JsonProperty("singlePointFunction") singlePointFunction: LoadEstimator.SinglePointEstimationFunction) = {
+      this()
+    }
+  }
+
+  abstract class CardinalityEstimateMixIn {
+    @JsonCreator
+    def this(@JsonProperty("lowerEstimate") lowerEstimate: Long,
+             @JsonProperty("upperEstimate") upperEstimate: Long,
+             @JsonProperty("correctnessProb") correctnessProb: Double,
+             @JsonProperty("isOverride") isOverride: Boolean) = {
+      this()
+    }
+  }
+
+
+  abstract class ProbabilisticDoubleIntervalMixIn {
+    @JsonCreator
+    def this(@JsonProperty("lowerEstimate") lowerEstimate: Double,
+             @JsonProperty("upperEstimate") upperEstimate: Double,
+             @JsonProperty("correctnessProb") correctnessProb: Double,
+             @JsonProperty("isOverride") isOverride: Boolean) = {
+      this()
+    }
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[ConstantLoadProfileEstimator], name = "ConstantLoadProfileEstimator"),
+    new JsonSubTypes.Type(value = classOf[NestableLoadProfileEstimator], name = "NestableLoadProfileEstimator"),
+  ))
+  abstract class LoadProfileEstimatorMixIn {
+  }
+
+  @JsonTypeName("nestableLoadProfileEstimator")
+  abstract class NestableLoadProfileEstimatorMixIn {
+    @JsonCreator
+    def this (@JsonProperty("cpuLoadEstimator") cpuLoadEstimator : LoadEstimator,
+              @JsonProperty("ramLoadEstimator") ramLoadEstimator: LoadEstimator,
+              @JsonProperty("diskLoadEstimator") diskLoadEstimator: LoadEstimator,
+              @JsonProperty("networkLoadEstimator") networkLoadEstimator: LoadEstimator,
+              @JsonProperty("resourceUtilizationEstimator") resourceUtilizationEstimator: SerializableToDoubleBiFunction[Array[Long], Array[Long]],
+              @JsonProperty("overheadMillis") overheadMillis: Long,
+              @JsonProperty("configurationKey") configurationKey: String
+             ) = {
+      this()
+    }
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[AggregatingCardinalityEstimator], name = "AggregatingCardinalityEstimator"),
+    new JsonSubTypes.Type(value = classOf[DefaultCardinalityEstimator], name = "DefaultCardinalityEstimator"),
+    new JsonSubTypes.Type(value = classOf[FallbackCardinalityEstimator], name = "FallbackCardinalityEstimator"),
+    new JsonSubTypes.Type(value = classOf[FixedSizeCardinalityEstimator], name = "FixedSizeCardinalityEstimator"),
+    new JsonSubTypes.Type(value = classOf[SwitchForwardCardinalityEstimator], name = "SwitchForwardCardinalityEstimator"),
+  ))
+  abstract class CardinalityEstimatorMixIn {
+  }
+
+  abstract class DefaultCardinalityEstimatorMixIn {
+    @JsonCreator
+    def this(@JsonProperty("certaintyProb") certaintyProb: Double,
+             @JsonProperty("numInputs") numInputs: Int,
+             @JsonProperty("isAllowMoreInputs") isAllowMoreInputs: Boolean,
+             @JsonProperty("singlePointEstimator") singlePointEstimator: FunctionDescriptor.SerializableToLongBiFunction[Array[Long], Configuration]) = {
+      this()
+    }
+  }
+
+  // TODO: Add more estimator mixins
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[DefaultEstimatableCost], name = "DefaultEstimatableCost"),
+  ))
+  abstract class EstimatableCostMixIn {
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/IgnoreLoggerMixIn.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/IgnoreLoggerMixIn.scala
new file mode 100644
index 0000000..ad95291
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/IgnoreLoggerMixIn.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import org.apache.logging.log4j.Logger
+
+abstract class IgnoreLoggerMixIn {
+  @JsonIgnore
+  private var logger: Logger = _
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/OperatorMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/OperatorMixIns.scala
new file mode 100644
index 0000000..c53003c
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/OperatorMixIns.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonIdentityInfo, JsonIgnore, JsonSubTypes, JsonTypeInfo, ObjectIdGenerators}
+import org.apache.wayang.basic.operators.{CartesianOperator, CoGroupOperator, CollectionSource, CountOperator, DistinctOperator, DoWhileOperator, FilterOperator, FlatMapOperator, GlobalMaterializedGroupOperator, GlobalReduceOperator, GroupByOperator, IntersectOperator, JoinOperator, LocalCallbackSink, MapOperator, MapPartitionsOperator, MaterializedGroupByOperator, ReduceByOperator, ReduceOperator, RepeatOperator, SampleOperator, SortOperator, TextFileSource, UnionAllOperator, ZipWithIdOperator}
+import org.apache.wayang.core.plan.wayangplan.{ActualOperator, BinaryToUnaryOperator, CompositeOperator, ElementaryOperator, ExecutionOperator, LoopHeadOperator, OperatorBase, Subplan, UnarySink, UnarySource, UnaryToUnaryOperator}
+
+object OperatorMixIns {
+
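+  // @JsonIdentityInfo assigns each operator a numeric "@id" so that slots can refer back
+  // to operators that were already written; OperatorDeserializer resolves those ids via
+  // its ThreadLocal lookup maps.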
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[OperatorBase], name = "OperatorBase"),
+    new JsonSubTypes.Type(value = classOf[ActualOperator], name = "ActualOperator"),
+    new JsonSubTypes.Type(value = classOf[CompositeOperator], name = "CompositeOperator"),
+    new JsonSubTypes.Type(value = classOf[LoopHeadOperator], name = "LoopHeadOperator"),
+  ))
+  abstract class OperatorMixIn {
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[ElementaryOperator], name = "ElementaryOperator"),
+    new JsonSubTypes.Type(value = classOf[Subplan], name = "Subplan"),
+  ))
+  abstract class ActualOperatorMixIn {
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[UnarySource[_]], name = "UnarySource"),
+    new JsonSubTypes.Type(value = classOf[UnarySink[_]], name = "UnarySink"),
+    new JsonSubTypes.Type(value = classOf[UnaryToUnaryOperator[_, _]], name = "UnaryToUnaryOperator"),
+    new JsonSubTypes.Type(value = classOf[BinaryToUnaryOperator[_, _, _]], name = "BinaryToUnaryOperator"),
+    new JsonSubTypes.Type(value = classOf[DoWhileOperator[_, _]], name = "DoWhileOperator"),
+    new JsonSubTypes.Type(value = classOf[RepeatOperator[_]], name = "RepeatOperator"),
+  ))
+  abstract class OperatorBaseMixIn {
+    @JsonIgnore
+    def getOriginal(): ExecutionOperator
+
+    @JsonIgnore
+    private var original: ExecutionOperator = _
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[UnarySource[_]], name = "UnarySource"),
+    new JsonSubTypes.Type(value = classOf[UnarySink[_]], name = "UnarySink"),
+    new JsonSubTypes.Type(value = classOf[UnaryToUnaryOperator[_, _]], name = "UnaryToUnaryOperator"),
+    new JsonSubTypes.Type(value = classOf[BinaryToUnaryOperator[_, _, _]], name = "BinaryToUnaryOperator"),
+    new JsonSubTypes.Type(value = classOf[DoWhileOperator[_, _]], name = "DoWhileOperator"),
+    new JsonSubTypes.Type(value = classOf[RepeatOperator[_]], name = "RepeatOperator"),
+  ))
+  abstract class ElementaryOperatorMixIn {
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[TextFileSource], name = "TextFileSource"),
+    new JsonSubTypes.Type(value = classOf[CollectionSource[_]], name = "CollectionSource"),
+  ))
+  abstract class UnarySourceMixIn[T] {
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[LocalCallbackSink[_]], name = "LocalCallbackSink"),
+  ))
+  abstract class UnarySinkMixIn[T] {
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[MapOperator[_, _]], name = "MapOperator"),
+    new JsonSubTypes.Type(value = classOf[MapPartitionsOperator[_, _]], name = "MapPartitionsOperator"),
+    new JsonSubTypes.Type(value = classOf[FilterOperator[_]], name = "FilterOperator"),
+    new JsonSubTypes.Type(value = classOf[FlatMapOperator[_, _]], name = "FlatMapOperator"),
+    new JsonSubTypes.Type(value = classOf[SampleOperator[_]], name = "SampleOperator"),
+    new JsonSubTypes.Type(value = classOf[ReduceByOperator[_, _]], name = "ReduceByOperator"),
+    new JsonSubTypes.Type(value = classOf[MaterializedGroupByOperator[_, _]], name = "MaterializedGroupByOperator"),
+    new JsonSubTypes.Type(value = classOf[GlobalReduceOperator[_]], name = "GlobalReduceOperator"),
+    new JsonSubTypes.Type(value = classOf[GlobalMaterializedGroupOperator[_]], name = "GlobalMaterializedGroupOperator"),
+    new JsonSubTypes.Type(value = classOf[GroupByOperator[_, _]], name = "GroupByOperator"),
+    new JsonSubTypes.Type(value = classOf[ReduceOperator[_]], name = "ReduceOperator"),
+    new JsonSubTypes.Type(value = classOf[SortOperator[_, _]], name = "SortOperator"),
+    new JsonSubTypes.Type(value = classOf[ZipWithIdOperator[_]], name = "ZipWithIdOperator"),
+    new JsonSubTypes.Type(value = classOf[DistinctOperator[_]], name = "DistinctOperator"),
+    new JsonSubTypes.Type(value = classOf[CountOperator[_]], name = "CountOperator"),
+  ))
+  abstract class UnaryToUnaryOperatorMixIn[InputType, OutputType] {
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[CartesianOperator[_, _]], name = "CartesianOperator"),
+    new JsonSubTypes.Type(value = classOf[UnionAllOperator[_]], name = "UnionAllOperator"),
+    new JsonSubTypes.Type(value = classOf[IntersectOperator[_]], name = "IntersectOperator"),
+    new JsonSubTypes.Type(value = classOf[JoinOperator[_, _, _]], name = "JoinOperator"),
+    new JsonSubTypes.Type(value = classOf[CoGroupOperator[_, _, _]], name = "CoGroupOperator"),
+  ))
+  abstract class BinaryToUnaryOperatorMixIn[InputType0, InputType1, OutputType] {
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[DoWhileOperator[_, _]], name = "DoWhileOperator"),
+    new JsonSubTypes.Type(value = classOf[RepeatOperator[_]], name = "RepeatOperator"),
+  ))
+  abstract class LoopHeadOperatorMixIn {
+  }
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  abstract class OperatorPatternMixin {
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ProviderMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ProviderMixIns.scala
new file mode 100644
index 0000000..50c0c22
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/ProviderMixIns.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.{JsonCreator, JsonIdentityInfo, JsonIgnore, JsonProperty, JsonSetter, JsonSubTypes, JsonTypeInfo, ObjectIdGenerators}
+import org.apache.logging.log4j.Logger
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.core.api.configuration.{ExplicitCollectionProvider, FunctionalCollectionProvider, FunctionalKeyValueProvider, KeyValueProvider, MapBasedKeyValueProvider}
+
+import java.util.function.BiFunction
+
+object ProviderMixIns {
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  abstract class FunctionalKeyValueProviderMixIn[Key, Value] {
+    @JsonCreator
+    def this(@JsonProperty("parent") parent: KeyValueProvider[Key, Value],
+             @JsonProperty("configuration") configuration: Configuration,
+             @JsonProperty("providerFunction") providerFunction: BiFunction[Key, KeyValueProvider[Key, Value], Value]) = {
+      this()
+    }
+  }
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  abstract class MapBasedKeyValueProviderMixIn[Key, Value] {
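+    // @JsonSetter lets Jackson feed the serialized "storedValues" map back into the
+    // provider's private storage during deserialization.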
+    @JsonSetter("storedValues")
+    private def setStoredValues(storedValues: Map[Key, Value]): Unit = {}
+
+    @JsonCreator
+    def this(@JsonProperty("parent") parent: KeyValueProvider[Key, Value],
+             @JsonProperty("configuration") configuration: Configuration,
+             @JsonProperty("isCaching") isCaching: Boolean) = {
+      this()
+    }
+  }
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  abstract class ConstantValueProviderMixIn {
+  }
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  abstract class ExplicitCollectionProviderMixIn {
+    @JsonIgnore
+    private var logger: Logger = _
+  }
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[FunctionalKeyValueProvider[_, _]], name = "FunctionalKeyValueProvider"),
+    new JsonSubTypes.Type(value = classOf[MapBasedKeyValueProvider[_, _]], name = "MapBasedKeyValueProvider"),
+  ))
+  abstract class KeyValueProviderMixIn {
+    @JsonIgnore
+    private var logger: Logger = _
+  }
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[ExplicitCollectionProvider[_]], name = "ExplicitCollectionProvider"),
+    new JsonSubTypes.Type(value = classOf[FunctionalCollectionProvider[_]], name = "FunctionalCollectionProvider"),
+  ))
+  abstract class CollectionProviderMixIn {
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/SlotMixIns.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/SlotMixIns.scala
new file mode 100644
index 0000000..66f8895
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/mixins/SlotMixIns.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.serialization.mixins
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
+import com.fasterxml.jackson.annotation.{JsonAutoDetect, JsonIdentityInfo, JsonIgnore, JsonSubTypes, JsonTypeInfo, ObjectIdGenerators}
+import org.apache.wayang.core.plan.wayangplan.{InputSlot, OutputSlot}
+
+import java.util.List
+
+object SlotMixIns {
+
+  @JsonIdentityInfo(generator = classOf[ObjectIdGenerators.IntSequenceGenerator], property = "@id")
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY, getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+  @JsonSubTypes(Array(
+    new JsonSubTypes.Type(value = classOf[InputSlot[_]], name = "InputSlot"),
+    new JsonSubTypes.Type(value = classOf[OutputSlot[_]], name = "OutputSlot"),
+  ))
+  abstract class SlotMixIn[T] {
+  }
+
+  abstract class OutputSlotMixIn[T] {
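+    // Ignored, presumably to break the cycle between output and input slots;
+    // OperatorDeserializer reconnects them from the serialized slot ids instead.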
+    @JsonIgnore
+    private var occupiedSlots: List[InputSlot[T]] = _
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OperatorSerializationTests.scala b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OperatorSerializationTests.scala
new file mode 100644
index 0000000..6dff434
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OperatorSerializationTests.scala
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization
+
+import org.apache.wayang.api.{MultiContext, PlanBuilder, createPlanBuilder}
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.java.Java
+import org.apache.wayang.postgres.Postgres
+import org.apache.wayang.postgres.operators.PostgresTableSource
+import org.apache.wayang.sqlite3.Sqlite3
+import org.apache.wayang.sqlite3.operators.Sqlite3TableSource
+import org.junit.Test
+
+import java.io.{File, PrintWriter}
+import java.sql.{Connection, Statement}
+import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
+
+
+class OperatorSerializationTests extends SerializationTestBase {
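+
+  // Each test builds a small Wayang plan, round-trips it through the serializer via
+  // serializeDeserializeExecuteAssert, executes the deserialized plan, and checks the
+  // output against the expected values.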
+
+  @Test
+  def testReadMapCollect(): Unit = {
+    // Set up WayangContext.
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    // Generate some test data.
+    val inputValues = (for (i <- 1 to 10) yield i).toArray
+
+    // Build and execute a Wayang plan.
+    val dq = wayang
+      .loadCollection(inputValues).withName("Load input values")
+      .map(_ + 2).withName("Add 2")
+
+    // Check the outcome.
+    val expectedOutputValues = inputValues.map(_ + 2).map(_.toString).toList
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedOutputValues)
+  }
+
+  @Test
+  def testFilterDistinctCount(): Unit = {
+    // Set up WayangContext.
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    // Generate some test data.
+    val inputValues = List(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8)
+
+    // Build and execute a Wayang plan.
+    val dq = wayang
+      .loadCollection(inputValues)
+      .filter(n => n >= 4)
+      .distinct
+      .count
+
+    // Check the outcome.
+    val expected = List("5")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expected)
+  }
+
+  @Test
+  def testReduce(): Unit = {
+    // Set up WayangContext.
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    // Generate some test data.
+    val inputValues = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+
+    // Build and execute a Wayang plan.
+    val dq = wayang
+      .loadCollection(inputValues)
+      .reduce((a, b) => a + b)
+
+    // Check the outcome.
+    val expected = List("55")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expected)
+  }
+
+  @Test
+  def testWordCount(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = Array("Big data is big.", "Is data big data?")
+
+    val dq = wayang
+      .loadCollection(inputValues).withName("Load input values")
+      .flatMap(_.split("\\s+")).withName("Split words")
+      .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase")
+      .map((_, 1)).withName("Attach counter")
+      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters")
+
+    val expectedWordCounts = List(("big", 3), ("data", 3), ("is", 2)).map(_.toString())
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedWordCounts)
+  }
+
+  @Test
+  def testGroupBy(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+
+    val dq = wayang
+      .loadCollection(inputValues)
+      .groupByKey(_ % 2).withName("group odd and even")
+      .map { group =>
+        // Compute each group's median (sortBy returns a new, sorted buffer).
+        val buffer = group.toBuffer.sortBy(identity)
+        if (buffer.size % 2 == 0) (buffer(buffer.size / 2 - 1) + buffer(buffer.size / 2)) / 2
+        else buffer(buffer.size / 2)
+      }.withName("median")
+
+    val expectedOutputValues = List("6", "5")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedOutputValues)
+  }
+
+  @Test
+  def testSort(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues1 = Array(3, 4, 5, 2, 1)
+
+    val builder = new PlanBuilder(wayang)
+    val dataQuanta1 = builder.loadCollection(inputValues1)
+    val dq = dataQuanta1
+      .sort(r => r)
+
+    val expectedValues = List(1, 2, 3, 4, 5).map(_.toString)
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  @Test
+  def testMapPartitions(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin())
+
+    val dq = wayang
+      .loadCollection(Seq(0, 1, 2, 3, 4, 6, 8))
+      .mapPartitions { ints =>
+        var (numOdds, numEvens) = (0, 0)
+        ints.foreach(i => if ((i & 1) == 0) numEvens += 1 else numOdds += 1)
+        Seq(("odd", numOdds), ("even", numEvens))
+      }
+      .reduceByKey(_._1, { case ((kind1, count1), (kind2, count2)) => (kind1, count1 + count2) })
+
+    val expectedValues = List(("even", 5), ("odd", 2)).map(_.toString())
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  @Test
+  def testZipWithId(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = for (i <- 0 until 100; j <- 0 until 42) yield i
+
+    val dq = wayang
+      .loadCollection(inputValues)
+      .zipWithId
+      .groupByKey(_.field1)
+      .map { group =>
+        (group.map(_.field0).toSet.size, 1)
+      }
+      .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
+
+    val expectedValues = List((42, 100)).map(_.toString())
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  // Function to create a temp file and write content to it
+  def createTempFile(content: String, prefix: String): File = {
+    val tempFile = File.createTempFile(prefix, ".txt")
+    val writer = new PrintWriter(tempFile)
+    try {
+      writer.write(content.trim)
+    } finally {
+      writer.close()
+    }
+    tempFile
+  }
+
+  @Test
+  def testJoin(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    // Contents for the temp files
+    val content1 =
+      """
+        |Water, 0
+        |Tonic, 5
+        |Juice, 10
+      """.stripMargin
+
+    val content2 =
+      """
+        |Apple juice, Juice
+        |Tap water, Water
+        |Orange juice, Juice
+      """.stripMargin
+
+    // Create temp files
+    val tempFile1 = createTempFile(content1, "testFile1")
+    val tempFile2 = createTempFile(content2, "testFile2")
+
+    // Build initial data quanta
+    val builder = new PlanBuilder(wayang)
+    val dataQuanta1 = builder.readTextFile(s"file://$tempFile1")
+      .map(s => {
+        val values = s.split(", ")
+        (values(0), values(1).toInt)
+      })
+    val dataQuanta2 = builder.readTextFile(s"file://$tempFile2")
+      .map(s => {
+        val values = s.split(", ")
+        (values(0), values(1))
+      })
+
+    // Join
+    val dq = dataQuanta1
+      .join[(String, String), String](_._1, dataQuanta2, _._2)
+      .map(joinTuple => (joinTuple.field1._1, joinTuple.field0._2))
+
+    // Assert output
+    val expectedValues = List(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
+      .map(s => s.toString())
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+
+    // Clean up: Delete the temp files
+    tempFile1.delete()
+    tempFile2.delete()
+  }
+
+  @Test
+  def testJoin2(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues1 = Array(1, 3, 2, 4, 5)
+    val inputValues2 = Array(3, 5)
+
+    val builder = new PlanBuilder(wayang)
+    val dataQuanta1 = builder.loadCollection(inputValues1)
+    val dataQuanta2 = builder.loadCollection(inputValues2)
+    val dq = dataQuanta1
+      .join[Int, Int](n => n, dataQuanta2, n => n)
+      .map(joinTuple => joinTuple.field0)
+
+    val expectedValues = List("3", "5")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  @Test
+  def testCoGroup(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    // Contents for the temp files
+    val content1 =
+      """
+        |Water, 0
+        |Cola, 5
+        |Juice, 10
+      """.stripMargin
+
+    val content2 =
+      """
+        |Apple juice, Juice
+        |Tap water, Water
+        |Orange juice, Juice
+      """.stripMargin
+
+    // Create temp files
+    val tempFile1 = createTempFile(content1, "testFile1")
+    val tempFile2 = createTempFile(content2, "testFile2")
+
+    // Build initial data quanta
+    val builder = new PlanBuilder(wayang)
+    val dataQuanta1 = builder.readTextFile(s"file://$tempFile1")
+      .map(s => {
+        val values = s.split(", ")
+        (values(0), values(1).toInt)
+      })
+    val dataQuanta2 = builder.readTextFile(s"file://$tempFile2")
+      .map(s => {
+        val values = s.split(", ")
+        (values(0), values(1))
+      })
+
+    // Co-group
+    val dq = dataQuanta1
+      .coGroup[(String, String), String](_._1, dataQuanta2, _._2)
+
+    // Assert output
+    val expectedValues = List(
+      "([(Water,0)], [(Tap water,Water)])",
+      "([(Juice,10)], [(Apple juice,Juice), (Orange juice,Juice)])",
+      "([(Cola,5)], [])"
+    )
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+
+    // Clean up: Delete the temp files
+    tempFile1.delete()
+    tempFile2.delete()
+  }
+
+
+  @Test
+  def testUnion(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues1 = Array(1, 2, 3, 4)
+    val inputValues2 = Array(0, 1, 3, 5)
+
+    val builder = new PlanBuilder(wayang)
+    val dataQuanta1 = builder.loadCollection(inputValues1)
+    val dataQuanta2 = builder.loadCollection(inputValues2)
+    val dq = dataQuanta1.union(dataQuanta2)
+
+    val unionExpectedValues = List(1, 2, 3, 4, 0, 1, 3, 5).map(_.toString)
+    serializeDeserializeExecuteAssert(dq.operator, wayang, unionExpectedValues)
+  }
+
+  @Test
+  def testIntersect(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues1 = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+    val inputValues2 = Array(9, 0, 2, 3, 3, 4, 5, 7, 8, 11)
+
+    val builder = new PlanBuilder(wayang)
+    val dataQuanta1 = builder.loadCollection(inputValues1)
+    val dataQuanta2 = builder.loadCollection(inputValues2)
+    val dq = dataQuanta1.intersect(dataQuanta2)
+
+    val intersectExpectedValues = List(2, 3, 4, 5, 7, 8, 9).map(_.toString)
+    serializeDeserializeExecuteAssert(dq.operator, wayang, intersectExpectedValues)
+  }
+
+  @Test
+  def testRepeat(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = Array(1, 2)
+
+    val dq = wayang
+      .loadCollection(inputValues)
+      .repeat(3,
+        _.reduce(_ * _)
+          .flatMap(v => Seq(v, v + 1))
+      )
+
+    // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 => 3rd: 42,43
+    val expectedValues = List("42", "43")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  @Test
+  def testRepeat2(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = Array(1, 2)
+
+    val dq = wayang
+      .loadCollection(inputValues)
+      .repeat(3,
+        _.reduce(_ * _)
+          .flatMap(v => Seq(v, v + 1))
+      )
+      .filter(n => n == 43)
+
+    // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 => 3rd: 42,43
+    val expectedValues = List("43")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  @Test
+  def testRepeat3(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = Array(1, 2, 3, 4, 5)
+
+    val dq = wayang
+      .loadCollection(inputValues)
+      .repeat(10, _.map(_ + 1))
+
+    val expectedValues = List("11", "12", "13", "14", "15")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  @Test
+  def testDoWhile(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = Array(1, 2)
+
+    val dq = wayang
+      .loadCollection(inputValues)
+      .doWhile[Int](vals => vals.max > 100, {
+        start =>
+          val sum = start.reduce(_ + _).withName("Sum")
+          (start.union(sum), sum)
+      })
+
+    val expectedValues = List(1, 2, 3, 6, 12, 24, 48, 96, 192).map(_.toString)
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  @Test
+  def testDoWhile2(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = Array(1, 2)
+
+    val dq = wayang
+      .loadCollection(inputValues)
+      .doWhile[Int](vals => vals.max > 100, {
+        start =>
+          val sum = start.reduce(_ + _).withName("Sum")
+          (start.union(sum), sum)
+      })
+      .filter(n => n > 50)
+
+    val expectedValues = List(96, 192).map(_.toString)
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+  @Test
+  def testSample(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    val inputValues = for (i <- 0 until 100) yield i
+
+    val dq = wayang
+      .loadCollection(inputValues)
+      .sample(10)
+
+    val tempFilenameOut = serializeDeserializeExecute(dq.operator, wayang)
+    SerializationTestBase.assertOutputFileLineCount(tempFilenameOut, 10)
+  }
+
+  @Test
+  def testSqlite3(): Unit = {
+    // Initialize some test data.
+    val configuration = new Configuration
+    val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
+    sqlite3dbFile.deleteOnExit()
+    configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
+
+    val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
+    try {
+      val statement: Statement = connection.createStatement
+      statement.addBatch("DROP TABLE IF EXISTS customer;")
+      statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
+      statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
+      statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
+      statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
+      statement.executeBatch()
+    } finally {
+      if (connection != null) connection.close()
+    }
+
+    // Set up MultiContext.
+    val wayang = new MultiContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
+
+    // Build plan
+    val dq = wayang
+      .readTable(new Sqlite3TableSource("customer", "name", "age"))
+      .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18").withTargetPlatforms(Java.platform)
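+      // sqlUdf provides a SQL version of the predicate; withTargetPlatforms pins the filter to the Java platform here.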
+      .projectRecords(Seq("name"))
+      .map(_.getField(0).asInstanceOf[String])
+
+    // Execute and assert
+    val expectedValues = List("John", "Evelyn")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
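+  // Disabled: requires a running Postgres instance (see the JDBC url/user/password properties below).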
+//  @Test
+  def testPostgres(): Unit = {
+    // Initialize some test data.
+    val configuration = new Configuration
+    configuration.setProperty("wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/test_erp")
+    configuration.setProperty("wayang.postgres.jdbc.user", "postgres")
+    configuration.setProperty("wayang.postgres.jdbc.password", "1234")
+
+    // Set up MultiContext.
+    val wayang = new MultiContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Postgres.plugin)
+
+    // Build plan
+    val dq = wayang
+      .readTable(new PostgresTableSource("clients", "first_name", "role"))
+      .projectRecords(Seq("first_name"))
+      .map(_.getField(0).asInstanceOf[String]).withTargetPlatforms(Java.platform)
+
+    // Execute and assert
+    val expectedValues = List("John", "Jane")
+    serializeDeserializeExecuteAssert(dq.operator, wayang, expectedValues)
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OtherSerializationTests.scala b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OtherSerializationTests.scala
new file mode 100644
index 0000000..1f01b2f
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/OtherSerializationTests.scala
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder, PlanBuilder, createPlanBuilder, toCardinalityEstimator, toLoadEstimator}
+import org.apache.wayang.basic.operators.{MapOperator, TextFileSink}
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.optimizer.costs._
+import org.apache.wayang.core.plan.wayangplan.{Operator, WayangPlan}
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.util.ReflectionUtils
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+import org.junit.{Assert, Test}
+
+import java.nio.file.{Files, Paths}
+
+
+class OtherSerializationTests extends SerializationTestBase {
+
+  @Test
+  def multiContextSerializationTest(): Unit = {
+    val configuration = new Configuration()
+    configuration.setProperty("spark.master", "random_master_url_1")
+    configuration.setProperty("spark.app.name", "random_app_name_2")
+    val multiContext = new MultiContext(configuration).withPlugin(Spark.basicPlugin()).withTextFileSink("file:///tmp/out11")
+
+    try {
+      val serializedConfiguration = SerializationUtils.serialize(configuration)
+      val deserializedConfiguration = SerializationUtils.deserialize[Configuration](serializedConfiguration)
+      Assert.assertEquals(deserializedConfiguration.getStringProperty("spark.master"), "random_master_url_1")
+      Assert.assertEquals(deserializedConfiguration.getStringProperty("spark.app.name"), "random_app_name_2")
+
+      val serializedMultiContext = SerializationUtils.serialize(multiContext)
+      val deserializedMultiContext = SerializationUtils.deserialize[MultiContext](serializedMultiContext)
+      Assert.assertEquals(deserializedMultiContext.getConfiguration.getStringProperty("spark.master"), "random_master_url_1")
+      Assert.assertEquals(deserializedMultiContext.getConfiguration.getStringProperty("spark.app.name"), "random_app_name_2")
+      Assert.assertEquals(deserializedMultiContext.getSink.get.asInstanceOf[MultiContext.TextFileSink].url, "file:///tmp/out11")
+      Assert.assertArrayEquals(multiContext.getConfiguration.getPlatformProvider.provideAll().toArray, deserializedMultiContext.getConfiguration.getPlatformProvider.provideAll().toArray)
+    } catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+  }
+
+
+  @Test
+  def planBuilderSerializationTest(): Unit = {
+    val configuration1 = new Configuration()
+    configuration1.setProperty("spark.master", "master1")
+
+    val context1 = new MultiContext(configuration1).withPlugin(Spark.basicPlugin()).withTextFileSink("file:///tmp/out11")
+
+    val planBuilder = new PlanBuilder(context1)
+      .withUdfJarsOf(classOf[OtherSerializationTests])
+      .withUdfJars("Aaa", "Bbb", "Ccc")
+
+    try {
+      val serialized = SerializationUtils.serializeAsString(planBuilder)
+      val deserialized = SerializationUtils.deserializeFromString[PlanBuilder](serialized)
+      // SerializationTestBase.log(SerializationUtils.serializeAsString(deserialized), testName.getMethodName + ".log.json")
+
+      Assert.assertEquals(
+        planBuilder.udfJars,
+        deserialized.udfJars
+      )
+      Assert.assertEquals(
+        deserialized.wayangContext.asInstanceOf[MultiContext].getConfiguration.getStringProperty("spark.master"),
+        "master1"
+      )
+      Assert.assertEquals(
+        deserialized.wayangContext.asInstanceOf[MultiContext].getSink.get.asInstanceOf[MultiContext.TextFileSink].url,
+        "file:///tmp/out11"
+      )
+    }
+    catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+  }
+
+
+  @Test
+  def multiContextPlanBuilderSerializationTest(): Unit = {
+    val configuration1 = new Configuration()
+    configuration1.setProperty("spark.master", "master1")
+    val configuration2 = new Configuration()
+    configuration2.setProperty("spark.master", "master2")
+
+    val context1 = new MultiContext(configuration1).withPlugin(Spark.basicPlugin()).withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2).withPlugin(Spark.basicPlugin()).withObjectFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(classOf[OtherSerializationTests])
+      .withUdfJars("Aaa", "Bbb", "Ccc")
+
+    try {
+      val serialized = SerializationUtils.serializeAsString(multiContextPlanBuilder)
+      val deserialized = SerializationUtils.deserializeFromString[MultiContextPlanBuilder](serialized)
+      // SerializationTestBase.log(SerializationUtils.serializeAsString(deserialized), testName.getMethodName + ".log.json")
+
+      Assert.assertEquals(
+        multiContextPlanBuilder.udfJars,
+        deserialized.udfJars
+      )
+      Assert.assertEquals(
+        multiContextPlanBuilder.multiContexts(0).getConfiguration.getStringProperty("spark.master"),
+        "master1"
+      )
+      Assert.assertEquals(
+        multiContextPlanBuilder.multiContexts(1).getConfiguration.getStringProperty("spark.master"),
+        "master2"
+      )
+      Assert.assertEquals(
+        multiContextPlanBuilder.multiContexts(0).getSink.get.asInstanceOf[MultiContext.TextFileSink].url,
+        "file:///tmp/out11"
+      )
+      Assert.assertEquals(
+        multiContextPlanBuilder.multiContexts(1).getSink.get.asInstanceOf[MultiContext.ObjectFileSink].url,
+        "file:///tmp/out12"
+      )
+    }
+    catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+  }
+
+
+  @Test
+  def serializeToTempFileTest(): Unit = {
+    // Define configuration
+    val configuration = new Configuration()
+    val wayangContext = new WayangContext(configuration)
+      .withPlugin(Java.basicPlugin())
+    val planBuilder = new PlanBuilder(wayangContext)
+      .withUdfJarsOf(classOf[OtherSerializationTests])
+
+    // Define plan
+    val dataQuanta = planBuilder
+      .loadCollection(List("12345", "12345678", "1234567890", "1234567890123"))
+      .map(s => s + " Wayang out")
+      .map(s => (s, "AAAA", "BBBB"))
+      .map(s => List(s._1, "a", "b", "c"))
+      .filter(s => s.head.length > 20)
+      .map(s => s.head)
+
+    val tempfile = TempFileUtils.writeToTempFileAsString(dataQuanta.operator)
+    val operator = TempFileUtils.readFromTempFileFromString[Operator](tempfile)
+
+    // Attach an output sink to deserialized plan
+    val tempFileOut = s"/tmp/${testName.getMethodName}.out"
+    val sink = new TextFileSink[AnyRef](s"file://$tempFileOut", classOf[AnyRef])
+    operator.connectTo(0, sink, 0)
+
+    // Execute plan
+    val plan = new WayangPlan(sink)
+    wayangContext.execute(plan, ReflectionUtils.getDeclaringJar(classOf[OtherSerializationTests]))
+
+    // Check results
+    val expectedLines = List("1234567890 Wayang out", "1234567890123 Wayang out")
+    SerializationTestBase.assertOutputFile(tempFileOut, expectedLines)
+  }
+
+
+  @Test
+  def multiDataQuantaExecuteTest(): Unit = {
+
+    try {
+      // Create multi contexts
+      val out1 = Files.createTempFile("out1", "tmp").toString
+      val out2 = Files.createTempFile("out2", "tmp").toString
+      val context1 = new MultiContext(new Configuration()).withPlugin(Java.basicPlugin()).withTextFileSink(s"file://$out1")
+      val context2 = new MultiContext(new Configuration()).withPlugin(Java.basicPlugin()).withTextFileSink(s"file://$out2")
+
+      // Create multiContextPlanBuilder
+      val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+        .withUdfJarsOf(classOf[OtherSerializationTests])
+
+      // Build and execute plan
+      multiContextPlanBuilder
+        .forEach(_.loadCollection(List("aaabbb", "aaabbbccc", "aaabbbcccddd", "aaabbbcccdddeee")))
+        .forEach(_.map(s => s + " Wayang out."))
+        .forEach(_.filter(s => s.length > 20))
+        .execute()
+
+      // Check results
+      val expectedLines = List("aaabbbccc Wayang out.", "aaabbbcccddd Wayang out.", "aaabbbcccdddeee Wayang out.")
+      SerializationTestBase.assertOutputFile(out1, expectedLines)
+      SerializationTestBase.assertOutputFile(out2, expectedLines)
+
+      // Delete temp files after usage
+      Files.deleteIfExists(Paths.get(out1))
+      Files.deleteIfExists(Paths.get(out2))
+    }
+    catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+  }
+
+
+  @Test
+  def platformSerializationTest(): Unit = {
+    try {
+      val serialized = SerializationUtils.serialize(Java.platform())
+      val deserialized = SerializationUtils.deserialize[Platform](serialized)
+      Assert.assertEquals(deserialized.getClass.getName, Java.platform().getClass.getName)
+    } catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+  }
+
+
+  @Test
+  def targetPlatformsTest(): Unit = {
+    val configuration = new Configuration()
+    val wayangContext = new WayangContext(configuration)
+      .withPlugin(Java.basicPlugin())
+    val planBuilder = new PlanBuilder(wayangContext)
+      .withUdfJarsOf(classOf[OtherSerializationTests])
+
+    val dataQuanta = planBuilder
+      .loadCollection(List("12345", "12345678", "1234567890", "1234567890123"))
+      .map(s => s + " Wayang out").withTargetPlatforms(Spark.platform()).withTargetPlatforms(Java.platform())
+
+    try {
+      val serialized = SerializationUtils.serializeAsString(dataQuanta.operator)
+      val deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
+      Assert.assertEquals(deserialized.getTargetPlatforms.size(), 2)
+      val deserializedPlatformNames = deserialized.getTargetPlatforms.toArray.map(p => p.getClass.getName)
+      Assert.assertTrue(deserializedPlatformNames.contains(Spark.platform().getClass.getName))
+      Assert.assertTrue(deserializedPlatformNames.contains(Java.platform().getClass.getName))
+    } catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+  }
+
+
+  @Test
+  def targetPlatforms2Test(): Unit = {
+    val configuration = new Configuration()
+    val wayangContext = new WayangContext(configuration)
+      .withPlugin(Java.basicPlugin())
+    val planBuilder = new PlanBuilder(wayangContext)
+      .withUdfJarsOf(classOf[OtherSerializationTests])
+
+    val inputValues1 = Array("Big data is big.", "Is data big data?")
+    val dataQuanta = planBuilder
+      .loadCollection(inputValues1)
+      .flatMap(_.split("\\s+"))
+      .map(_.replaceAll("\\W+", "").toLowerCase)
+      .map((_, 1))
+      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2))
+      .withTargetPlatforms(Spark.platform())
+
+    try {
+      val serialized = SerializationUtils.serializeAsString(dataQuanta.operator)
+      val deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
+      Assert.assertEquals(deserialized.getTargetPlatforms.size(), 1)
+      Assert.assertEquals(deserialized.getTargetPlatforms.toArray.toList(0).getClass.getName, Spark.platform().getClass.getName)
+    } catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+  }
+
+
+  @Test
+  def testLoadProfileEstimator(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    // Create load profile estimator
+    val loadProfileEstimator: LoadProfileEstimator = new NestableLoadProfileEstimator(
+      (in: Long, _: Long) => 10 * in,
+      (_: Long, _: Long) => 1000L
+    )
+
+    // Create map operator with load profile estimator
+    val dq1 = wayang.loadCollection(List(1, 2, 3))
+      .map(_ + 1, udfLoad = loadProfileEstimator)
+
+    // Serialize and then deserialize the map operator
+    var deserialized: Operator = null
+    try {
+      val serialized = SerializationUtils.serializeAsString(dq1.operator)
+      deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
+    }
+    catch {
+      case e: Exception =>
+        e.printStackTrace()
+        throw e
+    }
+
+    // Check if the load profile estimators are equal
+    val originalLoadProfileEstimator = dq1.operator.asInstanceOf[MapOperator[Any, Any]]
+      .getFunctionDescriptor
+      .getLoadProfileEstimator.get().asInstanceOf[NestableLoadProfileEstimator]
+    val deserializedLoadProfileEstimator = deserialized.asInstanceOf[MapOperator[Any, Any]]
+      .getFunctionDescriptor
+      .getLoadProfileEstimator.get().asInstanceOf[NestableLoadProfileEstimator]
+
+    Assert.assertEquals(originalLoadProfileEstimator.getConfigurationKeys, deserializedLoadProfileEstimator.getConfigurationKeys)
+    Assert.assertEquals(originalLoadProfileEstimator.getTemplateKeys, deserializedLoadProfileEstimator.getTemplateKeys)
+
+    /*// Print the contents of configuration keys array for both the originalLoadProfileEstimator and the deserializedLoadProfileEstimator
+    println("originalLoadProfileEstimator.getConfigurationKeys: " + originalLoadProfileEstimator.getConfigurationKeys.mkString(","))
+    println("deserializedLoadProfileEstimator.getConfigurationKeys: " + deserializedLoadProfileEstimator.getConfigurationKeys.mkString(","))
+
+    // Print the contents of template keys array for both the originalLoadProfileEstimator and the deserializedLoadProfileEstimator
+    println("originalLoadProfileEstimator.getTemplateKeys: " + originalLoadProfileEstimator.getTemplateKeys.mkString(","))
+    println("deserializedLoadProfileEstimator.getTemplateKeys: " + deserializedLoadProfileEstimator.getTemplateKeys.mkString(","))
+
+    // Print the toString representation of both the originalLoadProfileEstimator and the deserializedLoadProfileEstimator
+    println("originalLoadProfileEstimator.toString: " + originalLoadProfileEstimator.toString)
+    println("deserializedLoadProfileEstimator.toString: " + deserializedLoadProfileEstimator.toString)*/
+
+  }
+
+
+  @Test
+  def testCardinalityEstimator(): Unit = {
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+
+    // Create map operator with cardinality estimator
+    val dq1 = wayang.loadCollection(List(1, 2, 3))
+      .map(_ + 1)
+      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
+
+    // Serialize and then deserialize the map operator
+    var deserialized: Operator = null
+    try {
+      val serialized = SerializationUtils.serializeAsString(dq1.operator)
+      deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
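+      // The test passes if the round trip completes without throwing; no further assertions are made.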
+    }
+    catch {
+      case e: Exception =>
+        e.printStackTrace()
+        throw e
+    }
+
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/SerializationTestBase.scala b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/SerializationTestBase.scala
new file mode 100644
index 0000000..5b83798
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/serialization/SerializationTestBase.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.api.serialization
+
+import org.apache.wayang.basic.operators.TextFileSink
+import org.apache.wayang.core.api.WayangContext
+import org.apache.wayang.core.plan.wayangplan.{LoopHeadOperator, Operator, WayangPlan}
+import org.apache.wayang.core.util.ReflectionUtils
+import org.junit.rules.TestName
+import org.junit.{Assert, Rule}
+
+import java.io.{File, FileWriter}
+import java.nio.file.{Files, Paths}
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters.asScalaBufferConverter
+
+
+trait SerializationTestBase {
+
+  //
+  // Workaround from https://stackoverflow.com/a/36152864/5589918 to get the current test name
+  //
+  var _testName: TestName = new TestName
+
+  @Rule
+  def testName: TestName = _testName
+
+  def testName_=(aTestName: TestName): Unit = {
+    _testName = aTestName
+  }
+
+
+  def serializeDeserializeExecuteAssert(operator: Operator, wayangContext: WayangContext, expectedLines: List[String], log: Boolean = false): Unit = {
+    var tempFileOut: Option[String] = None
+    try {
+      tempFileOut = Some(serializeDeserializeExecute(operator, wayangContext, log)) // Serialize, deserialize, execute
+      SerializationTestBase.assertOutputFile(tempFileOut.get, expectedLines) // Check results
+    }
+    catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+    finally {
+      // Delete tempFileOut if it exists
+      tempFileOut match {
+        case Some(file) => new java.io.File(file).delete()
+        case None => // Do nothing
+      }
+    }
+  }
+
+  def serializeDeserializeExecute(operator: Operator, wayangContext: WayangContext, log: Boolean = false): String = {
+    try {
+      val serialized = SerializationUtils.serializeAsString(operator)
+      if (log) SerializationTestBase.log(serialized, testName.getMethodName + ".log.json")
+      val deserialized = SerializationUtils.deserializeFromString[Operator](serialized)
+
+      // Create an output sink
+      val outType = deserialized.getOutput(0).getType.getDataUnitType.getTypeClass
+      val tempFilenameOut = s"/tmp/${testName.getMethodName}.out"
+      val sink = new TextFileSink(s"file://$tempFilenameOut", outType)
+
+      // And attach it to the deserialized operator
+      deserialized match {
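+        // Loop heads expose their final result on output slot 1; regular operators emit on slot 0.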
+        case loopHeadOperator: LoopHeadOperator => loopHeadOperator.connectTo(1, sink, 0)
+        case operator: Operator => operator.connectTo(0, sink, 0)
+      }
+
+      // Execute plan
+      val plan = new WayangPlan(sink)
+      wayangContext.execute(plan, ReflectionUtils.getDeclaringJar(classOf[OperatorSerializationTests]))
+      tempFilenameOut
+    }
+    catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        throw t
+    }
+  }
+
+}
+
+object SerializationTestBase {
+
+  def assertOutputFile(outputFilename: String, expectedLines: List[String]): Unit = {
+
+    // Read lines
+    val lines = Files.lines(Paths.get(outputFilename)).collect(Collectors.toList[String]).asScala
+
+    // Assert number of lines
+    Assert.assertEquals("Number of lines in the file should match", expectedLines.size, lines.size)
+
+    // Assert content of lines
+    lines.zip(expectedLines).foreach { case (actual, expected) =>
+      Assert.assertEquals("Line content should match", expected, actual)
+    }
+  }
+
+  def assertOutputFileLineCount(outputFilename: String, expectedNumberOfLines: Int): Unit = {
+
+    // Read lines
+    val lines = Files.lines(Paths.get(outputFilename)).collect(Collectors.toList[String]).asScala
+
+    // Assert number of lines
+    Assert.assertEquals("Number of lines in the file should match", expectedNumberOfLines, lines.size)
+  }
+
+
+  def log(text: String, filename: String = "customLogFile.json"): Unit = {
+
+    // Get the user's desktop path
+    val desktopPath = System.getProperty("user.home") + "/Desktop"
+
+    // Specify the filename for your custom file
+    val customFile = new File(desktopPath, filename)
+
+    // Create the file if it doesn't exist; the FileWriter below overwrites any existing content
+    customFile.createNewFile()
+
+    // Write to the file using a FileWriter
+    val writer = new FileWriter(customFile)
+    try {
+      writer.write(text)
+    } finally {
+      writer.close()
+    }
+
+    // Log the path of the written file
+    println(s"Log file created at: ${customFile.getAbsolutePath}")
+  }
+
+}
\ No newline at end of file
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/async/apps/WordCount.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/async/apps/WordCount.scala
new file mode 100644
index 0000000..1298052
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/async/apps/WordCount.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.async.apps
+
+import org.apache.wayang.api.async.DataQuantaImplicits._
+import org.apache.wayang.api.async.PlanBuilderImplicits._
+import org.apache.wayang.api.{MultiContext, DataQuanta, PlanBuilder}
+import org.apache.wayang.java.Java
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+object WordCount {
+
+  def main(args: Array[String]): Unit = {
+    println("WordCount")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val planBuilder1 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+    val planBuilder2 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+    val planBuilder3 = new PlanBuilder(new MultiContext().withPlugin(Java.basicPlugin())).withUdfJarsOf(this.getClass)
+
+    val result1 = planBuilder1
+      .loadCollection(List(1, 2, 3, 4, 5))
+      .map(_ * 1)
+      .runAsync(tempFileOut = "file:///tmp/out1.temp")
+
+    val result2 = planBuilder2
+      .loadCollection(List(6, 7, 8, 9, 10))
+      .filter(_ <= 8)
+      .runAsync(tempFileOut = "file:///tmp/out2.temp")
+
+    val dq1: DataQuanta[Int] = planBuilder1.loadAsync(result1)
+    val dq2: DataQuanta[Int] = planBuilder1.loadAsync(result2)
+    val result3 = dq1.union(dq2)
+      .map(_ * 3)
+      .filter(_ < 100)
+      .runAsync(tempFileOut = "file:///tmp/out3.temp", result1, result2)
+
+    val result4 = planBuilder3
+      .loadCollection(List(1, 2, 3, 4, 5))
+      .filter(_ >= 2)
+      .runAsync(tempFileOut = "file:///tmp/out4.temp")
+
+    val dq3: DataQuanta[Int] = planBuilder1.loadAsync(result3)
+    val dq4: DataQuanta[Int] = planBuilder1.loadAsync(result4)
+    val result5 = dq3.intersect(dq4)
+      .map(_ * 4)
+      .writeTextFileAsync(url = "file:///tmp/out5.final", result3, result4)
+
+    Await.result(result5, Duration.Inf)
+    println("DONE!")
+
+  }
+
+}
+
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/package.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/package.scala
new file mode 100644
index 0000000..bcef35f
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/package.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.multicontext
+
+import org.apache.wayang.core.api.Configuration
+
+package object apps {
+
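+  /** Load two configurations from the first two entries of `args` (URLs), falling back to defaults. */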
+  def loadConfig(args: Array[String]): (Configuration, Configuration) = {
+    if (args.length < 2) {
+      println("Loading default configurations.")
+      (new Configuration(), new Configuration())
+    } else {
+      println("Loading custom configurations.")
+      (loadConfigFromUrl(args(0)), loadConfigFromUrl(args(1)))
+    }
+  }
+
+  private def loadConfigFromUrl(url: String): Configuration = {
+    try {
+      new Configuration(url)
+    } catch {
+      case unexpected: Exception =>
+        unexpected.printStackTrace()
+        println(s"Can't load configuration from $url")
+        new Configuration()
+    }
+  }
+
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/tpch/Query1.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/tpch/Query1.scala
new file mode 100644
index 0000000..5239dcd
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/tpch/Query1.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.tpch
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.apps.tpch.CsvUtils
+import org.apache.wayang.apps.tpch.data.LineItem
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+import org.apache.wayang.apps.tpch.queries.{Query1 => Query1Utils}
+import org.apache.wayang.multicontext.apps.loadConfig
+
+class Query1 {}
+
+object Query1 {
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length != 2 && args.length != 4) {
+      println("Usage: <lineitem-url> <delta> <optional-path-to-config-1> <optional-path-to-config-2>")
+      System.exit(1)
+    }
+
+    println("TPC-H querying #1 in multi context wayang!")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args.drop(2))
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2)).withUdfJarsOf(classOf[Query1])
+
+    // Example structure of lineitem file:
+    // 1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the
+    // 1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold
+    // ...
+    val lineItemFile = args(0)
+    val delta = args(1).toInt
+
+    multiContextPlanBuilder
+
+      // Load lineitem file
+      .forEach(_.readTextFile(lineItemFile))
+
+      // Parse
+      .forEach(_.map(s => LineItem.parseCsv(s)))
+
+      // Filter line items
+      .forEach(_.filter(t => t.shipDate <= CsvUtils.parseDate("1998-12-01") - delta))
+
+      // Project line items
+      .forEach(_.map(t => (t.returnFlag, t.lineStatus, t.quantity, t.extendedPrice, t.discount, t.tax)))
+
+      // Calculate result fields
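+      // (the avg_* fields carry running sums here; they are turned into averages in the post-processing step)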
+      .forEach(_.map { case (returnFlag, lineStatus, quantity, extendedPrice, discount, tax) =>
+        Query1Utils.Result(
+          returnFlag.toString,
+          lineStatus.toString,
+          quantity,
+          extendedPrice,
+          extendedPrice * (1 - discount),
+          extendedPrice * (1 - discount) * (1 + tax),
+          quantity,
+          extendedPrice,
+          discount,
+          1
+        )
+      })
+
+      // Aggregate line items
+      .forEach(_.reduceByKey(
+        result => (result.l_returnflag, result.l_linestatus),
+        (r1, r2) => Query1Utils.Result(
+          r1.l_returnflag,
+          r1.l_linestatus,
+          r1.sum_qty + r2.sum_qty,
+          r1.sum_base_price + r2.sum_base_price,
+          r1.sum_disc_price + r2.sum_disc_price,
+          r1.sum_charge + r2.sum_charge,
+          r1.avg_qty + r2.avg_qty,
+          r1.avg_price + r2.avg_price,
+          r1.avg_disc + r2.avg_disc,
+          r1.count_order + r2.count_order
+        )
+      ))
+
+      // Post-process line item aggregates: derive the averages from the accumulated sums
+      .forEach(_.map(result => Query1Utils.Result(
+        result.l_returnflag,
+        result.l_linestatus,
+        result.sum_qty,
+        result.sum_base_price,
+        result.sum_disc_price,
+        result.sum_charge,
+        result.avg_qty / result.count_order,
+        result.avg_price / result.count_order,
+        result.avg_disc / result.count_order,
+        result.count_order
+      )))
+
+      // Execute
+      .execute()
+
+
+  }
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCount.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCount.scala
new file mode 100644
index 0000000..c87145a
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCount.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.wordcount
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.multicontext.apps.loadConfig
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+class WordCount {}
+
+object WordCount {
+
+  def main(args: Array[String]): Unit = {
+    println("WordCount")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args)
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // Generate some test data
+    val inputValues = Array("Big data is big.", "Is data big data?")
+
+    // Build and execute a word count
+    multiContextPlanBuilder.forEach(_
+      .loadCollection(inputValues)
+      .flatMap(_.split("\\s+"))
+      .map(_.replaceAll("\\W+", "").toLowerCase)
+      .map((_, 1))
+      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2))
+    ).execute()
+
+    /*.forEach(_.loadCollection(inputValues))
+    .forEach(_.flatMap(_.split("\\s+")))
+    .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+    .forEach(_.map((_, 1)))
+    .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+    .execute()*/
+
+  }
+
+}
+
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountCombineEach.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountCombineEach.scala
new file mode 100644
index 0000000..850ceba
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountCombineEach.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.wordcount
+
+import org.apache.wayang.api.{MultiContext, DataQuanta, MultiContextPlanBuilder}
+import org.apache.wayang.java.Java
+import org.apache.wayang.multicontext.apps.loadConfig
+
+object WordCountCombineEach {
+
+  def main(args: Array[String]): Unit = {
+    println("WordCountCombineEach")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args)
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // Generate some test data
+    val inputValues = Array("Big data is big.", "Is data big data?")
+
+    // Build and execute a word count
+    val dq1 = multiContextPlanBuilder
+      .forEach(_.loadCollection(inputValues))
+      .forEach(_.flatMap(_.split("\\s+")))
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map((_, 1)))
+      .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+
+    val dq2 = multiContextPlanBuilder
+      .forEach(_.loadCollection(inputValues))
+      .forEach(_.flatMap(_.split("\\s+")))
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map((_, 1)))
+      .forEach(_.reduceByKey(_._1, (a, _) => (a._1, 100)))
+
+    dq1.combineEach(dq2, (dq1: DataQuanta[(String, Int)], dq2: DataQuanta[(String, Int)]) => dq1.union(dq2))
+      .forEach(_.map(t => (t._1 + " wayang out", t._2)))
+      .execute()
+  }
+
+}
+
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithMerge.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithMerge.scala
new file mode 100644
index 0000000..a391e94
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithMerge.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.wordcount
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.java.Java
+import org.apache.wayang.multicontext.apps.loadConfig
+import org.apache.wayang.spark.Spark
+
+object WordCountWithMerge {
+
+  def main(args: Array[String]): Unit = {
+    println("WordCountWithMerge")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args)
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withMergeFileSink("file:///tmp/out11")   // The mergeContext will read the output of context 1 from here
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withMergeFileSink("file:///tmp/out12")   // The mergeContext will read the output of context 2 from here
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // To be used after merging the previous two
+    val mergeContext = new WayangContext(new Configuration())
+      .withPlugin(Java.basicPlugin())
+
+    // Generate some test data
+    val inputValues1 = Array("Big data is big.", "Is data big data?")
+    val inputValues2 = Array("Big big data is big big.", "Is data big data big?")
+
+    // Build and execute a word count in 2 different contexts
+    multiContextPlanBuilder
+      .loadCollection(context1, inputValues1)
+      .loadCollection(context2, inputValues2)
+      .forEach(_.flatMap(_.split("\\s+")))
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .forEach(_.map((_, 1)))
+      .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+
+      // Merge contexts with union operator
+      .mergeUnion(mergeContext)
+
+      // Continue processing merged DataQuanta
+      .filter(_._2 >= 3)
+      .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
+
+      // Write out; expected output:
+      //    (big,9)
+      //    (data,6)
+      .writeTextFile("file:///tmp/out1.merged", s => s.toString())
+
+  }
+
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithTargetPlatforms.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithTargetPlatforms.scala
new file mode 100644
index 0000000..5e7d185
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/wordcount/WordCountWithTargetPlatforms.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.wayang.multicontext.apps.wordcount
+
+import org.apache.wayang.api.{MultiContext, MultiContextPlanBuilder}
+import org.apache.wayang.java.Java
+import org.apache.wayang.multicontext.apps.loadConfig
+import org.apache.wayang.spark.Spark
+
+object WordCountWithTargetPlatforms {
+
+  def main(args: Array[String]): Unit = {
+    println("WordCountWithTargetPlatforms")
+    println("Scala version:")
+    println(scala.util.Properties.versionString)
+
+    val (configuration1, configuration2) = loadConfig(args)
+
+    val context1 = new MultiContext(configuration1)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out11")
+    val context2 = new MultiContext(configuration2)
+      .withPlugin(Java.basicPlugin())
+      .withPlugin(Spark.basicPlugin())
+      .withTextFileSink("file:///tmp/out12")
+
+    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2))
+      .withUdfJarsOf(this.getClass)
+
+    // Generate some test data
+    val inputValues1 = Array("Big data is big.", "Is data big data?")
+    val inputValues2 = Array("Big big data is big big.", "Is data big data big?")
+
+    multiContextPlanBuilder
+      .loadCollection(context1, inputValues1)
+      .loadCollection(context2, inputValues2)
+
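+      // Pin the flatMap to Spark on context1 and to Java on context2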
+      .forEach(_.flatMap(_.split("\\s+")))
+      .withTargetPlatforms(context1, Spark.platform())
+      .withTargetPlatforms(context2, Java.platform())
+
+      .forEach(_.map(_.replaceAll("\\W+", "").toLowerCase))
+      .withTargetPlatforms(Java.platform())
+
+      .forEach(_.map((_, 1)))
+
+      .forEach(_.reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)))
+      .withTargetPlatforms(context1, Spark.platform())
+      .execute()
+  }
+}
\ No newline at end of file
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SampleOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SampleOperator.java
index 50db0d7..c5a9c14 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SampleOperator.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SampleOperator.java
@@ -20,6 +20,7 @@
 
 import org.apache.commons.lang3.Validate;
 import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
 import org.apache.wayang.core.optimizer.cardinality.FixedSizeCardinalityEstimator;
@@ -29,8 +30,6 @@
 import org.apache.logging.log4j.Logger;
 
 import java.util.Optional;
-import java.util.function.IntUnaryOperator;
-import java.util.function.LongUnaryOperator;
 
 /**
  * A random sample operator randomly selects its inputs from the input slot and pushes that element to the output slot.
@@ -78,12 +77,12 @@
     /**
      * This function determines the sample size by the number of iterations.
      */
-    protected IntUnaryOperator sampleSizeFunction;
+    protected FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction;
 
     /**
      * This function optionally determines the seed by the number of iterations.
      */
-    protected LongUnaryOperator seedFunction;
+    protected FunctionDescriptor.SerializableLongUnaryOperator seedFunction;
 
     /**
      * Size of the dataset to be sampled or {@value #UNKNOWN_DATASET_SIZE} if a dataset size is not known.
@@ -108,7 +107,7 @@
      * @param sampleSizeFunction user-specified size of the sample in dependence of the current iteration number
      * @param type               {@link DataSetType} of the sampled dataset
      */
-    public SampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type) {
+    public SampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type) {
         this(sampleSizeFunction, type, Methods.ANY, iterationNumber -> randomSeed());
     }
 
@@ -122,21 +121,21 @@
     /**
      * Creates a new instance given the sample size and the method.
      */
-    public SampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod) {
+    public SampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod) {
         this(sampleSizeFunction, type, sampleMethod, iterationNumber -> randomSeed());
     }
 
     /**
      * Creates a new instance given a user-defined sample size.
      */
-    public SampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod, long seed) {
+    public SampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod, long seed) {
         this(sampleSizeFunction, type, sampleMethod, iterationNumber -> seed);
     }
 
     /**
      * Creates a new instance given user-defined sample size and seed methods.
      */
-    public SampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod, LongUnaryOperator seedFunction) {
+    public SampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, Methods sampleMethod, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
         super(type, type, true);
         this.sampleSizeFunction = sampleSizeFunction;
         this.sampleMethod = sampleMethod;
@@ -186,7 +185,7 @@
         this.sampleMethod = sampleMethod;
     }
 
-    public void setSeedFunction(LongUnaryOperator seedFunction) {
+    public void setSeedFunction(FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
         this.seedFunction = seedFunction;
     }
 
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java
index 18d13e0..c78d004 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java
@@ -25,11 +25,7 @@
 
 import java.io.Serializable;
 import java.util.Optional;
-import java.util.function.BiFunction;
-import java.util.function.BinaryOperator;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
+import java.util.function.*;
 
 /**
  * A function operates on single data units or collections of those.
@@ -140,4 +136,29 @@
     public interface ExtendedSerializableConsumer<T> extends SerializableConsumer<T>, ExtendedFunction{
 
     }
+
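+    // Serializable counterparts of java.util.function interfaces, so that estimator and sampling lambdas survive plan serialization.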
+    @FunctionalInterface
+    public interface SerializableIntUnaryOperator extends IntUnaryOperator, Serializable {
+
+    }
+
+    @FunctionalInterface
+    public interface SerializableLongUnaryOperator extends LongUnaryOperator, Serializable {
+
+    }
+
+    @FunctionalInterface
+    public interface SerializableToLongBiFunction<T, U> extends ToLongBiFunction<T, U>, Serializable {
+
+    }
+
+    @FunctionalInterface
+    public interface SerializableToDoubleBiFunction<T, U> extends ToDoubleBiFunction<T, U>, Serializable {
+
+    }
+
+    @FunctionalInterface
+    public interface SerializableToLongFunction<T> extends ToLongFunction<T>, Serializable {
+
+    }
 }
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimator.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimator.java
index 7582261..ebddcc1 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimator.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimator.java
@@ -19,11 +19,10 @@
 package org.apache.wayang.core.optimizer.cardinality;
 
 import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 
 import java.util.Arrays;
-import java.util.function.ToLongBiFunction;
-import java.util.function.ToLongFunction;
 
 /**
  * Default implementation of the {@link CardinalityEstimator}. Generalizes a single-point estimation function.
@@ -34,7 +33,7 @@
 
     private final int numInputs;
 
-    private final ToLongBiFunction<long[], Configuration> singlePointEstimator;
+    private final FunctionDescriptor.SerializableToLongBiFunction<long[], Configuration> singlePointEstimator;
 
     /**
      * If {@code true}, receiving more than {@link #numInputs} is also fine.
@@ -44,7 +43,7 @@
     public DefaultCardinalityEstimator(double certaintyProb,
                                        int numInputs,
                                        boolean isAllowMoreInputs,
-                                       ToLongFunction<long[]> singlePointEstimator) {
+                                       FunctionDescriptor.SerializableToLongFunction<long[]> singlePointEstimator) {
         this(certaintyProb,
                 numInputs,
                 isAllowMoreInputs,
@@ -54,7 +53,7 @@
     public DefaultCardinalityEstimator(double certaintyProb,
                                        int numInputs,
                                        boolean isAllowMoreInputs,
-                                       ToLongBiFunction<long[], Configuration> singlePointEstimator) {
+                                       FunctionDescriptor.SerializableToLongBiFunction<long[], Configuration> singlePointEstimator) {
         this.certaintyProb = certaintyProb;
         this.numInputs = numInputs;
         this.singlePointEstimator = singlePointEstimator;
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/DefaultLoadEstimator.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/DefaultLoadEstimator.java
index 775129e..9d9a4ea 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/DefaultLoadEstimator.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/DefaultLoadEstimator.java
@@ -19,6 +19,7 @@
 package org.apache.wayang.core.optimizer.costs;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 
@@ -41,7 +42,7 @@
     public DefaultLoadEstimator(int numInputs,
                                 int numOutputs,
                                 double correctnessProbability,
-                                ToLongBiFunction<long[], long[]> singlePointFunction) {
+                                FunctionDescriptor.SerializableToLongBiFunction<long[], long[]> singlePointFunction) {
         this(numInputs, numOutputs, correctnessProbability, null, singlePointFunction);
     }
 
@@ -49,7 +50,7 @@
                                 int numOutputs,
                                 double correctnessProbability,
                                 CardinalityEstimate nullCardinalityReplacement,
-                                ToLongBiFunction<long[], long[]> singlePointFunction) {
+                                FunctionDescriptor.SerializableToLongBiFunction<long[], long[]> singlePointFunction) {
         this(
                 numInputs, numOutputs, correctnessProbability, nullCardinalityReplacement,
                 (context, inputEstimates, outputEstimates) -> singlePointFunction.applyAsLong(inputEstimates, outputEstimates)
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadEstimator.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadEstimator.java
index 0f07067..4892343 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadEstimator.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadEstimator.java
@@ -22,6 +22,7 @@
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
 /**
@@ -36,7 +37,7 @@
      * Functional interface for lambda expressions to express single-point load estimation functions.
      */
     @FunctionalInterface
-    public interface SinglePointEstimationFunction {
+    public interface SinglePointEstimationFunction extends Serializable {
 
         /**
          * Estimate the load for the given artifact, input, and output estimates.
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadProfileEstimators.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadProfileEstimators.java
index d4093b9..be42d6c 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadProfileEstimators.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/LoadProfileEstimators.java
@@ -198,7 +198,7 @@
         );
 
         long overhead = spec.has("overhead") ? spec.getLong("overhead") : 0L;
-        ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ?
+        FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ?
                 parseResourceUsageJuel(spec.getString("ru"), numInputs, numOutputs) :
                 DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR;
         return new NestableLoadProfileEstimator(
@@ -273,7 +273,7 @@
         );
 
         long overhead = spec.has("overhead") ? spec.getLong("overhead") : 0L;
-        ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ?
+        FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ?
                 compileResourceUsage(spec.getString("ru")) :
                 DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR;
         return new NestableLoadProfileEstimator(
@@ -318,7 +318,7 @@
      * @param numOutputs the number of outputs of the estimated operator, reflected as JUEL variables {@code out0}, {@code out1}, ...
-     * @return a {@link ToLongBiFunction} wrapping the JUEL expression
+     * @return a {@link FunctionDescriptor.SerializableToDoubleBiFunction} wrapping the JUEL expression
      */
-    private static ToDoubleBiFunction<long[], long[]> parseResourceUsageJuel(String juel, int numInputs, int numOutputs) {
+    private static FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> parseResourceUsageJuel(String juel, int numInputs, int numOutputs) {
         final Map<String, Class<?>> parameterClasses = createJuelParameterClasses(numInputs, numOutputs);
         final JuelUtils.JuelFunction<Double> juelFunction = new JuelUtils.JuelFunction<>(juel, Double.class, parameterClasses);
         return (inCards, outCards) -> applyJuelFunction(juelFunction, null, inCards, outCards, Collections.emptyList());
@@ -391,7 +391,7 @@
      * @param expression a mathematical expression
-     * @return a {@link ToLongBiFunction} wrapping the expression
+     * @return a {@link FunctionDescriptor.SerializableToDoubleBiFunction} wrapping the expression
      */
-    private static ToDoubleBiFunction<long[], long[]> compileResourceUsage(String expression) {
+    private static FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> compileResourceUsage(String expression) {
         final Expression expr = ExpressionBuilder.parse(expression).specify(baseContext);
         return (inCards, outCards) -> {
             Context mathContext = createMathContext(null, inCards, outCards);
@@ -484,6 +484,6 @@
     }
 
 
-    private static final ToDoubleBiFunction<long[], long[]> DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR = (in, out) -> 1d;
+    private static final FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR = (in, out) -> 1d;
 
 }
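
One caveat worth keeping in mind with these typed-as-serializable lambdas: Java serializes a lambda together with its captured values, so the lambda returned by `parseResourceUsageJuel` above is only truly serializable if the captured `JuelUtils.JuelFunction` is serializable too (whether it is, or whether these estimators are ever serialized in practice, is not visible in this diff). The general rule, sketched with a hypothetical `SerializableToDoubleBi` interface:

```java
import java.io.Serializable;
import java.util.function.ToDoubleBiFunction;

public class CaptureCaveat {

    interface SerializableToDoubleBi<T, U>
            extends ToDoubleBiFunction<T, U>, Serializable {
    }

    public static void main(String[] args) {
        // Captures nothing: always serializable, like the default (in, out) -> 1d above.
        SerializableToDoubleBi<long[], long[]> constant = (in, out) -> 1d;

        Object nonSerializable = new Object();
        // Compiles fine, but writeObject(...) would throw NotSerializableException,
        // because the captured Object is dragged along and is not Serializable:
        SerializableToDoubleBi<long[], long[]> broken =
                (in, out) -> nonSerializable.hashCode();

        System.out.println(constant.applyAsDouble(new long[]{1}, new long[]{1})); // 1.0
    }
}
```
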
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/NestableLoadProfileEstimator.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/NestableLoadProfileEstimator.java
index 7bd64c2..4510c01 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/NestableLoadProfileEstimator.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/costs/NestableLoadProfileEstimator.java
@@ -20,6 +20,7 @@
 
 import org.apache.wayang.core.api.Configuration;
 import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
 
 import java.util.Collection;
@@ -40,7 +41,7 @@
     /**
      * The degree to which the load profile can utilize available resources.
      */
-    private final ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator;
+    private final FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator;
 
     /**
      * Milliseconds overhead that this load profile incurs.
@@ -98,7 +99,7 @@
                                         LoadEstimator ramLoadEstimator,
                                         LoadEstimator diskLoadEstimator,
                                         LoadEstimator networkLoadEstimator,
-                                        ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator,
+                                        FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator,
                                         long overheadMillis) {
         this(
                 cpuLoadEstimator, ramLoadEstimator, diskLoadEstimator, networkLoadEstimator,
@@ -121,7 +122,7 @@
                                         LoadEstimator ramLoadEstimator,
                                         LoadEstimator diskLoadEstimator,
                                         LoadEstimator networkLoadEstimator,
-                                        ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator,
+                                        FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator,
                                         long overheadMillis,
                                         String configurationKey) {
         this.cpuLoadEstimator = cpuLoadEstimator;
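
Putting the cost-model pieces together: the constructors shown in this hunk and in the `DefaultLoadEstimator` hunk above now accept plain lambdas that are serializable by construction. A hedged usage sketch, built only from the signatures visible in this diff; the numeric values are illustrative, not Wayang defaults:

```java
import org.apache.wayang.core.optimizer.costs.DefaultLoadEstimator;
import org.apache.wayang.core.optimizer.costs.LoadEstimator;
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;

public class LoadProfileSketch {
    public static void main(String[] args) {
        // Each lambda targets the new Serializable* parameter types:
        LoadEstimator cpu  = new DefaultLoadEstimator(1, 1, 0.9, (in, out) -> 700 * in[0]);
        LoadEstimator ram  = new DefaultLoadEstimator(1, 1, 0.9, (in, out) -> 10_000L);
        LoadEstimator disk = new DefaultLoadEstimator(1, 1, 0.9, (in, out) -> 0L);
        LoadEstimator net  = new DefaultLoadEstimator(1, 1, 0.9, (in, out) -> 0L);

        NestableLoadProfileEstimator profile = new NestableLoadProfileEstimator(
                cpu, ram, disk, net,
                (in, out) -> 1d, // resource utilization, also a serializable lambda now
                0L               // overhead in milliseconds
        );
    }
}
```
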
diff --git a/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimatorTest.java b/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimatorTest.java
index 5ffc3ab..44584a3 100644
--- a/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimatorTest.java
+++ b/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/optimizer/cardinality/DefaultCardinalityEstimatorTest.java
@@ -18,12 +18,11 @@
 
 package org.apache.wayang.core.optimizer.cardinality;
 
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.function.FunctionDescriptor;
+import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.junit.Assert;
 import org.junit.Test;
-import org.apache.wayang.core.api.Configuration;
-import org.apache.wayang.core.optimizer.OptimizationContext;
-
-import java.util.function.ToLongFunction;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -42,7 +41,7 @@
         CardinalityEstimate inputEstimate1 = new CardinalityEstimate(50, 60, 0.8);
         CardinalityEstimate inputEstimate2 = new CardinalityEstimate(10, 100, 0.4);
 
-        final ToLongFunction<long[]> singlePointEstimator =
+        final FunctionDescriptor.SerializableToLongFunction<long[]> singlePointEstimator =
                 inputEstimates -> (long) Math.ceil(0.8 * inputEstimates[0] * inputEstimates[1]);
 
         CardinalityEstimator estimator = new DefaultCardinalityEstimator(
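
A practical migration note that this test illustrates: lambda literals need no change at all, since they simply infer the new target type; only call sites that pass a variable typed as the raw JDK interface stop compiling. Sketched below with a hypothetical `accept` method standing in for the changed constructors, and assuming (as elsewhere in `FunctionDescriptor`) that the serializable variants extend their `java.util.function` counterparts:

```java
import java.util.function.ToLongFunction;
import org.apache.wayang.core.function.FunctionDescriptor;

public class MigrationSketch {

    // Hypothetical stand-in for a constructor parameter of the new type:
    static void accept(FunctionDescriptor.SerializableToLongFunction<long[]> f) {
    }

    public static void main(String[] args) {
        // Unchanged lambda literal, new target type:
        accept(inputs -> inputs[0] * 2);

        ToLongFunction<long[]> plain = inputs -> inputs[0] * 2;
        // accept(plain);           // no longer compiles: plain is not Serializable
        accept(plain::applyAsLong); // adapting works, but only serializes if the
                                    // captured plain is itself serializable
    }
}
```
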
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkSampleOperator.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkSampleOperator.java
index 86b0613..fb4e50b 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkSampleOperator.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkSampleOperator.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.wayang.basic.operators.SampleOperator;
 import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -52,7 +53,7 @@
     /**
      * Creates a new instance.
      */
-    public FlinkSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+    public FlinkSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
         super(sampleSizeFunction, type, Methods.RANDOM, seedFunction);
     }
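
The same treatment reaches the platform operators: this Flink sample operator, the two Java operators below, and the three Spark variants at the end of the patch all take serializable sample-size and seed UDFs now. A hedged construction sketch, assuming (per the usual `SampleOperator` contract) that both UDFs map an iteration number to a value:

```java
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.flink.operators.FlinkSampleOperator;

public class SampleOperatorSketch {
    public static void main(String[] args) {
        // Lambdas against the new serializable UnaryOperator variants:
        FunctionDescriptor.SerializableIntUnaryOperator sampleSize = iteration -> 100;
        FunctionDescriptor.SerializableLongUnaryOperator seed = iteration -> 42L;

        FlinkSampleOperator<Integer> operator = new FlinkSampleOperator<>(
                sampleSize,
                DataSetType.createDefault(Integer.class),
                seed
        );
    }
}
```
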
 
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaRandomSampleOperator.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaRandomSampleOperator.java
index 528f716..bf48176 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaRandomSampleOperator.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaRandomSampleOperator.java
@@ -19,6 +19,7 @@
 package org.apache.wayang.java.operators;
 
 import org.apache.wayang.basic.operators.SampleOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -55,7 +56,7 @@
      *
      * @param sampleSizeFunction udf-based size of sample
      */
-    public JavaRandomSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+    public JavaRandomSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
         super(sampleSizeFunction, type, Methods.RANDOM, seedFunction);
     }
 
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaReservoirSampleOperator.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaReservoirSampleOperator.java
index f6946e6..66730af 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaReservoirSampleOperator.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaReservoirSampleOperator.java
@@ -19,6 +19,7 @@
 package org.apache.wayang.java.operators;
 
 import org.apache.wayang.basic.operators.SampleOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -54,7 +55,7 @@
     /**
      * Creates a new instance.
      */
-    public JavaReservoirSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seed) {
+    public JavaReservoirSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seed) {
         super(sampleSizeFunction, type, Methods.RESERVOIR, seed);
     }
 
diff --git a/wayang-platforms/wayang-jdbc-template/pom.xml b/wayang-platforms/wayang-jdbc-template/pom.xml
index 2a64798..e2f655b 100644
--- a/wayang-platforms/wayang-jdbc-template/pom.xml
+++ b/wayang-platforms/wayang-jdbc-template/pom.xml
@@ -68,6 +68,11 @@
             <version>0.7.1</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-spark</artifactId>
+            <version>0.7.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.hsqldb</groupId>
             <artifactId>hsqldb</artifactId>
             <version>${hsqldb.version}</version>
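
For context on the two pom changes in this patch: `SqlToRddOperator` below lives in `wayang-jdbc-template` and implements `SparkExecutionOperator` (see its imports), so the module now needs `wayang-spark` on its compile path; the same dependency is added further down to `wayang-postgres`, which builds on the JDBC template. Like its sibling entries in this pom, the new dependency pins version `0.7.1` literally.
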
diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java
index 08672ca..48cff71 100644
--- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java
+++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java
@@ -36,10 +36,7 @@
 import org.apache.wayang.spark.operators.SparkExecutionOperator;
 
 import java.sql.Connection;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
diff --git a/wayang-platforms/wayang-postgres/pom.xml b/wayang-platforms/wayang-postgres/pom.xml
index 5bc9ce3..ee75b36 100644
--- a/wayang-platforms/wayang-postgres/pom.xml
+++ b/wayang-platforms/wayang-postgres/pom.xml
@@ -55,6 +55,11 @@
             <artifactId>wayang-jdbc-template</artifactId>
             <version>0.7.1</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-spark</artifactId>
+            <version>0.7.1</version>
+        </dependency>
     </dependencies>
 
 
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkBernoulliSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkBernoulliSampleOperator.java
index be041ae..8ec074f 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkBernoulliSampleOperator.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkBernoulliSampleOperator.java
@@ -20,6 +20,7 @@
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.wayang.basic.operators.SampleOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -49,7 +50,7 @@
     /**
      * Creates a new instance.
      */
-    public SparkBernoulliSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+    public SparkBernoulliSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
         super(sampleSizeFunction, type, Methods.BERNOULLI, seedFunction);
     }
 
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java
index 8333a17..25212be 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java
@@ -22,6 +22,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.wayang.basic.operators.SampleOperator;
 import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -70,7 +71,7 @@
     /**
      * Creates a new instance.
      */
-    public SparkRandomPartitionSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+    public SparkRandomPartitionSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
         super(sampleSizeFunction, type, Methods.RANDOM, seedFunction);
     }
 
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java
index ca1c36b..3169da7 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java
@@ -22,6 +22,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.wayang.basic.operators.SampleOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.platform.ChannelDescriptor;
@@ -65,7 +66,7 @@
     /**
      * Creates a new instance.
      */
-    public SparkShufflePartitionSampleOperator(IntUnaryOperator sampleSizeFunction, DataSetType<Type> type, LongUnaryOperator seedFunction) {
+    public SparkShufflePartitionSampleOperator(FunctionDescriptor.SerializableIntUnaryOperator sampleSizeFunction, DataSetType<Type> type, FunctionDescriptor.SerializableLongUnaryOperator seedFunction) {
         super(sampleSizeFunction, type, Methods.SHUFFLE_PARTITION_FIRST, seedFunction);
     }