---
title: "Beam WordCount Examples"
aliases: /use/wordcount-example/
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Apache Beam WordCount Examples
{{< toc >}}
{{< language-switcher java py go >}}
The WordCount examples demonstrate how to set up a processing pipeline that can
read text, tokenize the text lines into individual words, and perform a
frequency count on each of those words. The Beam SDKs contain four
successively more detailed WordCount examples that build on each other. The
input text for all the examples is a set of Shakespeare's texts.
Each WordCount example introduces different concepts in the Beam programming
model. Begin by understanding MinimalWordCount, the simplest of the examples.
Once you feel comfortable with the basic principles in building a pipeline,
continue on to learn more concepts in the other examples.
* **MinimalWordCount** demonstrates the basic principles involved in building a
pipeline.
* **WordCount** introduces some of the more common best practices in creating
reusable and maintainable pipelines.
* **DebuggingWordCount** introduces logging and debugging practices.
* **WindowedWordCount** demonstrates how you can use Beam's programming model
to handle both bounded and unbounded datasets.
## MinimalWordCount example
MinimalWordCount demonstrates a simple pipeline that uses the Direct Runner to
read from a text file, apply transforms to tokenize and count the words, and
write the data to an output text file.
{{< paragraph class="language-java language-go" >}}
This example hard-codes the locations for its input and output files and doesn't
perform any error checking; it is intended to only show you the "bare bones" of
creating a Beam pipeline. This lack of parameterization makes this particular
pipeline less portable across different runners than standard Beam pipelines. In
later examples, we will parameterize the pipeline's input and output sources and
show other best practices.
{{< /paragraph >}}
{{< highlight java >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount
{{< /highlight >}}
{{< highlight py >}}
python -m apache_beam.examples.wordcount_minimal --input YOUR_INPUT_FILE --output counts
{{< /highlight >}}
{{< highlight go >}}
$ go install github.com/apache/beam/sdks/v2/go/examples/minimal_wordcount
$ minimal_wordcount
{{< /highlight >}}
{{< paragraph class="language-java" >}}
To view the full code in Java, see
**[MinimalWordCount](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java).**
{{< /paragraph >}}
{{< paragraph class="language-py" >}}
To view the full code in Python, see
**[wordcount_minimal.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_minimal.py).**
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
To view the full code in Go, see
**[minimal_wordcount.go](https://github.com/apache/beam/blob/master/sdks/go/examples/minimal_wordcount/minimal_wordcount.go).**
{{< /paragraph >}}
**Key Concepts:**
* Creating the Pipeline
* Applying transforms to the Pipeline
* Reading input (in this example: reading text files)
* Applying ParDo transforms
* Applying SDK-provided transforms (in this example: Count)
* Writing output (in this example: writing to a text file)
* Running the Pipeline
The following sections explain these concepts in detail, using the relevant code
excerpts from the MinimalWordCount pipeline.
### Creating the pipeline
{{< paragraph class="language-java language-py" >}}
In this example, the code first creates a `PipelineOptions` object. This object
lets us set various options for our pipeline, such as the pipeline runner that
will execute our pipeline and any runner-specific configuration required by the
chosen runner. In this example we set these options programmatically, but more
often, command-line arguments are used to set `PipelineOptions`.
{{< /paragraph >}}
{{< paragraph class="language-java language-py" >}}
You can specify a runner for executing your pipeline, such as the
`DataflowRunner` or `SparkRunner`. If you omit specifying a runner, as in this
example, your pipeline executes locally using the `DirectRunner`. In the next
sections, we will specify the pipeline's runner.
{{< /paragraph >}}
{{< highlight java >}}
// Create a PipelineOptions object. This object lets us set various execution
// options for our pipeline, such as the runner you wish to use. This example
// will run with the DirectRunner by default, based on the class path configured
// in its dependencies.
PipelineOptions options = PipelineOptionsFactory.create();
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_minimal.py" examples_wordcount_minimal_options >}}
{{< /highlight >}}
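Although this example sets its options programmatically, `PipelineOptions` can
also be constructed from a list of command-line-style flags. The following
minimal Python sketch is not part of MinimalWordCount; the explicit `--runner`
flag is shown purely for illustration:
{{< highlight py >}}
# A minimal sketch (not part of MinimalWordCount): construct
# PipelineOptions from command-line-style flags, including an
# explicit runner choice.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--runner=DirectRunner'])
{{< /highlight >}}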
{{< paragraph class="language-java language-py" >}}
The next step is to create a `Pipeline` object with the options we've just
constructed. The Pipeline object builds up the graph of transformations to be
executed, associated with that particular pipeline.
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
The first step is to create a `Pipeline` object. It builds up the graph of
transformations to be executed, associated with that particular pipeline.
The scope allows grouping into composite transforms.
{{< /paragraph >}}
{{< highlight java >}}
Pipeline p = Pipeline.create(options);
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_minimal.py" examples_wordcount_minimal_create >}}
{{< /highlight >}}
{{< highlight go >}}
p := beam.NewPipeline()
s := p.Root()
{{< /highlight >}}
### Applying pipeline transforms
The MinimalWordCount pipeline contains several transforms to read data into the
pipeline, manipulate or otherwise transform the data, and write out the results.
Transforms can consist of an individual operation, or can contain multiple
nested transforms (known as a [composite transform](/documentation/programming-guide#composite-transforms)).
Each transform takes some kind of input data and produces some output data. The
input and output data is often represented by the SDK class `PCollection`.
`PCollection` is a special class, provided by the Beam SDK, that you can use to
represent a dataset of virtually any size, including unbounded datasets.
<img src="/images/wordcount-pipeline.svg" width="800px" alt="The MinimalWordCount pipeline data flow.">
*Figure 1: The MinimalWordCount pipeline data flow.*
The MinimalWordCount pipeline contains five transforms:
1. A text file `Read` transform is applied to the `Pipeline` object itself, and
produces a `PCollection` as output. Each element in the output `PCollection`
represents one line of text from the input file. This example uses input
data stored in a publicly accessible Google Cloud Storage bucket ("gs://").
{{< highlight java >}}
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_minimal.py" examples_wordcount_minimal_read >}}
{{< /highlight >}}
{{< highlight go >}}
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/*")
{{< /highlight >}}
2. This transform splits the lines of the `PCollection<String>` into individual
words, so that each element of the output `PCollection` is a single word from
Shakespeare's collected texts.
As an alternative, it would have been possible to use a
[ParDo](/documentation/programming-guide/#pardo)
transform that invokes a `DoFn` (defined in-line as an anonymous class) on
each element that tokenizes the text lines into individual words. The input
for this transform is the `PCollection` of text lines generated by the
previous `TextIO.Read` transform. The `ParDo` transform outputs a new
`PCollection`, where each element represents an individual word in the text.
{{< highlight java >}}
.apply("ExtractWords", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
{{< /highlight >}}
{{< highlight py >}}
# The FlatMap transform is a simplified version of ParDo.
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_minimal.py" examples_wordcount_minimal_pardo >}}
{{< /highlight >}}
{{< highlight go >}}
words := beam.ParDo(s, func(line string, emit func(string)) {
	for _, word := range wordRE.FindAllString(line, -1) {
		emit(word)
	}
}, lines)
{{< /highlight >}}
3. The SDK-provided `Count` transform is a generic transform that takes a
`PCollection` of any type, and returns a `PCollection` of key/value pairs.
Each key represents a unique element from the input collection, and each
value represents the number of times that key appeared in the input
collection.
In this pipeline, the input for `Count` is the `PCollection` of individual
words generated by the previous `ParDo`, and the output is a `PCollection`
of key/value pairs where each key represents a unique word in the text and
the associated value is the occurrence count for each.
{{< highlight java >}}
.apply(Count.<String>perElement())
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_minimal.py" examples_wordcount_minimal_count >}}
{{< /highlight >}}
{{< highlight go >}}
counted := stats.Count(s, words)
{{< /highlight >}}
4. The next transform formats each of the key/value pairs of unique words and
occurrence counts into a printable string suitable for writing to an output
file.
The map transform is a higher-level composite transform that encapsulates a
simple `ParDo`. For each element in the input `PCollection`, the map
transform applies a function that produces exactly one output element.
{{< highlight java >}}
.apply("FormatResults", MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_minimal.py" examples_wordcount_minimal_map >}}
{{< /highlight >}}
{{< highlight go >}}
formatted := beam.ParDo(s, func(w string, c int) string {
	return fmt.Sprintf("%s: %v", w, c)
}, counted)
{{< /highlight >}}
5. A text file write transform. This transform takes the final `PCollection` of
formatted Strings as input and writes each element to an output text file.
Each element in the input `PCollection` represents one line of text in the
resulting output file.
{{< highlight java >}}
.apply(TextIO.write().to("wordcounts"));
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_minimal.py" examples_wordcount_minimal_write >}}
{{< /highlight >}}
{{< highlight go >}}
textio.Write(s, "wordcounts.txt", formatted)
{{< /highlight >}}
{{< paragraph class="language-java language-py" >}}
Note that the `Write` transform produces a trivial result value of type `PDone`,
which in this case is ignored.
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
Note that the `Write` transform returns no PCollections.
{{< /paragraph >}}
### Running the pipeline
{{< paragraph class="language-java language-py" >}}
Run the pipeline by calling the `run` method, which sends your pipeline to be
executed by the pipeline runner that you specified in your `PipelineOptions`.
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
Run the pipeline by passing it to a runner.
{{< /paragraph >}}
{{< highlight java >}}
p.run().waitUntilFinish();
{{< /highlight >}}
{{< highlight py >}}
with beam.Pipeline(...) as p:
  [construction]
# p.run() is called automatically when the with block exits
{{< /highlight >}}
{{< highlight go >}}
direct.Execute(context.Background(), p)
{{< /highlight >}}
{{< paragraph class="language-java language-py" >}}
Note that the `run` method is asynchronous. For a blocking execution, call the
<span class="language-java">`waitUntilFinish`</span>
<span class="language-py">`wait_until_finish`</span> method on the result object
returned by the call to `run`.
{{< /paragraph >}}
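As a non-blocking illustration, consider the following Python sketch (assuming
`p` is an already-constructed pipeline):
{{< highlight py >}}
# A minimal sketch: run() returns a result object without waiting for
# the pipeline to complete; wait_until_finish() then blocks until done.
result = p.run()
# ... other work can happen here while the pipeline executes ...
result.wait_until_finish()
{{< /highlight >}}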
### Try the full example in Playground
{{< playground height="700px" >}}
{{< playground_snippet language="java" path="SDK_JAVA_MinimalWordCount" >}}
{{< playground_snippet language="py" path="SDK_PYTHON_WordCountMinimal" >}}
{{< playground_snippet language="go" path="SDK_GO_MinimalWordCount" >}}
{{< /playground >}}
## WordCount example
This WordCount example introduces a few recommended programming practices that
can make your pipeline easier to read, write, and maintain. While not explicitly
required, they can make your pipeline's execution more flexible, aid in testing
your pipeline, and help make your pipeline's code reusable.
This section assumes that you have a good understanding of the basic concepts in
building a pipeline. If you feel that you aren't at that point yet, read the
above section, [MinimalWordCount](#minimalwordcount-example).
**To run this example in Java:**
Set up your development environment and generate the Maven archetype as
described in the [Java WordCount quickstart](/get-started/quickstart-java/).
Then run the pipeline with one of the runners:
{{< runner direct >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
{{< /runner >}}
{{< runner flink >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner
{{< /runner >}}
{{< runner flinkCluster >}}
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
{{< /runner >}}
{{< runner spark >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
{{< /runner >}}
{{< runner dataflow >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
-Pdataflow-runner
{{< /runner >}}
{{< runner samza >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
{{< /runner >}}
{{< runner nemo >}}
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
--runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
{{< /runner >}}
{{< runner jet >}}
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
--runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts
{{< /runner >}}
To view the full code in Java, see
**[WordCount](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java).**
**To run this example in Python:**
{{< runner direct >}}
python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts
{{< /runner >}}
{{< runner flink >}}
python -m apache_beam.examples.wordcount --input /path/to/inputfile \
--output /path/to/write/counts \
--runner FlinkRunner
{{< /runner >}}
{{< runner flinkCluster >}}
# Running Beam Python on a distributed Flink cluster requires additional configuration.
# See /documentation/runners/flink/ for more information.
{{< /runner >}}
{{< runner spark >}}
python -m apache_beam.examples.wordcount --input /path/to/inputfile \
--output /path/to/write/counts \
--runner SparkRunner
{{< /runner >}}
{{< runner dataflow >}}
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://YOUR_GCS_BUCKET/counts \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
{{< /runner >}}
{{< runner samza >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner nemo >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner jet >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
To view the full code in Python, see
**[wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py).**
**To run this example in Go:**
{{< runner direct >}}
$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
$ wordcount --input <PATH_TO_INPUT_FILE> --output counts
{{< /runner >}}
{{< runner flink >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner flinkCluster >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner spark >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner dataflow >}}
$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
# As part of the initial setup, non-Linux users should install the unix package before running:
$ go get -u golang.org/x/sys/unix
$ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest
{{< /runner >}}
{{< runner samza >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner nemo >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner jet >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
To view the full code in Go, see
**[wordcount.go](https://github.com/apache/beam/blob/master/sdks/go/examples/wordcount/wordcount.go).**
**New Concepts:**
* Applying `ParDo` with an explicit `DoFn`
* Creating Composite Transforms
* Using Parameterizable `PipelineOptions`
The following sections explain these key concepts in detail, and break down the
pipeline code into smaller sections.
### Specifying explicit DoFns
{{< paragraph class="language-java language-py" >}}
When using `ParDo` transforms, you need to specify the processing operation that
gets applied to each element in the input `PCollection`. This processing
operation is a subclass of the SDK class `DoFn`. You can create the `DoFn`
subclasses for each `ParDo` inline, as an anonymous inner class instance, as is
done in the previous example (MinimalWordCount). However, it's often a good
idea to define the `DoFn` at the global level, which makes it easier to unit
test and can make the `ParDo` code more readable.
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
When using `ParDo` transforms, you need to specify the processing operation that
gets applied to each element in the input `PCollection`. This processing
operation is either a named function or a struct with specially-named methods. You
can use anonymous functions (but not closures). However, it's often a good
idea to define the `DoFn` at the global level, which makes it easier to unit
test and can make the `ParDo` code more readable.
{{< /paragraph >}}
{{< highlight java >}}
// In this example, ExtractWordsFn is a DoFn that is defined as a static class:

static class ExtractWordsFn extends DoFn<String, String> {
  ...

  @ProcessElement
  public void processElement(ProcessContext c) {
    ...
  }
}
{{< /highlight >}}
{{< highlight py >}}
# In this example, the DoFns are defined as classes:
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_wordcount.py" examples_wordcount_wordcount_dofn >}}
{{< /highlight >}}
{{< highlight go >}}
// In this example, extractFn is a DoFn that is defined as a function:

func extractFn(ctx context.Context, line string, emit func(string)) {
	...
}
{{< /highlight >}}
### Creating composite transforms
{{< paragraph class="language-java language-py" >}}
If you have a processing operation that consists of multiple transforms or
`ParDo` steps, you can create it as a subclass of `PTransform`. Creating a
`PTransform` subclass allows you to encapsulate complex transforms, can make
your pipeline's structure more clear and modular, and makes unit testing easier.
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
If you have a processing operation that consists of multiple transforms or
`ParDo` steps, you can use a normal Go function to encapsulate them. You can
furthermore use a named subscope to group them as a composite transform visible
for monitoring.
{{< /paragraph >}}
{{< paragraph class="language-java language-py" >}}
In this example, two transforms are encapsulated as the `PTransform` subclass
`CountWords`. `CountWords` contains the `ParDo` that runs `ExtractWordsFn` and
the SDK-provided `Count` transform.
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
In this example, two transforms are encapsulated as a `CountWords` function.
{{< /paragraph >}}
When `CountWords` is defined, we specify its ultimate input and output; the
input is the `PCollection<String>` for the extraction operation, and the output
is the `PCollection<KV<String, Long>>` produced by the count operation.
{{< highlight java >}}
public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_wordcount.py" examples_wordcount_wordcount_composite >}}
{{< /highlight >}}
{{< highlight go >}}
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")

	// Convert lines of text into individual words.
	col := beam.ParDo(s, extractFn, lines)

	// Count the number of times each word occurs.
	return stats.Count(s, col)
}
{{< /highlight >}}
### Using parameterizable PipelineOptions
You can hard-code various execution options when you run your pipeline. However,
the more common way is to define your own configuration options via command-line
argument parsing. Defining your configuration options via the command-line makes
the code more easily portable across different runners.
{{< paragraph class="language-java language-py" >}}
Add arguments to be processed by the command-line parser, and specify default
values for them. You can then access the options values in your pipeline code.
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
You can use the standard `flag` package for this purpose.
{{< /paragraph >}}
{{< highlight java >}}
public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_wordcount.py" examples_wordcount_wordcount_options >}}
{{< /highlight >}}
{{< highlight go >}}
var input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

func main() {
	...
	p := beam.NewPipeline()
	s := p.Root()
	lines := textio.Read(s, *input)
	...
}
{{< /highlight >}}
### Try the full example in Playground
{{< playground height="700px" >}}
{{< playground_snippet language="java" path="SDK_JAVA_WordCount" >}}
{{< playground_snippet language="py" path="SDK_PYTHON_WordCount" >}}
{{< playground_snippet language="go" path="SDK_GO_WordCount" >}}
{{< /playground >}}
## DebuggingWordCount example
The DebuggingWordCount example demonstrates some best practices for
instrumenting your pipeline code.
**To run this example in Java:**
{{< runner direct >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--output=counts" -Pdirect-runner
{{< /runner >}}
{{< runner flink >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=FlinkRunner --output=counts" -Pflink-runner
{{< /runner >}}
{{< runner flinkCluster >}}
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
{{< /runner >}}
{{< runner spark >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=SparkRunner --output=counts" -Pspark-runner
{{< /runner >}}
{{< runner dataflow >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
-Pdataflow-runner
{{< /runner >}}
{{< runner samza >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=SamzaRunner --output=counts" -Psamza-runner
{{< /runner >}}
{{< runner nemo >}}
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
--runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
{{< /runner >}}
{{< runner jet >}}
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
--runner=JetRunner --jetLocalMode=3 --output=counts
{{< /runner >}}
To view the full code in Java, see
[DebuggingWordCount](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java).
**To run this example in Python:**
{{< runner direct >}}
python -m apache_beam.examples.wordcount_debugging --input YOUR_INPUT_FILE --output counts
{{< /runner >}}
{{< runner flink >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner flinkCluster >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner spark >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner dataflow >}}
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://YOUR_GCS_BUCKET/counts \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
{{< /runner >}}
{{< runner samza >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner nemo >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner jet >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
To view the full code in Python, see
**[wordcount_debugging.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py).**
**To run this example in Go:**
{{< runner direct >}}
$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
$ debugging_wordcount --input <PATH_TO_INPUT_FILE> --output counts
{{< /runner >}}
{{< runner flink >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner flinkCluster >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner spark >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner dataflow >}}
$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
# As part of the initial setup, non-Linux users should install the unix package before running:
$ go get -u golang.org/x/sys/unix
$ debugging_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest
{{< /runner >}}
{{< runner samza >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner nemo >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner jet >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
To view the full code in Go, see
**[debugging_wordcount.go](https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go).**
**New Concepts:**
* Logging
* Testing your Pipeline via `PAssert`
The following sections explain these key concepts in detail, and break down the
pipeline code into smaller sections.
### Logging
Each runner may choose to handle logs in its own way.
{{< highlight java >}}
// This example uses .trace and .debug:
public class DebuggingWordCount {

  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (...) {
        ...
        LOG.debug("Matched: " + c.element().getKey());
      } else {
        ...
        LOG.trace("Did not match: " + c.element().getKey());
      }
    }
  }
}
{{< /highlight >}}
{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_examples_wordcount_debugging.py" example_wordcount_debugging_logging >}}
{{< /highlight >}}
{{< highlight go >}}
type filterFn struct {
	...
}

func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
	if f.re.MatchString(word) {
		// Log at the "INFO" level each element that we match.
		log.Infof(ctx, "Matched: %v", word)
		emit(word, count)
	} else {
		// Log at the "DEBUG" level each element that is not matched.
		log.Debugf(ctx, "Did not match: %v", word)
	}
}
{{< /highlight >}}
#### Direct Runner
When executing your pipeline with the `DirectRunner`, you can print log
messages directly to your local console. <span class="language-java">If you use
the Beam SDK for Java, you must add `Slf4j` to your class path.</span>
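If you use the Beam SDK for Python, log messages are produced with the standard
`logging` module. A minimal sketch of what this might look like on the
`DirectRunner` (the function name and message here are illustrative, not taken
from the example):
{{< highlight py >}}
# A minimal sketch (illustrative): with the DirectRunner, messages
# written via Python's standard logging module appear on the local
# console.
import logging

import apache_beam as beam

def log_and_pass_through(element):
  # INFO-level messages are shown if the root logger level allows them.
  logging.info('Processing element: %s', element)
  return element

logging.getLogger().setLevel(logging.INFO)
# Usage inside a pipeline, e.g.: lines | beam.Map(log_and_pass_through)
{{< /highlight >}}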
#### Cloud Dataflow Runner
When executing your pipeline with the `DataflowRunner`, you can use Stackdriver
Logging. Stackdriver Logging aggregates the logs from all of your Cloud Dataflow
job's workers to a single location in the Google Cloud Platform Console. You can
use Stackdriver Logging to search and access the logs from all of the workers
that Cloud Dataflow has spun up to complete your job. Logging statements in your
pipeline's `DoFn` instances will appear in Stackdriver Logging as your pipeline
runs.
{{< paragraph class="language-java language-py" >}}
You can also control the worker log levels. Cloud Dataflow workers that execute
user code are configured to log to Stackdriver Logging by default at "INFO" log
level and higher. You can override log levels for specific logging namespaces by
specifying: `--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}`.
For example, by specifying `--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}`
when executing a pipeline using the Cloud Dataflow service, Stackdriver Logging
will contain only "DEBUG" or higher level logs for the package in addition to
the default "INFO" or higher level logs.
{{< /paragraph >}}
{{< paragraph class="language-java language-py" >}}
The default Cloud Dataflow worker logging configuration can be overridden by
specifying `--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>`.
For example, by specifying `--defaultWorkerLogLevel=DEBUG` when executing a
pipeline with the Cloud Dataflow service, Stackdriver Logging will contain all "DEBUG"
or higher level logs. Note that changing the default worker log level to TRACE
or DEBUG significantly increases the amount of logs output.
{{< /paragraph >}}
#### Apache Spark Runner
> **Note:** This section is yet to be added. There is an open issue for this
> ([Issue 18076](https://github.com/apache/beam/issues/18076)).
#### Apache Flink Runner
> **Note:** This section is yet to be added. There is an open issue for this
> ([Issue 18075](https://github.com/apache/beam/issues/18075)).
#### Apache Nemo Runner
When executing your pipeline with the `NemoRunner`, most log messages are printed
directly to your local console. You should add `Slf4j` to your class path to make
full use of the logs. To view the logs on both the driver and the executor
sides, look in the folders created by Apache REEF. For example, when running
your pipeline through the local runtime, a folder called `REEF_LOCAL_RUNTIME`
is created in your working directory, and the logs and the metric information
can all be found under that directory.
### Testing your pipeline with asserts
{{< paragraph class="language-java language-py" >}}
<span class="language-java">`PAssert`</span><span class="language-py">`assert_that`</span>
is a set of convenient PTransforms in the style of Hamcrest's collection
matchers that can be used when writing pipeline level tests to validate the
contents of PCollections. Asserts are best used in unit tests with small datasets.
{{< /paragraph >}}
{{< paragraph class="language-go" >}}
The `passert` package contains convenient PTransforms that can be used when
writing pipeline level tests to validate the contents of PCollections. Asserts
are best used in unit tests with small datasets.
{{< /paragraph >}}
{{< paragraph class="language-java" >}}
The following example verifies that the set of filtered words matches our
expected counts. The assert does not produce any output, and the pipeline only
succeeds if all of the expectations are met.
{{< /paragraph >}}
{{< paragraph class="language-py language-go" >}}
The following example verifies that two collections contain the same values. The
assert does not produce any output, and the pipeline only succeeds if all of the
expectations are met.
{{< /paragraph >}}
{{< highlight java >}}
public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
      KV.of("Flourish", 3L),
      KV.of("stomach", 1L));
  PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}
{{< /highlight >}}
{{< highlight py >}}
from apache_beam import Create
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

with TestPipeline() as p:
  assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
{{< /highlight >}}
{{< highlight go >}}
...
passert.Equals(s, formatted, "Flourish: 3", "stomach: 1")
{{< /highlight >}}
{{< paragraph class="language-java" >}}
See [DebuggingWordCountTest](https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java)
for an example unit test.
{{< /paragraph >}}
### Try the full example in Playground
{{< playground height="700px" >}}
{{< playground_snippet language="java" path="SDK_JAVA_DebuggingWordCount" >}}
{{< playground_snippet language="py" path="SDK_PYTHON_WordCountDebugging" >}}
{{< playground_snippet language="go" path="SDK_GO_DebuggingWordCount" >}}
{{< /playground >}}
## WindowedWordCount example
The WindowedWordCount example counts words in text just as the previous
examples did, but introduces several advanced concepts.
**New Concepts:**
* Unbounded and bounded datasets
* Adding timestamps to data
* Windowing
* Reusing PTransforms over windowed PCollections
The following sections explain these key concepts in detail, and break down the
pipeline code into smaller sections.
**To run this example in Java:**
{{< runner direct >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
{{< /runner >}}
{{< runner flink >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner
{{< /runner >}}
{{< runner flinkCluster >}}
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
{{< /runner >}}
{{< runner spark >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
{{< /runner >}}
{{< runner dataflow >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
-Pdataflow-runner
{{< /runner >}}
{{< runner samza >}}
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=SamzaRunner --inputFile=pom.xml --output=counts" -Psamza-runner
{{< /runner >}}
{{< runner nemo >}}
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
--runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
{{< /runner >}}
{{< runner jet >}}
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
--runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts
{{< /runner >}}
To view the full code in Java, see
**[WindowedWordCount](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java).**
**To run this example in Python:**
This pipeline writes its results to a BigQuery table specified by the
`--output_table` parameter, using the format `PROJECT:DATASET.TABLE` or
`DATASET.TABLE`.
{{< runner direct >}}
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE
{{< /runner >}}
{{< runner flink >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner flinkCluster >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner spark >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner dataflow >}}
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \
--output_table PROJECT:DATASET.TABLE \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
{{< /runner >}}
{{< runner samza >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner nemo >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner jet >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
To view the full code in Python, see
**[windowed_wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/windowed_wordcount.py).**
**To run this example in Go:**
{{< runner direct >}}
$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
$ windowed_wordcount --input <PATH_TO_INPUT_FILE> --output counts
{{< /runner >}}
{{< runner flink >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner flinkCluster >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner spark >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner dataflow >}}
$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
# As part of the initial setup, non-Linux users should install the unix package before running:
$ go get -u golang.org/x/sys/unix
$ windowed_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest
{{< /runner >}}
{{< runner samza >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner nemo >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
{{< runner jet >}}
This runner is not yet available for the Go SDK.
{{< /runner >}}
To view the full code in Go, see
**[windowed_wordcount.go](https://github.com/apache/beam/blob/master/sdks/go/examples/windowed_wordcount/windowed_wordcount.go).**
### Unbounded and bounded datasets
Beam allows you to create a single pipeline that can handle both bounded and
unbounded datasets. If your dataset has a fixed number of elements, it is a bounded
dataset and all of the data can be processed together. For bounded datasets,
the question to ask is "Do I have all of the data?" If data continuously
arrives (such as an endless stream of game scores in the
[Mobile gaming example](/get-started/mobile-gaming-example/)),
it is an unbounded dataset. An unbounded dataset is never fully available for
processing at any one time, so the data must be processed using a streaming
pipeline that runs continuously. The dataset will only be complete up to a
certain point, so the question to ask is "Up until what point do I have all of
the data?" Beam uses [windowing](/documentation/programming-guide/#windowing)
to divide a continuously updating dataset into logical windows of finite size.
If your input is unbounded, you must use a runner that supports streaming.
If your pipeline's input is bounded, then all downstream PCollections will also be
bounded. Similarly, if the input is unbounded, then all downstream PCollections
of the pipeline will be unbounded, though separate branches may be independently
bounded.
Recall that the input for this example is a set of Shakespeare's texts, which is
a finite set of data. Therefore, this example reads bounded data from a text
file:
{{< highlight java >}}
public static void main(String[] args) throws IOException {
  Options options = ...
  Pipeline pipeline = Pipeline.create(options);

  PCollection<String> input = pipeline
      .apply(TextIO.read().from(options.getInputFile()))
{{< /highlight >}}
{{< highlight py >}}
def main(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input-file',
                      dest='input_file',
                      default='/Users/home/words-example.txt')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)

  lines = p | 'read' >> ReadFromText(known_args.input_file)
{{< /highlight >}}
{{< highlight go >}}
func main() {
	...
	p := beam.NewPipeline()
	s := p.Root()
	lines := textio.Read(s, *input)
	...
}
{{< /highlight >}}
### Adding timestamps to data
Each element in a `PCollection` has an associated [timestamp](/documentation/programming-guide#element-timestamps).
The timestamp for each element is initially assigned by the source that creates
the `PCollection`. Some sources that create unbounded PCollections can assign
each new element a timestamp that corresponds to when the element was read or
added. You can manually assign or adjust timestamps with a `DoFn`; however, you
can only move timestamps forward in time.
In this example the input is bounded. For the purposes of this example, a `DoFn`
named `AddTimestampFn` (invoked by `ParDo`) sets a timestamp for each element
in the `PCollection`.
{{< highlight java >}}
.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
{{< /highlight >}}
{{< highlight py >}}
beam.ParDo(AddTimestampFn(min_timestamp, max_timestamp))
{{< /highlight >}}
{{< highlight go >}}
timestampedLines := beam.ParDo(s, &addTimestampFn{Min: mtime.Now()}, lines)
{{< /highlight >}}
Below is the code for `AddTimestampFn`, a `DoFn` invoked by `ParDo`, that sets
the timestamp of each data element based on the element itself. For example, if the
elements were log lines, this `ParDo` could parse the time out of the log string
and set it as the element's timestamp. There are no timestamps inherent in the
works of Shakespeare, so in this case we've made up random timestamps just to
illustrate the concept. Each line of the input text will get a random associated
timestamp sometime in a 2-hour period.
{{< highlight java >}}
static class AddTimestampFn extends DoFn<String, String> {
  private final Instant minTimestamp;
  private final Instant maxTimestamp;

  AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
    this.minTimestamp = minTimestamp;
    this.maxTimestamp = maxTimestamp;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Instant randomTimestamp =
        new Instant(
            ThreadLocalRandom.current()
                .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));

    // Concept #2: Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), randomTimestamp);
  }
}
{{< /highlight >}}
{{< highlight py >}}
class AddTimestampFn(beam.DoFn):

  def __init__(self, min_timestamp, max_timestamp):
    self.min_timestamp = min_timestamp
    self.max_timestamp = max_timestamp

  def process(self, element):
    yield window.TimestampedValue(
        element,
        random.randint(self.min_timestamp, self.max_timestamp))
{{< /highlight >}}
{{< highlight go >}}
type addTimestampFn struct {
	Min beam.EventTime `json:"min"`
}

func (f *addTimestampFn) ProcessElement(x beam.X) (beam.EventTime, beam.X) {
	timestamp := f.Min.Add(time.Duration(rand.Int63n(2 * time.Hour.Nanoseconds())))
	return timestamp, x
}
{{< /highlight >}}
{{< paragraph class="language-go" >}}
Note that the use of the `beam.X` "type variable" allows the transform to be
used for any type.
{{< /paragraph >}}
### Windowing
Beam uses a concept called **Windowing** to subdivide a `PCollection` into
bounded sets of elements. PTransforms that aggregate multiple elements process
each `PCollection` as a succession of multiple, finite windows, even though the
entire collection itself may be of infinite size (unbounded).
The WindowedWordCount example applies fixed-time windowing, wherein each
window represents a fixed time interval. The fixed window size for this example
defaults to 1 minute (you can change this with a command-line option).
{{< highlight java >}}
PCollection<String> windowedWords = input
    .apply(Window.<String>into(
        FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
{{< /highlight >}}
{{< highlight py >}}
windowed_words = input | beam.WindowInto(window.FixedWindows(60 * window_size_minutes))
{{< /highlight >}}
{{< highlight go >}}
windowedLines := beam.WindowInto(s, window.NewFixedWindows(time.Minute), timestampedLines)
{{< /highlight >}}
### Reusing PTransforms over windowed PCollections
You can reuse existing PTransforms that were created for manipulating simple
PCollections over windowed PCollections as well.
{{< highlight java >}}
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
{{< /highlight >}}
{{< highlight py >}}
word_counts = windowed_words | CountWords()
{{< /highlight >}}
{{< highlight go >}}
counted := wordcount.CountWords(s, windowedLines)
{{< /highlight >}}
{{< paragraph class="language-java language-go" >}}
### Try the full example in Playground
{{< /paragraph >}}
{{< playground height="700px" >}}
{{< playground_snippet language="java" path="SDK_JAVA_WindowedWordCount" >}}
{{< playground_snippet language="go" path="SDK_GO_WindowedWordCount" >}}
{{< /playground >}}
## StreamingWordCount example
The StreamingWordCount example is a streaming pipeline that reads Pub/Sub
messages from a Pub/Sub subscription or topic, and performs a frequency count on
the words in each message. Similar to WindowedWordCount, this example applies
fixed-time windowing, wherein each window represents a fixed time interval. The
fixed window size for this example is 15 seconds. The pipeline outputs the
frequency count of the words seen in each 15-second window.
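The windowing step itself is small. As a sketch (with illustrative variable
names, not necessarily those used in streaming_wordcount.py), applying
15-second fixed windows in Python looks like this:
{{< highlight py >}}
# A sketch with illustrative names: divide the unbounded PCollection
# into fixed 15-second windows before counting words per window.
import apache_beam as beam
from apache_beam.transforms import window

windowed_words = words | beam.WindowInto(window.FixedWindows(15))
{{< /highlight >}}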
**New Concepts:**
* Reading an unbounded dataset
* Writing unbounded results
**To run this example in Java:**
> **Note:** StreamingWordCount is not yet available for the Java SDK.
**To run this example in Python:**
{{< runner direct >}}
python -m apache_beam.examples.streaming_wordcount \
--input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
--output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
--streaming
{{< /runner >}}
{{< runner flink >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner flinkCluster >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner spark >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner dataflow >}}
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.streaming_wordcount \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/ \
--input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
--output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
--streaming
{{< /runner >}}
{{< runner samza >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner nemo >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
{{< runner jet >}}
This runner is not yet available for the Python SDK.
{{< /runner >}}
To view the full code in Python, see
**[streaming_wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py).**
**To run this example in Go:**
> **Note:** StreamingWordCount is not yet available for the Go SDK. There is an open issue for this
> ([Issue 18879](https://github.com/apache/beam/issues/18879)).
### Reading an unbounded dataset
This example uses an unbounded dataset as input. The code reads Pub/Sub
messages from a Pub/Sub subscription or topic using
[`beam.io.ReadFromPubSub`](https://beam.apache.org/releases/pydoc/{{< param release_latest >}}/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub).
{{< highlight java >}}
// This example is not currently available for the Beam SDK for Java.
{{< /highlight >}}
{{< highlight py >}}
# Read from Pub/Sub into a PCollection.
if known_args.input_subscription:
  data = p | beam.io.ReadFromPubSub(
      subscription=known_args.input_subscription)
else:
  data = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
lines = data | 'DecodeString' >> beam.Map(lambda d: d.decode('utf-8'))
{{< /highlight >}}
{{< highlight go >}}
// This example is not currently available for the Beam SDK for Go.
{{< /highlight >}}
### Writing unbounded results
When the input is unbounded, the same is true of the output `PCollection`. As
such, you must make sure to choose an appropriate I/O for the results. Some I/Os
support only bounded output, while others support both bounded and unbounded
outputs.
This example uses an unbounded `PCollection` and streams the results to
Google Pub/Sub. The code formats the results and writes them to a Pub/Sub topic
using [`beam.io.WriteToPubSub`](https://beam.apache.org/releases/pydoc/{{< param release_latest >}}/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.WriteToPubSub).
{{< highlight java >}}
// This example is not currently available for the Beam SDK for Java.
{{< /highlight >}}
{{< highlight py >}}
# Write to Pub/Sub.
_ = (output
     | 'EncodeString' >> beam.Map(lambda s: s.encode('utf-8'))
     | beam.io.WriteToPubSub(known_args.output_topic))
{{< /highlight >}}
{{< highlight go >}}
// This example is not currently available for the Beam SDK for Go.
{{< /highlight >}}
## Next Steps
* Walk through the Mobile Gaming examples in the [Mobile Gaming Example Walkthrough](/get-started/mobile-gaming-example).
* Take a self-paced tour through our [Learning Resources](/documentation/resources/learning-resources).
* Dive into some of our favorite [Videos and Podcasts](/get-started/resources/videos-and-podcasts).
* Join the Beam [users@](/community/contact-us) mailing list.
Please don't hesitate to [reach out](/community/contact-us) if you encounter any issues!