blob: cce4b9bd0862583b95169e963cbb88dabe875e6d [file] [log] [blame]
package org.qcri.rheem.apps.wordcount
import org.qcri.rheem.api._
import org.qcri.rheem.apps.util.{ExperimentDescriptor, Parameters, ProfileDBHelper}
import org.qcri.rheem.core.api.{Configuration, RheemContext}
import org.qcri.rheem.core.optimizer.ProbabilisticDoubleInterval
import org.qcri.rheem.core.plugin.Plugin
import org.qcri.rheem.core.util.fs.FileSystems
* This app counts the words in a file.
* @see [[org.qcri.rheem.apps.wordcount.Main]]
// Word-count application. `plugin` is a varargs list of zero or more Rheem plugins.
// NOTE(review): nothing in the visible lines of this class reads `plugin`; presumably the
// plugins are registered with the RheemContext on a line lost from this excerpt - confirm.
class WordCountScala(plugin: Plugin*) {
* Run the word count over a given file.
* @param inputUrl URL to the file
* @param wordsPerLine optional estimate of how many words there are in each line
* @return the counted words
// The second parameter list takes the Configuration and Experiment implicitly; only
// `configuration` is referenced in the visible body.
// NOTE(review): `Experiment` is not among the explicitly named imports visible in this file -
// confirm it is brought into scope (e.g. by the wildcard import or a missing import line).
def apply(inputUrl: String,
// Default estimate is ProbabilisticDoubleInterval(100, 10000, .8d) - presumably
// (lower bound, upper bound, confidence); verify against the class's constructor.
wordsPerLine: ProbabilisticDoubleInterval = new ProbabilisticDoubleInterval(100, 10000, .8d))
(implicit configuration: Configuration, experiment: Experiment) = {
// Build a Rheem context from the implicit configuration and start a plan on top of it.
val rheemCtx = new RheemContext(configuration)
val planBuilder = new PlanBuilder(rheemCtx)
// Label the job with the input URL.
.withJobName(s"WordCount ($inputUrl)")
// Read the input as text.
.readTextFile(inputUrl).withName("Load file")
// Split each line on runs of non-word characters; the caller's words-per-line estimate
// is handed over as the selectivity of this operator.
.flatMap(_.split("\\W+"), selectivity = wordsPerLine).withName("Split words")
// Drop empty tokens (splitting can produce them); selectivity is given as 0.99.
.filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")
// Normalize each word to lower case and pair it with an initial count of 1.
.map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")
// Group by the word (first tuple element) and sum the counters of equal words.
.reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
// Cardinality hint: estimate the output size as 1% of the input size, rounded.
.withCardinalityEstimator((in: Long) => math.round(in * 0.01))
// NOTE(review): no terminal action and no closing braces for `apply` or the class are
// visible after this line; the excerpt appears truncated - confirm against the full file.
* Companion object for [[WordCountScala]].
// Command-line entry point for the word count; also acts as the ExperimentDescriptor
// that is passed to Parameters.createExperiment below.
object WordCountScala extends ExperimentDescriptor {
// Version string of this experiment descriptor.
override def version = "0.1.0"
// Arguments as read below: args(0) experiment parameters, args(1) plugin list,
// args(2) input file, optional args(3) words-per-line estimate.
def main(args: Array[String]) {
// Parse args.
if (args.isEmpty) {
println(s"Usage: <main class> ${Parameters.experimentHelp} <plugin(,plugin)*> <input file> [<words per line a..b[~confidence]>]")
// NOTE(review): no exit call or closing brace for the usage branch is visible here;
// lines appear to be missing from this excerpt - confirm against the full file.
implicit val configuration = new Configuration
implicit val experiment = Parameters.createExperiment(args(0), this)
val plugins = Parameters.loadPlugins(args(1))
// Record the raw plugin argument and the input file on the experiment subject.
experiment.getSubject.addConfiguration("plugins", args(1))
val inputFile = args(2)
experiment.getSubject.addConfiguration("input", inputFile)
// The estimate is optional: without a fourth argument `wordsPerLine` is null.
// NOTE(review): as written, the non-null branch yields the result of addConfiguration;
// presumably a line that parses args(3) into an interval is missing here - confirm.
val wordsPerLine = if (args.length >= 4) {
experiment.getSubject.addConfiguration("wordsPerLine", args(3))
} else null
// Run wordCount.
val wordCount = new WordCountScala(plugins: _*)
// Pass the estimate only when one was supplied.
// NOTE(review): the else branch of this expression and its closing parenthesis are not
// visible; the lines below seem to belong to later statements - confirm.
val words =
(if (wordsPerLine != null) {
wordCount(inputFile, wordsPerLine)
} else {
// Store experiment data.
val inputFileSize = FileSystems.getFileSize(inputFile)
// The input size is recorded only when the file size is available.
// NOTE(review): the trailing `, configuration)` is unbalanced and looks like the remnant of
// a separate call fused onto this line (possibly a ProfileDBHelper call, as that import is
// otherwise unused in the visible code) - confirm against the full file.
if (inputFileSize.isPresent) experiment.getSubject.addConfiguration("inputSize", inputFileSize.getAsLong), configuration)
// Print results.
println(s"Found ${words.size} words:")
// Show the first ten entries as "<count>x <word>".
words.take(10).foreach(wc => println(s"${wc._2}x ${wc._1}"))
// Summarize the remainder; note this uses print, so no trailing newline is written.
if (words.size > 10) print(s"${words.size - 10} more...")