/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.apps.simwords
import org.apache.wayang.apps.util.{ExperimentDescriptor, Parameters, ProfileDBHelper}
import org.apache.wayang.api._
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.core.plugin.Plugin
import org.apache.wayang.core.util.fs.FileSystems
/**
* This app clusters words by their word neighborhoods in a corpus.
* <p>Note the UDF load properties `wayang.apps.simwords.udfs.create-neighborhood.load` and `wayang.apps.simwords.udfs.select-centroid.load`.</p>
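*
* <p>A minimal programmatic sketch (the plugin choice and corpus path below are
* illustrative assumptions, not taken from this app):</p>
* {{{
*   // assumes implicit Experiment and Configuration values in scope
*   val simWords = new SimWords(Java.basicPlugin)
*   val clusters = simWords("corpus.txt", 10, 2, 100, 20, new ProbabilisticDoubleInterval(100, 10000, 0.9))
* }}}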
*/
class SimWords(plugins: Plugin*) {
def apply(inputFile: String,
minWordOccurrences: Int,
neighborhoodReach: Int,
numClusters: Int,
numIterations: Int,
wordsPerLine: ProbabilisticDoubleInterval)
(implicit experiment: Experiment,
configuration: Configuration) = {
// Initialize.
val wayangCtx = new WayangContext(configuration)
plugins.foreach(wayangCtx.register)
val planBuilder = new PlanBuilder(wayangCtx)
.withJobName(
jobName = s"SimWords ($inputFile, reach=$neighborhoodReach, clusters=$numClusters, $numIterations iterations)"
).withExperiment(experiment)
.withUdfJarsOf(this.getClass)
// Create the word dictionary
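// Copy the parameter into a local val so that the filter closure below captures a
// plain value instead of the enclosing SimWords instance.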
val _minWordOccurrences = minWordOccurrences
val wordIds = planBuilder
.readTextFile(inputFile).withName("Read corpus (1)")
.flatMapJava(new ScrubFunction, selectivity = wordsPerLine).withName("Split & scrub")
.map(word => (word, 1)).withName("Add word counter")
.reduceByKey(_._1, (wc1, wc2) => (wc1._1, wc1._2 + wc2._2)).withName("Sum word counters")
.withCardinalityEstimator((in: Long) => math.round(in * 0.01))
.filter(_._2 >= _minWordOccurrences, selectivity = 10d / (9d + minWordOccurrences))
.withName("Filter frequent words")
.map(_._1).withName("Strip word counter")
.zipWithId.withName("Zip with ID")
.map(t => (t.field1, t.field0.toInt)).withName("Convert ID attachment")
// Create the word neighborhood vectors.
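// The word dictionary is broadcast under the name "wordIds" so that the UDF can
// translate neighborhoods of words into neighborhoods of word IDs.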
val wordVectors = planBuilder
.readTextFile(inputFile).withName("Read corpus (2)")
.flatMapJava(
new CreateWordNeighborhoodFunction(neighborhoodReach, "wordIds"),
selectivity = wordsPerLine,
udfLoad = LoadProfileEstimators.createFromSpecification("wayang.apps.simwords.udfs.create-neighborhood.load", configuration)
)
.withBroadcast(wordIds, "wordIds")
.withName("Create word vectors")
.reduceByKey(_._1, (wv1, wv2) => (wv1._1, wv1._2 + wv2._2)).withName("Add word vectors")
.map { wv =>
wv._2.normalize(); wv
}.withName("Normalize word vectors")
// Generate initial centroids. A sampling-based variant is kept below for reference:
// val initialCentroids = wordVectors
// .customOperator[(Int, SparseVector)](
// new SampleOperator[(Int, SparseVector)](numClusters, dataSetType[(Int, SparseVector)], SampleOperator.Methods.RANDOM)
// ).withName("Sample centroids")
// .map(x => x).withName("Identity (wa1)")
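// Instead, derive random initial centroids from the word IDs; again, copy the
// parameter into a local val to keep the flatMap closure serialization-friendly.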
val _numClusters = numClusters
val initialCentroids = wordIds
.map(_._2).withName("Strip words")
.group().withName("Group IDs")
.flatMap { ids =>
import scala.collection.JavaConverters._
val idArray = ids.asScala.toArray
// `until` (not `to`), so that exactly numClusters centroids are generated
for (i <- 0 until _numClusters) yield (i, SparseVector.createRandom(idArray, 0.99, _numClusters))
}.withName("Generate centroids")
// Run k-means on the vectors.
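// Each iteration assigns every word vector to its nearest centroid (received via
// broadcast), sums the vectors per cluster, and normalizes the sums into the new
// centroids. The trailing identity map ("wa2") looks like a workaround for the
// loop's output handling rather than a semantic step.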
val finalCentroids = initialCentroids.repeat(numIterations, { centroids: DataQuanta[(Int, SparseVector)] =>
val newCentroids: DataQuanta[(Int, SparseVector)] = wordVectors
.mapJava(
new SelectNearestCentroidFunction("centroids"),
udfLoad = LoadProfileEstimators.createFromSpecification("wayang.apps.simwords.udfs.select-centroid.load", configuration)
)
.withBroadcast(centroids, "centroids")
.withName("Select nearest centroids")
.map(assignment => (assignment._3, assignment._2)).withName("Strip word ID")
.reduceByKey(_._1, (wv1: (Int, SparseVector), wv2: (Int, SparseVector)) => (wv1._1, wv1._2 + wv2._2))
.withName("Add up cluster words").withCardinalityEstimator((in: Long) => _numClusters.toLong)
.map { centroid: (Int, SparseVector) => centroid._2.normalize(); centroid }.withName("Normalize centroids")
newCentroids
}).withName("K-means iteration").map(x => x).withName("Identity (wa2)")
// Apply the centroids to the points and resolve the word IDs.
val clusters = wordVectors
.mapJava(new SelectNearestCentroidFunction("finalCentroids")).withBroadcast(finalCentroids, "finalCentroids").withName("Select nearest final centroids")
.map(assignment => (assignment._3, List(assignment._1))).withName("Discard word vectors")
.reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 ++ c2._2)).withName("Create clusters")
.map(_._2).withName("Discard cluster IDs")
.mapJava(new ResolveClusterFunction("wordIds")).withBroadcast(wordIds, "wordIds").withName("Resolve word IDs")
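// collect() triggers the execution of the complete plan and fetches the clusters.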
clusters.collect()
}
}
object SimWords extends ExperimentDescriptor {
override def version = "0.1.0"
def main(args: Array[String]): Unit = {
if (args.isEmpty) {
println(s"Usage: <main class> ${Parameters.experimentHelp} <plugin(,plugin)*> <input file> <min word occurrences> <neighborhood reach> <#clusters> <#iterations> [<words per line (from..to)>]")
sys.exit(1)
}
implicit val configuration = new Configuration
implicit val experiment = Parameters.createExperiment(args(0), this)
val plugins = Parameters.loadPlugins(args(1))
experiment.getSubject.addConfiguration("plugins", args(1))
val inputFile = args(2)
experiment.getSubject.addConfiguration("input", args(2))
val minWordOccurrences = args(3).toInt
experiment.getSubject.addConfiguration("minWordOccurrences", args(3))
val neighborhoodReach = args(4).toInt
experiment.getSubject.addConfiguration("neighborhoodReach", args(4))
val numClusters = args(5).toInt
experiment.getSubject.addConfiguration("clusters", args(5))
val numIterations = args(6).toInt
experiment.getSubject.addConfiguration("iterations", args(6))
val wordsPerLine = if (args.length >= 8) {
experiment.getSubject.addConfiguration("wordsPerLine", args(7))
Parameters.parseAny(args(7)).asInstanceOf[ProbabilisticDoubleInterval]
} else new ProbabilisticDoubleInterval(100, 10000, 0.9)
val simWords = new SimWords(plugins: _*)
val result = simWords(inputFile, minWordOccurrences, neighborhoodReach, numClusters, numIterations, wordsPerLine)
// Store experiment data.
val inputFileSize = FileSystems.getFileSize(inputFile)
if (inputFileSize.isPresent) experiment.getSubject.addConfiguration("inputSize", inputFileSize.getAsLong)
ProfileDBHelper.store(experiment, configuration)
// Print the results.
result.filter(_.size > 1).toIndexedSeq.sortBy(_.size).reverse.foreach(println(_))
}
}
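// A sketch of a command-line invocation (the launcher script, plugin name, and path
// are assumptions, not taken from this file):
//
//   bin/wayang-submit org.apache.wayang.apps.simwords.SimWords \
//     <experiment descriptor> java hdfs://host/corpus.txt 10 2 100 20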