blob: 5ac808079b36c1856c5d8997357ccee9132e1d47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
/**
* An example that counts words in Shakespeare.
*
* <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more
* detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or
* argument processing, and focus on construction of the pipeline, which chains together the
* application of core transforms.
*
* <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the
* {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
* concepts.
*
* <p>Concepts:
*
* <pre>
* 1. Reading data from text files
* 2. Specifying 'inline' transforms
* 3. Counting items in a PCollection
* 4. Writing data to text files
* </pre>
*
* <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You
* can see the results in the output files in your current working directory, with names like
* "wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate
* file service.
*/
public class MinimalWordCount {
public static void main(String[] args) {
// Create a PipelineOptions object. This object lets us set various execution
// options for our pipeline, such as the runner you wish to use. This example
// will run with the DirectRunner by default, based on the class path configured
// in its dependencies.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
// Apply the pipeline's transforms.
// Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
// of input text files. TextIO.Read returns a PCollection where each element is one line from
// the input text (a set of Shakespeare's texts).
// This example reads a public data set consisting of the complete works of Shakespeare.
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
// Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
// The ParDo returns a PCollection<String>, where each element is an individual word in
// Shakespeare's collected texts.
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
// Concept #3: Apply the Count transform to our PCollection of individual words. The Count
// transform returns a new PCollection of key/value pairs, where each key represents a unique
// word in the text. The associated value is the occurrence count for that word.
.apply(Count.<String>perElement())
// Apply a MapElements transform that formats our PCollection of word counts into a printable
// string, suitable for writing to an output file.
.apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}))
// Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
// TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a series of text files.
//
// By default, it will write to a set of files with names like wordcount-00001-of-00005
.apply(TextIO.write().to("wordcounts"));
// Run the pipeline.
p.run().waitUntilFinish();
}
}