Wayang 8 (#89)

* [WAYANG-8][API-PYTHON] Creation of functions to be consumed by MapPartitionsDescriptor

* [WAYANG-8][API-PYTHON] Included PythonProcessCaller that manages the python process execution and Java - Python connection

* [WAYANG-8][API-PYTHON] POM fixes plus minor test

* [WAYANG-8][API-PYTHON] Python connection through TCP socket enabled

* [WAYANG-8][API-PYTHON] Writing from Java to Python. Not taking into care about Iterator Datatypes.

* [WAYANG-8][API-PYTHON] Java Socket Writter improvements

* [WAYANG-8][API-PYTHON] Python UTF8 Deserializer included

* [WAYANG-8][API-PYTHON] Python UTF8 Reading Stream

* [WAYANG-8][API-PYTHON] Getting results from Python and continue processing

* [WAYANG-8][API-PYTHON] Config files for pywayang

* [WAYANG-8][API-PYTHON] Structures to save the plan with functional fashion plus most basic operators

* [WAYANG-8][API-PYTHON] Main program to test plan executions locally

* [WAYANG-8][API-PYTHON] Minor comments and

* [WAYANG-8][API-PYTHON] Most basic test for protobuff communication with java

* [WAYANG-8][API-PYTHON] Addjacency list from PyWayang Plan

* [WAYANG-8][API-PYTHON] Graph traversal implementation with visitor pattern

* [WAYANG-8][API-PYTHON] Protobuf python message generator

* [WAYANG-8][API-PYTHON] Wayang Web Service project structure

* [WAYANG-8][API-PYTHON] Protobuf message generation fixes

* [WAYANG-8][API-PYTHON] Wayang Web Service executes most basic plans directly

* [WAYANG-8][API-PYTHON] Receiving Base64 passing to byte array and unpickling

* [WAYANG-8][API-PYTHON] Updated classes to process a single Serialized UDF

* [WAYANG-8][API-PYTHON] New test with single UDF

* [WAYANG-8][API-PYTHON] Protobuf command

* [WAYANG-8][API-PYTHON] Protobuf message template updated


* [WAYANG-8][API-PYTHON] License comments added

* [WAYANG-8][API-PYTHON] Correction on missing licenses

* [WAYANG-8][API-PYTHON] Serializable module creation

* [WAYANG-8][API-PYTHON] adding protoc to travis

* [WAYANG-8][API-PYTHON] protoc executable path correction

* [WAYANG-8][API-PYTHON] Commenting objc_class_prefix

* [WAYANG-8][API-PYTHON] Obtaining pipelines

* [WAYANG-8][API-PYTHON] Dataquanta writing message

* [WAYANG-8][API-PYTHON] Plan writer pipeline based adjustments

* [WAYANG-8][API-PYTHON] Operator Python executable indicator

* [WAYANG-8][API-PYTHON] Plan writer improved to use less sockets

* [WAYANG-8][API-PYTHON] New version of Wayang protobuf message

* [WAYANG-8][API-PYTHON] Wayang REST improved to allow multi pipelined executions

* [WAYANG-8][API-PYTHON] More test programs

* [WAYANG-8][API-PYTHON] Commentaries and logging for Graph module

* [WAYANG-8][API-PYTHON] Commentaries and logging for Orchestrator module

* [WAYANG-8][API-PYTHON] Commentaries and logging for Protobuf module

* [WAYANG-8][API-PYTHON] Fix usage of relative paths

* [WAYANG-8][API-PYTHON] Scripts to compile protobuf has been deleted. Now Maven executes them

* [WAYANG-8][API-PYTHON] Execution Log configuration

* [WAYANG-8][API-PYTHON] Fix - Python Map partition with single operator

* [WAYANG-8][API-PYTHON] Unitary Testing preparing the Wayang Plan

* [WAYANG-8][API-PYTHON] Plugin selection through Plan Descriptor

* [WAYANG-8][API-PYTHON] Unitary Testing preparing the Wayang Plan with Spark Execution

* [WAYANG-8][API-PYTHON] Pywayang sends protobuf message in API request as bytes using base64

* [WAYANG-8][API-PYTHON] New Operators Flatmap group by, reduce and Reduce By Key. Only Python Side.

* [WAYANG-8][API-PYTHON] Protobuf Wayang Plan message updated to allow more Complex Java-Python Operators

* [WAYANG-8][API-PYTHON] Adding TPC-H 1st Test

* [WAYANG-8][API-PYTHON] Last changes, not working

* [WAYANG-8] Fixing errors with dependencies

* [WAYANG-8] Fix to Pom versions problem

* [WAYANG-8] Protoc path updated

* [WAYANG-8] Correction in the pom.xml for flags

Signed-off-by: bertty <bertty@apache.org>

Co-authored-by: berttty <bertty@scalytics.io>
Co-authored-by: Bertty Contreras-Rojas <bertty@databloom.ai>
46 files changed

Apache Wayang (incubating)



The first cross-platform data processing system

In contrast to traditional data processing systems that provide one dedicated execution engine, Apache Wayang (incubating) is a cross-platform data processing system: Users can specify any data processing application using one of Wayang's APIs and then Wayang will choose the data processing platform(s), e.g., Postgres or Apache Spark, that best fits the application. Finally, Wayang will perform the execution, thereby hiding the different platform-specific APIs and coordinating inter-platform communication.

Apache Wayang (incubating) aims at freeing data engineers and software developers from the burden of learning all different data processing systems, their APIs, strengths and weaknesses; the intricacies of coordinating and integrating different processing platforms; and the inflexibility of being tied to a fixed set of processing platforms. As of now, Wayang has built-in support for the following processing platforms:

  • Java Streams
  • Apache Spark
  • GraphChi
  • SQLite
  • PostgreSQL

How to use Wayang

Requirements. Apache Wayang (incubating) is built with Java 8 and Scala 2.11. However, to execute Wayang it is sufficient to have Java 8 installed. If you want to build Wayang yourself, you will also need to have installed Apache Hadoop (the version that you want). Please also consider that processing platforms employed by Wayang might have further requirements.

NOTE: On Windows, you need to define the HADOOP_HOME variable and provide winutils.exe there. An unofficial option for obtaining winutils.exe is this repository, or you can generate your own winutils.exe by following the instructions in the repository. You may also need to install msvcr100.dll.

NOTE: Apache Wayang (incubating) is currently updating its Java and Scala versions. To use Scala 2.12, you will need to install Java 11 in your environment.

NOTE: Make sure that the JAVA_HOME environment variable is set correctly to either Java 8 or Java 11: the prerequisite checker script currently supports up to Java 11 and checks the latest version of Java if you have a higher version installed. On Linux, it is preferable to use the export JAVA_HOME method inside the project folder. It is also recommended to run ‘./mvnw clean install’ before opening the project in IntelliJ.

Get Wayang. Wayang is available via Maven Central. To use it with Maven, for instance, include the following into your POM file:


Note the ***: Wayang ships with multiple modules that can be included in your app, depending on how you want to use it:

  • wayang-core: provides core data structures and the optimizer (required)
  • wayang-basic: provides common operators and data types for your apps (recommended)
  • wayang-api: provides an easy-to-use Scala and Java API to assemble Wayang plans (recommended)
  • wayang-java, wayang-spark, wayang-graphchi, wayang-sqlite3, wayang-postgres: adapters for the various supported processing platforms
  • wayang-profiler: provides functionality to learn operator and UDF cost functions from historical execution data

For the sake of version flexibility, you still have to include your Hadoop (hadoop-hdfs and hadoop-common) and Spark (spark-core and spark-graphx) version of choice.

In addition, you can obtain the most recent snapshot version of Wayang via Sonatype's snapshot repository. Just include

    <name>Apache Foundation Snapshot Repository</name>

If you need to rebuild Wayang, e.g., to use a different Scala version, you can simply do so via Maven:

  1. Adapt the version variables (e.g., spark.version) in the main pom.xml file.
  2. Build Wayang with the adapted versions.
    $ ./mvnw clean install

NOTE: If you receive an error about not finding MathExBaseVisitor, then the problem might be that you are trying to build from IntelliJ, without Maven. MathExBaseVisitor is generated code, and a Maven build should generate it automatically.

NOTE: In the current Maven setup, the Scala version is tied to the Java version: you can compile the profile scala-11 with Java 8 and the profile scala-12 with Java 11.

NOTE: For compiling and testing the code it is required to have Hadoop installed on your machine.

NOTE: The standalone profile fixes the Hadoop and Spark versions, so that Wayang apps do not explicitly need to declare the corresponding dependencies.

Also, note the distro profile, which assembles a binary Wayang distribution. To activate these profiles, you need to specify them when running Maven, i.e.,

./mvnw clean install -P<profile name>

Configure Wayang. In order for Wayang to work properly, it is necessary to tell Wayang about the capacities of your processing platforms and how to reach them. While there is a default configuration that lets you test Wayang right away, we recommend creating a properties file to adapt the configuration where necessary. To have Wayang use that configuration transparently, just run your app via

$ java -Dwayang.configuration=url://to/my/wayang.properties ...

You can find the most relevant settings in the following:

  • General settings
    • wayang.core.log.enabled (= true): whether to log execution statistics to allow learning better cardinality and cost estimators for the optimizer
    • wayang.core.log.executions (= ~/.wayang/executions.json): where to log execution times of operator groups
    • wayang.core.log.cardinalities (= ~/.wayang/cardinalities.json): where to log cardinality measurements
    • wayang.core.optimizer.instrumentation (= org.apache.wayang.core.profiling.OutboundInstrumentationStrategy): where to measure cardinalities in Wayang plans; other options are org.apache.wayang.core.profiling.NoInstrumentationStrategy and org.apache.wayang.core.profiling.FullInstrumentationStrategy
    • wayang.core.optimizer.reoptimize (= false): whether to progressively optimize Wayang plans
    • wayang.basic.tempdir (= file:///tmp): where to store temporary files, in particular for inter-platform communication
  • Java Streams
    • wayang.java.cpu.mhz (= 2700): clock frequency of processor the JVM runs on in MHz
    • wayang.java.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to JVM in ms/MB
  • Apache Spark
    • spark.master (= local): Spark master
      • various other Spark settings are supported, e.g., spark.executor.memory, spark.serializer, ...
    • wayang.spark.cpu.mhz (= 2700): clock frequency of processor the Spark workers run on in MHz
    • wayang.spark.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to the Spark workers in ms/MB
    • wayang.spark.network.ms-per-mb (= 8.6): average network throughput of the Spark workers in ms/MB
    • wayang.spark.init.ms (= 4500): time it takes Spark to initialize in ms
  • GraphChi
    • wayang.graphchi.cpu.mhz (= 2700): clock frequency of processor GraphChi runs on in MHz
    • wayang.graphchi.cpu.cores (= 2): number of cores GraphChi runs on
    • wayang.graphchi.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to GraphChi in ms/MB
  • SQLite
    • wayang.sqlite3.jdbc.url: JDBC URL to use SQLite
    • wayang.sqlite3.jdbc.user: optional user name
    • wayang.sqlite3.jdbc.password: optional password
    • wayang.sqlite3.cpu.mhz (= 2700): clock frequency of processor SQLite runs on in MHz
    • wayang.sqlite3.cpu.cores (= 2): number of cores SQLite runs on
  • PostgreSQL
    • wayang.postgres.jdbc.url: JDBC URL to use PostgreSQL
    • wayang.postgres.jdbc.user: optional user name
    • wayang.postgres.jdbc.password: optional password
    • wayang.postgres.cpu.mhz (= 2700): clock frequency of processor PostgreSQL runs on in MHz
    • wayang.postgres.cpu.cores (= 2): number of cores PostgreSQL runs on
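
As a rough illustration of the settings above, the following sketch assembles a small properties file with java.util.Properties and reads it back. Only the keys come from the list above; the class name WayangPropertiesSketch, the temporary file name, and the chosen values are assumptions made for this example, and the sketch does not call into Wayang itself.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper, not part of Wayang: builds and checks a properties file.
final class WayangPropertiesSketch {

    /** Collects a few of the settings listed above; the values are examples only. */
    static Properties exampleSettings() {
        Properties props = new Properties();
        props.setProperty("wayang.core.log.enabled", "true");
        props.setProperty("wayang.basic.tempdir", "file:///tmp");
        props.setProperty("wayang.java.cpu.mhz", "2700");
        props.setProperty("spark.master", "local");
        return props;
    }

    /** Writes the settings to the given file in the standard .properties format. */
    static void write(Properties props, Path file) throws IOException {
        try (Writer out = Files.newBufferedWriter(file)) {
            props.store(out, "Example Wayang configuration (illustrative values)");
        }
    }

    /** Reads a .properties file back, e.g. to check it before using it. */
    static Properties read(Path file) throws IOException {
        Properties props = new Properties();
        try (Reader in = Files.newBufferedReader(file)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("wayang", ".properties");
        write(exampleSettings(), file);
        // The URL of this file is what -Dwayang.configuration would point to.
        System.out.println("Wrote " + file.toUri());
        System.out.println("spark.master = " + read(file).getProperty("spark.master"));
    }
}
```

Settings that are left out of such a file should fall back to the defaults shown in parentheses above.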

Code with Wayang. The recommended way to specify your apps with Wayang is via its Scala or Java API from the wayang-api module. You can find examples below.

Learn cost functions. Wayang provides a utility to learn cost functions from historical execution data. Specifically, Wayang can learn configurations for load profile estimators (that estimate CPU load, disk load etc.) for both operators and UDFs, as long as the configuration provides a template for those estimators. As an example, the JavaMapOperator draws its load profile estimator configuration via the configuration key wayang.java.map.load. Now, it is possible to specify a load profile estimator template in the configuration under the key <original key>.template, e.g.:

wayang.java.map.load.template = {\
  "in":1, "out":1,\
  "cpu":"?*in0"\
}

This template specifies a load profile estimator that expects (at least) one input cardinality and one output cardinality. Further, it models a CPU load that is proportional to the input cardinality. However, more complex functions are possible. In particular, you can use

  • the variables in0, in1, ... and out0, out1, ... to incorporate the input and output cardinalities, respectively;
  • operator properties, such as numIterations for the PageRankOperator implementations;
  • the operators +, -, *, /, %, ^, and parentheses;
  • the functions min(x0, x1, ...), max(x0, x1, ...), abs(x), log(x, base), ln(x), ld(x);
  • and the constants e and pi.

While Wayang specifies templates for all execution operators, you will need to specify that your UDFs are modelled by some configuration-based cost function (see the k-means example below) and create the corresponding initial specification and template yourself. Once you have gathered execution data, you can run

java ... org.apache.wayang.profiler.ga.GeneticOptimizerApp [configuration URL [execution log]]

This app will try to find appropriate values for the question marks (?) in the load profile estimator templates to fit the gathered execution data, and it will output ready-made configuration entries for the load profile estimators. You can then copy them into your configuration.
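
To give an intuition for what "finding a value for the question mark" means, here is a minimal sketch for a template in which the CPU load is proportional to the input cardinality (a single unknown factor multiplied by in0). It is not the genetic algorithm used by the profiler: it swaps in a plain least-squares fit through the origin, and the class name, method names, and measurements are all hypothetical.

```java
// Hypothetical illustration, not Wayang code: fits the single unknown factor
// of a load model "factor * in0" to observed (cardinality, load) pairs.
final class TemplateFitSketch {

    /** Evaluates the model for a candidate value of the unknown factor. */
    static double estimateCpu(double factor, long in0) {
        return factor * in0;
    }

    /** Least-squares fit through the origin: factor = sum(in * load) / sum(in * in). */
    static double fitFactor(long[] inputCardinalities, double[] observedLoads) {
        if (inputCardinalities.length == 0
                || inputCardinalities.length != observedLoads.length) {
            throw new IllegalArgumentException("Need equally many cardinalities and loads.");
        }
        double numerator = 0;
        double denominator = 0;
        for (int i = 0; i < inputCardinalities.length; i++) {
            double in = inputCardinalities[i];
            numerator += in * observedLoads[i];
            denominator += in * in;
        }
        if (denominator == 0) {
            throw new IllegalArgumentException("All input cardinalities are zero.");
        }
        return numerator / denominator;
    }

    public static void main(String[] args) {
        // Made-up measurements: the load grows linearly with the input cardinality.
        long[] cardinalities = {1000, 2000, 4000};
        double[] loads = {500_000, 1_000_000, 2_000_000};
        double factor = fitFactor(cardinalities, loads);
        System.out.println("Fitted factor: " + factor);
        System.out.println("Estimated load for in0 = 3000: " + estimateCpu(factor, 3000));
    }
}
```

A genetic search becomes useful when templates contain several unknowns and non-linear terms, where a closed-form fit like this one is no longer available.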


The “Hello World!” of data processing systems is the wordcount.

Java API

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import java.util.Collection;
import java.util.Arrays;

public class WordcountJava {

    public static void main(String[] args){

        // Settings
        String inputUrl = "file:/tmp.txt";

        // Get a plan builder.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount (%s)", inputUrl));

        // Start building the WayangPlan.
        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                // Read the text file.
                .readTextFile(inputUrl).withName("Load file")

                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")

                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")

                // Attach counter to each word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

                // Sum up counters for every word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")

                // Execute the plan and collect the results.
                .collect();

        System.out.println(wordcounts);
    }
}
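
To make the dataflow of this plan concrete, the following sketch computes the same word counts with plain Java streams and no Wayang dependency. It mirrors the split, filter, lower-case, and sum-by-key steps; the class name WordcountStreamsSketch and the sample lines are assumptions for illustration, and nothing here is optimized or distributed the way a Wayang plan would be.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-alone counterpart of the WordCount plan (no Wayang involved).
final class WordcountStreamsSketch {

    /** Counts lower-cased words; a TreeMap keeps the result sorted by word. */
    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                // Split each line by non-word characters.
                .flatMap(line -> Arrays.stream(line.split("\\W+")))
                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                // Normalize to lower case.
                .map(String::toLowerCase)
                // Attach a counter of 1 to each word and sum the counters per word.
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello world", "hello Wayang");
        System.out.println(countWords(lines)); // prints {hello=2, wayang=1, world=1}
    }
}
```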


Scala API

import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

object WordcountScala {
  def main(args: Array[String]) {

    // Settings
    val inputUrl = "file:/tmp.txt"

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"WordCount ($inputUrl)")

    val wordcounts = planBuilder
      // Read the text file.
      .readTextFile(inputUrl).withName("Load file")

      // Split each line by non-word characters.
      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")

      // Filter empty tokens.
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")

      // Attach counter to each word.
      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")

      // Sum up counters for every word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))

      // Execute the plan and collect the results.
      .collect()

    println(wordcounts)
  }
}



Wayang is also capable of iterative processing, which is, e.g., very important for machine learning algorithms, such as k-means.

Scala API

import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

import scala.util.Random
import scala.collection.JavaConversions._

object kmeans {
  def main(args: Array[String]) {

    // Settings
    val inputUrl = "file:/kmeans.txt"
    val k = 5
    val iterations = 100
    val configuration = new Configuration

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")

    case class Point(x: Double, y: Double)
    case class TaggedPoint(x: Double, y: Double, cluster: Int)
    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
      def add_points(that: TaggedPointCounter) = TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)
      def average = TaggedPointCounter(x / count, y / count, cluster, 0)
    }

    // Read and parse the input file(s).
    val points = planBuilder
      .readTextFile(inputUrl).withName("Read file")
      .map { line =>
        val fields = line.split(",")
        Point(fields(0).toDouble, fields(1).toDouble)
      }.withName("Create points")

    // Create initial centroids.
    val random = new Random
    val initialCentroids = planBuilder
      .loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0)).withName("Load random centroids")

    // Declare UDF to select centroid for each data point.
    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {

      /** Keeps the broadcasted centroids. */
      var centroids: Iterable[TaggedPointCounter] = _

      override def open(executionCtx: ExecutionContext) = {
        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids")
      }

      override def apply(point: Point): TaggedPointCounter = {
        var minDistance = Double.PositiveInfinity
        var nearestCentroidId = -1
        for (centroid <- centroids) {
          val distance = Math.pow(Math.pow(point.x - centroid.x, 2) + Math.pow(point.y - centroid.y, 2), 0.5)
          if (distance < minDistance) {
            minDistance = distance
            nearestCentroidId = centroid.cluster
          }
        }
        new TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
      }
    }

    // Do the k-means loop.
    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
      points
        .mapJava(new SelectNearestCentroid,
          udfLoad = LoadProfileEstimators.createFromSpecification(
            "my.udf.costfunction.key", configuration
          ))
        .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
        .reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
        .map(_.average).withName("Average points")
    }).withName("Loop")

      // Collect the results.
      .collect()

    println(finalCentroids)
  }
}
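
For readers who want to see what a single pass of this loop computes, here is a minimal plain-Java sketch of one k-means iteration: tag each point with its nearest centroid, add up the points per cluster, and average them. It does not use Wayang; the class name KMeansStepSketch, the array-based point representation, and the sample data are assumptions for illustration. Unlike the plan above, this sketch keeps the old centroid for a cluster that attracts no points.

```java
import java.util.Arrays;

// Hypothetical stand-alone illustration of one k-means iteration (no Wayang involved).
final class KMeansStepSketch {

    /** Returns the index of the centroid closest to (x, y) by Euclidean distance. */
    static int nearest(double x, double y, double[][] centroids) {
        int best = -1;
        double minDistance = Double.POSITIVE_INFINITY;
        for (int c = 0; c < centroids.length; c++) {
            double distance = Math.hypot(x - centroids[c][0], y - centroids[c][1]);
            if (distance < minDistance) {
                minDistance = distance;
                best = c;
            }
        }
        return best;
    }

    /** Performs one iteration and returns the updated centroids. */
    static double[][] step(double[][] points, double[][] centroids) {
        if (centroids.length == 0) {
            throw new IllegalArgumentException("At least one centroid is required.");
        }
        double[][] sums = new double[centroids.length][2];
        long[] counts = new long[centroids.length];
        // "Find nearest centroid" and "Add up points" in one pass.
        for (double[] p : points) {
            int c = nearest(p[0], p[1], centroids);
            sums[c][0] += p[0];
            sums[c][1] += p[1];
            counts[c]++;
        }
        // "Average points"; empty clusters keep their previous centroid (sketch-specific choice).
        double[][] next = new double[centroids.length][];
        for (int c = 0; c < centroids.length; c++) {
            next[c] = counts[c] == 0
                    ? centroids[c].clone()
                    : new double[] {sums[c][0] / counts[c], sums[c][1] / counts[c]};
        }
        return next;
    }

    public static void main(String[] args) {
        double[][] points = {{0, 0}, {0, 2}, {10, 10}, {10, 12}};
        double[][] centroids = {{1, 1}, {9, 9}};
        // prints [[0.0, 1.0], [10.0, 11.0]]
        System.out.println(Arrays.deepToString(step(points, centroids)));
    }
}
```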



All files in this repository are licensed under the Apache Software License 2.0

Copyright 2020 - 2021 The Apache Software Foundation.

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at


Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.