This page contains examples that can be executed with Apache Wayang.
The “Hello World!” of data processing systems is word count. The following examples implement it in Java and in Scala.
import org.apache.wayang.api.JavaPlanBuilder; import org.apache.wayang.basic.data.Tuple2; import org.apache.wayang.core.api.Configuration; import org.apache.wayang.core.api.WayangContext; import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator; import org.apache.wayang.java.Java; import org.apache.wayang.spark.Spark; import java.util.Collection; import java.util.Arrays; public class WordcountJava { public static void main(String[] args){ // Settings String inputUrl = "file:/tmp.txt"; // Get a plan builder. WayangContext wayangContext = new WayangContext(new Configuration()) .withPlugin(Java.basicPlugin()) .withPlugin(Spark.basicPlugin()); JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext) .withJobName(String.format("WordCount (%s)", inputUrl)) .withUdfJarOf(WordcountJava.class); // Start building the WayangPlan. Collection<Tuple2<String, Integer>> wordcounts = planBuilder // Read the text file. .readTextFile(inputUrl).withName("Load file") // Split each line by non-word characters. .flatMap(line -> Arrays.asList(line.split("\\W+"))) .withSelectivity(10, 100, 0.9) .withName("Split words") // Filter empty tokens. .filter(token -> !token.isEmpty()) .withSelectivity(0.99, 0.99, 0.99) .withName("Filter empty words") // Attach counter to each word. .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter") // Sum up counters for every word. .reduceByKey( Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()) ) .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0]))) .withName("Add counters") // Execute the plan and collect the results. .collect(); System.out.println(wordcounts); } }
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

import java.util.Locale

/** Counts word occurrences in a text file with Wayang, registering both the Java and the Spark
  * platform so the optimizer can choose where to run each operator.
  *
  * Usage: `WordcountScala [inputUrl]`; the input URL defaults to `file:/tmp.txt`.
  */
object WordcountScala {

  def main(args: Array[String]): Unit = {
    // Settings: the input file may be passed as the first argument.
    val inputUrl = args.headOption.getOrElse("file:/tmp.txt")

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"WordCount ($inputUrl)")
      .withUdfJarsOf(this.getClass)

    val wordcounts = planBuilder
      // Read the text file.
      .readTextFile(inputUrl).withName("Load file")
      // Split each line by non-word characters; "(?U)" makes \W Unicode-aware so that
      // non-ASCII letters stay inside words.
      .flatMap(_.split("(?U)\\W+"), selectivity = 10).withName("Split words")
      // Filter empty tokens.
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")
      // Attach counter to each word; Locale.ROOT avoids locale-dependent lower-casing.
      .map(word => (word.toLowerCase(Locale.ROOT), 1)).withName("To lower case, add counter")
      // Sum up counters for every word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      // Estimate the number of distinct words as ~1% of the incoming (word, 1) pairs.
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
      // Execute the plan and collect the results.
      .collect()

    println(wordcounts)
  }
}
Wayang also supports iterative processing, which is essential for machine learning algorithms such as k-means.
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

import scala.collection.JavaConverters._
import scala.util.Random

/** k-means clustering of 2-D points with Wayang, running a fixed number of iterations.
  *
  * Input: a text file with one `x,y` point per line.
  * Usage: `kmeans [inputUrl]`; the input URL defaults to `file:/kmeans.txt`.
  */
object kmeans {

  def main(args: Array[String]): Unit = {
    // Settings
    val inputUrl = args.headOption.getOrElse("file:/kmeans.txt")
    val k = 5
    val iterations = 100
    // One configuration shared by the context and the UDF cost-function lookup below, so that
    // properties set on it are visible to both.
    val configuration = new Configuration

    // Get a plan builder.
    val wayangContext = new WayangContext(configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
      .withUdfJarsOf(this.getClass)

    case class Point(x: Double, y: Double)

    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
      /** Component-wise sum of coordinates and counts; keeps this point's cluster id. */
      def add_points(that: TaggedPointCounter): TaggedPointCounter =
        TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)

      /** Divides the summed coordinates by the point count; the result's count is reset to 0. */
      def average: TaggedPointCounter = TaggedPointCounter(x / count, y / count, cluster, 0)
    }

    // Read and parse the input file(s): one "x,y" pair per line.
    val points = planBuilder
      .readTextFile(inputUrl).withName("Read file")
      .map { line =>
        val fields = line.split(",")
        Point(fields(0).toDouble, fields(1).toDouble)
      }.withName("Create points")

    // Create initial centroids with random coordinates and cluster ids 1..k.
    val random = new Random
    val initialCentroids = planBuilder
      .loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0))
      .withName("Load random centroids")

    // UDF that tags each point with the id of its nearest centroid and a count of 1 (for averaging).
    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {

      /** Keeps the broadcasted centroids. */
      var centroids: Iterable[TaggedPointCounter] = _

      override def open(executionCtx: ExecutionContext): Unit = {
        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids").asScala
      }

      override def apply(point: Point): TaggedPointCounter = {
        // Euclidean distance; the first strictly closest centroid wins. The id stays -1 when no
        // centroid is closer than infinity (e.g. none were broadcast).
        val (_, nearestCentroidId) = centroids.foldLeft((Double.PositiveInfinity, -1)) {
          case ((minDistance, nearestId), centroid) =>
            val dx = point.x - centroid.x
            val dy = point.y - centroid.y
            val distance = math.sqrt(dx * dx + dy * dy)
            if (distance < minDistance) (distance, centroid.cluster) else (minDistance, nearestId)
        }
        TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
      }
    }

    // Do the k-means loop.
    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
      points
        // NOTE(review): the UDF cost function is looked up in `configuration` under this key,
        // so it presumably must be defined there -- TODO confirm how a missing key is handled.
        .mapJava(new SelectNearestCentroid, udfLoad = LoadProfileEstimators.createFromSpecification(
          "my.udf.costfunction.key", configuration
        ))
        .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
        .reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
        .withCardinalityEstimator(k)
        // Only clusters that received at least one point reach this step, so a centroid without
        // assigned points is dropped from the next iteration.
        .map(_.average).withName("Average points")
    }).withName("Loop")
      // Collect the results.
      .collect()

    println(finalCentroids)
  }
}
The following example uses Wayang to implement user-based collaborative filtering, a technique commonly used in recommendation systems.
import org.apache.wayang.api.*; import org.apache.wayang.basic.data.*; import org.apache.wayang.core.api.*; import org.apache.wayang.core.function.*; import org.apache.wayang.core.util.*; import org.apache.wayang.java.Java; import org.apache.wayang.spark.Spark; import org.apache.commons.math3.linear.*; import java.util.*; public class CollaborativeFiltering { public static void main(String[] args) { // Create a Wayang context WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); PlanBuilder planBuilder = new PlanBuilder(wayangContext); // Load the data List<Tuple3<String, String, Integer>> data = Arrays.asList( new Tuple3<>("user1", "item1", 5), new Tuple3<>("user1", "item2", 3), new Tuple3<>("user2", "item1", 4), new Tuple3<>("user2", "item3", 2), new Tuple3<>("user3", "item2", 1), new Tuple3<>("user3", "item3", 5) ); // Define a function to normalize the ratings TransformationDescriptor.SerializableFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Double>> normalizationFunction = tuple -> new Tuple3<>(tuple.field0, tuple.field1, (double)tuple.field2 / 5); // Define a function to calculate the cosine similarity between users TransformationDescriptor.SerializableFunction<Tuple2<String, RealVector>, Tuple2<String, RealVector>> similarityFunction = tuple -> { // This is a placeholder. You would need to implement a real similarity calculation here. // For example, you could calculate the cosine similarity like this: double dotProduct = tuple.field1.dotProduct(otherUserVector); double normProduct = tuple.field1.getNorm() * otherUserVector.getNorm(); double cosineSimilarity = dotProduct / normProduct; return new Tuple2<>(tuple.field0, cosineSimilarity); }; // Define a function to calculate the predicted rating for each user-item pair TransformationDescriptor.SerializableFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>> predictionFunction = tuple -> { // This is a placeholder. 
You would need to implement a real prediction calculation here. // For example, you could calculate the predicted rating based on the similarity matrix and the user's ratings like this: double predictedRating = 0.0; double similaritySum = 0.0; for (String otherUser : similarityMatrix.keySet()) { double similarity = similarityMatrix.get(otherUser); double otherUserRating = userRatings.get(otherUser).get(tuple.field1); predictedRating += similarity * otherUserRating; similaritySum += Math.abs(similarity); } predictedRating /= similaritySum; return new Tuple3<>(tuple.field0, tuple.field1, predictedRating); }; // Define a function to handle cold start problems TransformationDescriptor.SerializableFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>> coldStartFunction = tuple -> { if (tuple.field2 == null) { // If the user has no ratings, recommend the most popular item String mostPopularItem = itemPopularity.entrySet().stream() .max(Map.Entry.comparingByValue()) .get() .getKey(); return new Tuple3<>(tuple.field0, mostPopularItem, 5.0); } else { return tuple; } }; // Define a function to handle cold start problems TransformationDescriptor.SerializableFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>> coldStartFunction = tuple -> { if (tuple.field2 == null) { // If the user has no ratings, recommend the most popular item return new Tuple3<>(tuple.field0, "item1", 1.0); } else { return tuple; } }; // Execute the plan Collection<Tuple3<String, String, Double>> output = planBuilder .loadCollection(data) .map(normalizationFunction) .map(similarityFunction) .map(predictionFunction) .map(recommendationFunction) .map(coldStartFunction) .collect(); // Print the recommendations for (Tuple3<String, String, Double> recommendation : output) { System.out.println("User: " + recommendation.field0 + ", Item: " + recommendation.field1 + ", Rating: " + recommendation.field2); } } }