This page contains example applications that can be executed with Apache Wayang.
The “Hello World!” of data processing systems is WordCount: read a text file, split it into words, and count how often each word occurs. The following Java program implements WordCount on Wayang.
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.util.Arrays;
import java.util.Collection;

public class WordcountJava {

    public static void main(String[] args) {

        // Settings
        String inputUrl = "file:/tmp.txt";

        // Get a plan builder.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount (%s)", inputUrl))
                .withUdfJarOf(WordcountJava.class);

        // Start building the WayangPlan.
        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                // Read the text file.
                .readTextFile(inputUrl).withName("Load file")

                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")

                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")

                // Attach counter to each word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

                // Sum up counters for every word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")

                // Execute the plan and collect the results.
                .collect();

        System.out.println(wordcounts);
    }
}
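The calls to withSelectivity and withCardinalityEstimator are optional hints for Wayang's optimizer: they estimate how many output elements each operator produces, which helps Wayang decide, operator by operator, whether to run on the Java or the Spark platform. withUdfJarOf declares the JAR containing the UDFs so that they can be shipped to remote platforms such as Spark. The same program is considerably more concise in Wayang's Scala API: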
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

object WordcountScala {
  def main(args: Array[String]) {

    // Settings
    val inputUrl = "file:/tmp.txt"

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"WordCount ($inputUrl)")
      .withUdfJarsOf(this.getClass)

    val wordcounts = planBuilder
      // Read the text file.
      .readTextFile(inputUrl).withName("Load file")

      // Split each line by non-word characters.
      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")

      // Filter empty tokens.
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")

      // Attach counter to each word.
      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")

      // Sum up counters for every word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))

      // Execute the plan and collect the results.
      .collect()

    println(wordcounts)
  }
}
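Because both the Java and the Spark plugin are registered above, Wayang's optimizer is free to execute the plan on either platform, or on a combination of both. To pin a job to a single platform, register only that platform's plugin. The following is a minimal, hypothetical sketch of this idea (the object name and the tiny in-memory input are illustrative, not part of the original example):

import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java

object WordcountJavaOnly {
  def main(args: Array[String]) {
    // Registering only the Java plugin pins the whole plan to the Java platform:
    // Wayang cannot select Spark because it was never registered.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)

    val wordcounts = new PlanBuilder(wayangContext)
      .withJobName("WordCount (Java only)")
      .withUdfJarsOf(this.getClass)
      .loadCollection(Seq("one fish", "two fish")) // tiny in-memory input for illustration
      .flatMap(_.split("\\W+")).withName("Split words")
      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      .collect()

    println(wordcounts)
  }
}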
Wayang also supports iterative processing, which is essential for many machine learning algorithms, such as k-means. The following Scala program implements k-means clustering on Wayang.
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

import scala.collection.JavaConversions._
import scala.util.Random

object kmeans {
  def main(args: Array[String]) {

    // Settings
    val inputUrl = "file:/kmeans.txt"
    val k = 5
    val iterations = 100
    val configuration = new Configuration

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
      .withUdfJarsOf(this.getClass)

    case class Point(x: Double, y: Double)
    case class TaggedPoint(x: Double, y: Double, cluster: Int)
    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
      def add_points(that: TaggedPointCounter) =
        TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)
      def average = TaggedPointCounter(x / count, y / count, cluster, 0)
    }

    // Read and parse the input file(s).
    val points = planBuilder
      .readTextFile(inputUrl).withName("Read file")
      .map { line =>
        val fields = line.split(",")
        Point(fields(0).toDouble, fields(1).toDouble)
      }.withName("Create points")

    // Create initial centroids.
    val random = new Random
    val initialCentroids = planBuilder
      .loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0))
      .withName("Load random centroids")

    // Declare UDF to select the nearest centroid for each data point.
    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {

      /** Keeps the broadcasted centroids. */
      var centroids: Iterable[TaggedPointCounter] = _

      override def open(executionCtx: ExecutionContext) = {
        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids")
      }

      override def apply(point: Point): TaggedPointCounter = {
        var minDistance = Double.PositiveInfinity
        var nearestCentroidId = -1
        for (centroid <- centroids) {
          val distance = Math.sqrt(Math.pow(point.x - centroid.x, 2) + Math.pow(point.y - centroid.y, 2))
          if (distance < minDistance) {
            minDistance = distance
            nearestCentroidId = centroid.cluster
          }
        }
        new TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
      }
    }

    // Do the k-means loop.
    val finalCentroids = initialCentroids
      .repeat(iterations, { currentCentroids =>
        points
          .mapJava(new SelectNearestCentroid,
            udfLoad = LoadProfileEstimators.createFromSpecification("my.udf.costfunction.key", configuration))
          .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
          .reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
          .withCardinalityEstimator(k)
          .map(_.average).withName("Average points")
      }).withName("Loop")

      // Collect the results.
      .collect()

    println(finalCentroids)
  }
}
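Two details are worth noting: repeat(iterations, ...) runs the loop body a fixed number of times, and withBroadcast makes the current centroids available to the SelectNearestCentroid UDF, which retrieves them in open via ExecutionContext.getBroadcast. The program expects a text file with one comma-separated 2D point per line, as parsed in the "Create points" step. For a quick test, such a file can be produced with a plain Scala helper; this GenerateKMeansInput object is hypothetical and only illustrates the expected format:

import java.io.PrintWriter
import scala.util.Random

// Hypothetical helper: writes random 2D points in the "x,y" format that
// the k-means example parses in its "Create points" step.
object GenerateKMeansInput {
  def main(args: Array[String]): Unit = {
    val random = new Random(42)
    val writer = new PrintWriter("/kmeans.txt") // matches the inputUrl above; adapt the path as needed
    try {
      for (_ <- 1 to 1000) {
        writer.println(s"${random.nextGaussian()},${random.nextGaussian()}")
      }
    } finally writer.close()
  }
}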