blob: fd215a43e6df247ea17a794109381ad760b0f3d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.kotlin
import org.apache.beam.examples.kotlin.common.ExampleUtils
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.metrics.Metrics
import org.apache.beam.sdk.options.Default
import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.Validation.Required
import org.apache.beam.sdk.transforms.*
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.PDone
/**
* An example that counts words in Shakespeare and includes Beam best practices.
*
*
* This class, [WordCount], is the second in a series of four successively more detailed
* 'word count' examples. You may first want to take a look at [MinimalWordCount]. After
* you've looked at this example, then see the [DebuggingWordCount] pipeline, for introduction
* of additional concepts.
*
*
* For a detailed walkthrough of this example, see [
* https://beam.apache.org/get-started/wordcount-example/ ](https://beam.apache.org/get-started/wordcount-example/)
*
*
* Basic concepts, also in the MinimalWordCount example: Reading text files; counting a
* PCollection; writing to text files
*
*
* New Concepts:
*
* <pre>
* 1. Executing a Pipeline both locally and using the selected runner
* 2. Using ParDo with static DoFns defined out-of-line
* 3. Building a composite transform
* 4. Defining your own pipeline options
</pre> *
*
*
* Concept #1: you can execute this pipeline either locally or using by selecting another runner.
* These are now command-line options and not hard-coded as they were in the MinimalWordCount
* example.
*
*
* To change the runner, specify:
*
* <pre>`--runner=YOUR_SELECTED_RUNNER
`</pre> *
*
*
* To execute this pipeline, specify a local output file (if using the `DirectRunner`) or
* output prefix on a supported distributed file system.
*
* <pre>`--output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
`</pre> *
*
*
* The input file defaults to a public data set containing the text of of King Lear, by William
* Shakespeare. You can override it and choose your own input with `--inputFile`.
*/
public object WordCount {
/**
* Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
* statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to
* a ParDo in the pipeline.
*/
public class ExtractWordsFn : DoFn<String, String>() {
private val emptyLines = Metrics.counter(ExtractWordsFn::class.java, "emptyLines")
private val lineLenDist = Metrics.distribution(ExtractWordsFn::class.java, "lineLenDistro")
@ProcessElement
fun processElement(@Element element: String, receiver: DoFn.OutputReceiver<String>) {
lineLenDist.update(element.length.toLong())
if (element.trim(' ').isEmpty()) {
emptyLines.inc()
}
// Split the line into words.
val words = element.split(ExampleUtils.TOKENIZER_PATTERN.toRegex()).toTypedArray()
// Output each word encountered into the output PCollection.
for (word in words) {
if (word.isNotEmpty()) {
receiver.output(word)
}
}
}
}
/** A SimpleFunction that converts a Word and Count into a printable string. */
public class FormatAsTextFn : SimpleFunction<KV<String, Long>, String>() {
override fun apply(input: KV<String, Long>) = "${input.key} : ${input.value}"
}
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
*
* Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public class CountWords : PTransform<PCollection<String>, PCollection<KV<String, Long>>>() {
override fun expand(lines: PCollection<String>): PCollection<KV<String, Long>> {
// Convert lines of text into individual words.
val words = lines.apply(ParDo.of(ExtractWordsFn()))
// Count the number of times each word occurs.
val wordCounts = words.apply(Count.perElement())
return wordCounts
}
}
/**
* Options supported by [WordCount].
*
*
* Concept #4: Defining your own configuration options. Here, you can add your own arguments to
* be processed by the command-line parser, and specify default values for them. You can then
* access the options values in your pipeline code.
*
*
* Inherits standard configuration options.
*/
public interface WordCountOptions : PipelineOptions {
/**
* By default, this example reads from a public dataset containing the text of King Lear. Set
* this option to choose a different input file or glob.
*/
@get:Description("Path of the file to read from")
@get:Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
var inputFile: String
/** Set this required option to specify where to write the output. */
@get:Description("Path of the file to write to")
@get:Required
var output: String
}
@JvmStatic
fun runWordCount(options: WordCountOptions) {
val p = Pipeline.create(options)
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
p.apply("ReadLines", TextIO.read().from(options.inputFile))
.apply(CountWords())
.apply(MapElements.via(FormatAsTextFn()))
.apply<PDone>("WriteCounts", TextIO.write().to(options.output))
p.run().waitUntilFinish()
}
@JvmStatic
fun main(args: Array<String>) {
val options = (PipelineOptionsFactory.fromArgs(*args).withValidation() as WordCountOptions)
runWordCount(options)
}
}