blob: 2717dde09182dc8099e239420ceeb6f99fb60b08 [file] [log] [blame] [view]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
This page contains examples to be executed using Wayang.
- [WordCount](#wordcount)
  * [Java scala-like API](#java-scala-like-api)
  * [Scala API](#scala-api)
- [k-means](#k-means)
  * [Scala API](#scala-api-1)
- [Collaborative Filtering](#collaborative-filtering)
## WordCount
The "Hello World!" of data processing systems is the wordcount.
### Java scala-like API
```java
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import java.util.Collection;
import java.util.Arrays;
/**
 * Word count on top of Wayang's Java plan builder: reads a text file, splits it
 * into lower-cased words and prints every word together with its number of
 * occurrences.
 */
public class WordcountJava {

    public static void main(String[] args) {
        // Location of the text file to analyse.
        final String inputUrl = "file:/tmp.txt";

        // Register the Java and Spark plugins with a fresh Wayang context.
        final WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());

        // The plan builder carries the job name and the jar that contains the UDFs below.
        final JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount (%s)", inputUrl))
                .withUdfJarOf(WordcountJava.class);

        final Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                // Source: one data quantum per line of the file.
                .readTextFile(inputUrl)
                .withName("Load file")

                // Break every line apart at runs of non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")

                // Drop the empty tokens that splitting can leave behind.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")

                // Normalise the case and pair every word with an initial count of 1.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1))
                .withName("To lower case, add counter")

                // Group by word (field 0) and add up the counts (field 1).
                .reduceByKey(
                        Tuple2::getField0,
                        (left, right) -> new Tuple2<>(left.getField0(), left.getField1() + right.getField1())
                )
                // Optimizer hint: estimate the output as 1% of the input cardinality.
                .withCardinalityEstimator(
                        new DefaultCardinalityEstimator(0.9, 1, false, cardinalities -> Math.round(0.01 * cardinalities[0]))
                )
                .withName("Add counters")

                // Trigger execution and bring the result back to the driver.
                .collect();

        System.out.println(wordcounts);
    }
}
```
### Scala API
```scala
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
/**
 * Word count with Wayang's Scala API: reads a text file, splits it into
 * lower-cased words and prints every word together with its number of
 * occurrences.
 */
object WordcountScala {

  // Explicit `: Unit =` instead of procedure syntax, which is deprecated in
  // Scala 2.13 and no longer accepted by Scala 3.
  def main(args: Array[String]): Unit = {
    // Settings
    val inputUrl = "file:/tmp.txt"

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"WordCount ($inputUrl)")
      .withUdfJarsOf(this.getClass)

    val wordcounts = planBuilder
      // Read the text file.
      .readTextFile(inputUrl).withName("Load file")

      // Split each line by non-word characters.
      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")

      // Filter empty tokens.
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")

      // Attach counter to each word.
      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")

      // Sum up counters for every word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      // Optimizer hint: estimate the output as 1% of the input cardinality.
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))

      // Execute the plan and collect the results.
      .collect()

    println(wordcounts)
  }
}
```
## k-means
Wayang also supports iterative processing, which is essential for many machine learning algorithms such as k-means.
### Scala API
```scala
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
import scala.util.Random
import scala.collection.JavaConversions._
/**
 * k-means clustering of 2D points with Wayang's Scala API.
 *
 * Reads comma-separated `x,y` points from a text file, starts from `k` random
 * centroids and repeats the assign/average step for a fixed number of
 * iterations, then prints the resulting centroids.
 */
object kmeans {

  // Explicit `: Unit =` instead of procedure syntax, which is deprecated in
  // Scala 2.13 and no longer accepted by Scala 3.
  def main(args: Array[String]): Unit = {
    // Settings
    val inputUrl = "file:/kmeans.txt"
    val k = 5
    val iterations = 100

    // A single configuration shared by the Wayang context and by the UDF
    // load-profile lookup further down, so both read the same settings.
    val configuration = new Configuration

    // Get a plan builder.
    val wayangContext = new WayangContext(configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
      .withUdfJarsOf(this.getClass)

    /** A point in the plane. */
    case class Point(x: Double, y: Double)

    /** A point tagged with the ID of a cluster. */
    case class TaggedPoint(x: Double, y: Double, cluster: Int)

    /** Coordinate sums for one cluster together with the number of points summed up. */
    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {

      /** Adds coordinates and counts of `that` to this counter; keeps this counter's cluster ID. */
      def addPoints(that: TaggedPointCounter): TaggedPointCounter =
        TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)

      /** Divides the summed coordinates by the count; the count of the result is reset to 0. */
      def average: TaggedPointCounter = TaggedPointCounter(x / count, y / count, cluster, 0)
    }

    // Read and parse the input file(s).
    val points = planBuilder
      .readTextFile(inputUrl).withName("Read file")
      .map { line =>
        val fields = line.split(",")
        Point(fields(0).toDouble, fields(1).toDouble)
      }.withName("Create points")

    // Create initial centroids.
    val random = new Random
    val initialCentroids = planBuilder
      .loadCollection(
        for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0)
      ).withName("Load random centroids")

    // Declare UDF to select centroid for each data point.
    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {

      /** Keeps the broadcasted centroids. */
      var centroids: Iterable[TaggedPointCounter] = _

      /** Fetches the broadcast registered under the name "centroids". */
      override def open(executionCtx: ExecutionContext): Unit = {
        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids")
      }

      /** Tags `point` with the ID of its nearest centroid (-1 if there are no centroids) and a count of 1. */
      override def apply(point: Point): TaggedPointCounter = {
        var minDistance = Double.PositiveInfinity
        var nearestCentroidId = -1
        for (centroid <- centroids) {
          // Euclidean distance between the point and the centroid.
          val distance = math.hypot(point.x - centroid.x, point.y - centroid.y)
          if (distance < minDistance) {
            minDistance = distance
            nearestCentroidId = centroid.cluster
          }
        }
        TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
      }
    }

    // Do the k-means loop.
    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
      points
        .mapJava(new SelectNearestCentroid,
          udfLoad = LoadProfileEstimators.createFromSpecification(
            "my.udf.costfunction.key", configuration
          ))
        .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
        .reduceByKey(_.cluster, _.addPoints(_)).withName("Add up points")
        .withCardinalityEstimator(k)
        .map(_.average).withName("Average points")
    }).withName("Loop")

      // Collect the results.
      .collect()

    println(finalCentroids)
  }
}
```
## Collaborative Filtering
The following Java example sketches a user-based collaborative filtering algorithm, as used in recommender systems, on top of Wayang.
```java
import org.apache.wayang.api.*;
import org.apache.wayang.basic.data.*;
import org.apache.wayang.core.api.*;
import org.apache.wayang.core.function.*;
import org.apache.wayang.core.util.*;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import org.apache.commons.math3.linear.*;
import java.util.*;
public class CollaborativeFiltering {
public static void main(String[] args) {
// Create a Wayang context
WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
PlanBuilder planBuilder = new PlanBuilder(wayangContext);
// Load the data
List<Tuple3<String, String, Integer>> data = Arrays.asList(
new Tuple3<>("user1", "item1", 5),
new Tuple3<>("user1", "item2", 3),
new Tuple3<>("user2", "item1", 4),
new Tuple3<>("user2", "item3", 2),
new Tuple3<>("user3", "item2", 1),
new Tuple3<>("user3", "item3", 5)
);
// Define a function to normalize the ratings
TransformationDescriptor.SerializableFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Double>> normalizationFunction =
tuple -> new Tuple3<>(tuple.field0, tuple.field1, (double)tuple.field2 / 5);
// Define a function to calculate the cosine similarity between users
TransformationDescriptor.SerializableFunction<Tuple2<String, RealVector>, Tuple2<String, RealVector>> similarityFunction =
tuple -> {
// This is a placeholder. You would need to implement a real similarity calculation here.
// For example, you could calculate the cosine similarity like this:
double dotProduct = tuple.field1.dotProduct(otherUserVector);
double normProduct = tuple.field1.getNorm() * otherUserVector.getNorm();
double cosineSimilarity = dotProduct / normProduct;
return new Tuple2<>(tuple.field0, cosineSimilarity);
};
// Define a function to calculate the predicted rating for each user-item pair
TransformationDescriptor.SerializableFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>> predictionFunction =
tuple -> {
// This is a placeholder. You would need to implement a real prediction calculation here.
// For example, you could calculate the predicted rating based on the similarity matrix and the user's ratings like this:
double predictedRating = 0.0;
double similaritySum = 0.0;
for (String otherUser : similarityMatrix.keySet()) {
double similarity = similarityMatrix.get(otherUser);
double otherUserRating = userRatings.get(otherUser).get(tuple.field1);
predictedRating += similarity * otherUserRating;
similaritySum += Math.abs(similarity);
}
predictedRating /= similaritySum;
return new Tuple3<>(tuple.field0, tuple.field1, predictedRating);
};
// Define a function to handle cold start problems
TransformationDescriptor.SerializableFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>> coldStartFunction =
tuple -> {
if (tuple.field2 == null) {
// If the user has no ratings, recommend the most popular item
String mostPopularItem = itemPopularity.entrySet().stream()
.max(Map.Entry.comparingByValue())
.get()
.getKey();
return new Tuple3<>(tuple.field0, mostPopularItem, 5.0);
} else {
return tuple;
}
};
// Define a function to handle cold start problems
TransformationDescriptor.SerializableFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>> coldStartFunction =
tuple -> {
if (tuple.field2 == null) {
// If the user has no ratings, recommend the most popular item
return new Tuple3<>(tuple.field0, "item1", 1.0);
} else {
return tuple;
}
};
// Execute the plan
Collection<Tuple3<String, String, Double>> output = planBuilder
.loadCollection(data)
.map(normalizationFunction)
.map(similarityFunction)
.map(predictionFunction)
.map(recommendationFunction)
.map(coldStartFunction)
.collect();
// Print the recommendations
for (Tuple3<String, String, Double> recommendation : output) {
System.out.println("User: " + recommendation.field0 + ", Item: " + recommendation.field1 + ", Rating: " + recommendation.field2);
}
}
}
```