// blob: a647c0af3c8b70b8bb9506589829b41edc18b2b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
* An example that counts words and includes Beam best practices.
*
* <p>For a detailed walkthrough of this example, see <a
* href="https://beam.apache.org/get-started/wordcount-example/">
* https://beam.apache.org/get-started/wordcount-example/ </a>
*
* <p>Basic concepts: Reading text files; counting a
* PCollection; writing to text files
*
* <p>New Concepts:
*
* <pre>
* 1. Executing a Pipeline both locally and using the selected runner
* 2. Using ParDo with static DoFns defined out-of-line
* 3. Building a composite transform
* 4. Defining your own pipeline options
* </pre>
*
* <p>To execute this example locally:
* <pre>{@code
* $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
* -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
* }</pre>
*
* <p>To execute the example in a distributed manner, first package it with mvn
* (for YARN deployment, remove the {@code .waitUntilFinish()} call from the code):
* <pre>{@code
* $ mkdir -p deploy/examples
* $ mvn package && tar -xvf target/samza-beam-examples-0.1-dist.tar.gz -C deploy/examples/
* }</pre>
*
* <p>To execute this example in standalone mode with ZooKeeper
* (splitting the input in two):
* <pre>{@code
* $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.WordCount \
* --configFilePath=$PWD/deploy/examples/config/standalone.properties \
* --inputFile=/Users/xiliu/opensource/samza-beam-examples/pom.xml --output=word-counts.txt \
* --maxSourceParallelism=2
* }</pre>
*
* <p>To execute this example on YARN
* (splitting the input in two):
* <pre>{@code
* $ deploy/examples/bin/run-beam-yarn.sh org.apache.beam.examples.WordCount \
* --configFilePath=$PWD/deploy/examples/config/yarn.properties \
* --inputFile=/Users/xiliu/opensource/samza-beam-examples/pom.xml \
* --output=/tmp/word-counts.txt --maxSourceParallelism=2
* }</pre>
*/
public class WordCount {
/**
 * Word delimiter: one or more consecutive characters that are not Unicode letters. Compiled once
 * here so that the regular expression is not re-parsed for every line processed.
 */
private static final Pattern TOKENIZER_PATTERN = Pattern.compile("[^\\p{L}]+");

/**
 * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
 * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to
 * a ParDo in the pipeline.
 *
 * <p>Besides emitting words, it records two metrics: an {@code emptyLines} counter and a
 * {@code lineLenDistro} distribution of line lengths.
 */
static class ExtractWordsFn extends DoFn<String, String> {
  private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
  private final Distribution lineLenDist =
      Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

  /**
   * Splits one line of text into words and emits every non-empty word.
   *
   * @param element the input line
   * @param receiver the receiver each extracted word is output to
   */
  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> receiver) {
    lineLenDist.update(element.length());
    if (element.trim().isEmpty()) {
      emptyLines.inc();
    }

    // Split the line into words using the pre-compiled pattern. The -1 limit keeps trailing
    // empty tokens, which are filtered out below together with any leading empty token.
    String[] words = TOKENIZER_PATTERN.split(element, -1);

    // Output each word encountered into the output PCollection.
    for (String word : words) {
      if (!word.isEmpty()) {
        receiver.output(word);
      }
    }
  }
}
/** Renders a word/count pair as one printable line of the form {@code word: count}. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
  @Override
  public String apply(KV<String, Long> input) {
    String word = input.getKey();
    Long count = input.getValue();
    return word + ": " + count;
  }
}
/**
 * Composite transform that turns a PCollection of text lines into a PCollection of
 * (word, occurrence count) pairs.
 *
 * <p>Concept #3: Two transforms (a ParDo running {@link ExtractWordsFn}, followed by Count) are
 * bundled into a single reusable PTransform subclass. Composite transforms are easy to reuse,
 * can be tested as a unit, and improve the monitoring experience.
 */
public static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    // First tokenize every line into words, then count the occurrences of each distinct word.
    return lines
        .apply(ParDo.of(new ExtractWordsFn()))
        .apply(Count.<String>perElement());
  }
}
/**
* Options supported by {@link WordCount}.
*
* <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments to
* be processed by the command-line parser. You can then access the option values in your
* pipeline code.
*
* <p>Inherits standard configuration options from {@link PipelineOptions}.
*/
public interface WordCountOptions extends PipelineOptions {
/**
* Path of the input file (or glob) to read lines from. This option is marked as required and no
* default value is declared for it, so it must be supplied explicitly.
*/
@Description("Path of the file to read from")
@Required
String getInputFile();
/** Sets the path of the input file to read from. */
void setInputFile(String value);
/** Path the word counts are written to. This option is marked as required. */
@Description("Path of the file to write to")
@Required
String getOutput();
/** Sets the path the word counts are written to. */
void setOutput(String value);
}
/**
 * Assembles the word-count pipeline from the given options, runs it, and blocks until the run
 * has finished.
 *
 * @param options supplies the input file to read and the output location to write
 */
static void runWordCount(WordCountOptions options) {
  Pipeline pipeline = Pipeline.create(options);

  // Read the input as a collection of text lines.
  PCollection<String> lines =
      pipeline.apply("ReadLines", TextIO.read().from(options.getInputFile()));

  // Concepts #2 and #3: apply the composite CountWords transform, then map each
  // (word, count) pair to text with the statically defined FormatAsTextFn.
  PCollection<KV<String, Long>> wordCounts = lines.apply(new CountWords());
  PCollection<String> formatted = wordCounts.apply(MapElements.via(new FormatAsTextFn()));

  // Write all results to a single, unsharded output.
  formatted.apply("WriteCounts", TextIO.write().to(options.getOutput()).withoutSharding());

  // For YARN there is no need to wait after submitting the job, so waitUntilFinish() can be
  // dropped there; use a plain run() call instead.
  pipeline.run().waitUntilFinish();
}
/**
 * Program entry point: parses and validates the command-line arguments into
 * {@link WordCountOptions}, sets the job name to {@code word-count}, and runs the pipeline.
 *
 * @param args command-line arguments holding the pipeline options
 */
public static void main(String[] args) {
  PipelineOptionsFactory.Builder parser = PipelineOptionsFactory.fromArgs(args).withValidation();
  WordCountOptions wordCountOptions = parser.as(WordCountOptions.class);
  wordCountOptions.setJobName("word-count");
  runWordCount(wordCountOptions);
}
}