blob: 5c039cde2428eee4b73944e5d5603145229410e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* An example that counts words in text, and can run over either unbounded or bounded input
* collections.
*
* <p>This class, {@link WindowedWordCount}, is the last in a series of four successively more
* detailed 'word count' examples. First take a look at {@link MinimalWordCount},
* {@link WordCount}, and {@link DebuggingWordCount}.
*
* <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
* Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
* and using a selected runner; defining DoFns;
* user-defined PTransforms; defining PipelineOptions.
*
* <p>New Concepts:
* <pre>
* 1. Unbounded and bounded pipeline input modes
* 2. Adding timestamps to data
* 3. Windowing
* 4. Re-using PTransforms over windowed PCollections
* 5. Accessing the window of an element
* 6. Writing data to per-window text files
* </pre>
*
* <p>By default, the examples will run with the {@code DirectRunner}.
* To change the runner, specify:
* <pre>{@code
* --runner=YOUR_SELECTED_RUNNER
* }
* </pre>
* See examples/java/README.md for instructions about how to configure different runners.
*
* <p>To execute this pipeline locally, specify a local output file (if using the
* {@code DirectRunner}) or output prefix on a supported distributed file system.
* <pre>{@code
* --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
* }</pre>
*
* <p>The input file defaults to a public data set containing the text of of King Lear,
* by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
*
* <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can
* change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
* for 10-minute windows.
*
* <p>The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C).
*/
public class WindowedWordCount {
static final int WINDOW_SIZE = 10; // Default window duration in minutes
/**
* Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
* this example, for the bounded data case.
*
* <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate
* his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
* 2-hour period.
*/
static class AddTimestampFn extends DoFn<String, String> {
private final Instant minTimestamp;
private final Instant maxTimestamp;
AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
}
@ProcessElement
public void processElement(ProcessContext c) {
Instant randomTimestamp =
new Instant(
ThreadLocalRandom.current()
.nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));
/**
* Concept #2: Set the data element with that timestamp.
*/
c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
}
}
/** A {@link DefaultValueFactory} that returns the current system time. */
public static class DefaultToCurrentSystemTime implements DefaultValueFactory<Long> {
@Override
public Long create(PipelineOptions options) {
return System.currentTimeMillis();
}
}
/** A {@link DefaultValueFactory} that returns the minimum timestamp plus one hour. */
public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> {
@Override
public Long create(PipelineOptions options) {
return options.as(Options.class).getMinTimestampMillis()
+ Duration.standardHours(1).getMillis();
}
}
/**
* Options for {@link WindowedWordCount}.
*
* <p>Inherits standard example configuration options, which allow specification of the
* runner, as well as the {@link WordCount.WordCountOptions} support for
* specification of the input and output files.
*/
public interface Options extends WordCount.WordCountOptions,
ExampleOptions, ExampleBigQueryTableOptions {
@Description("Fixed window duration, in minutes")
@Default.Integer(WINDOW_SIZE)
Integer getWindowSize();
void setWindowSize(Integer value);
@Description("Minimum randomly assigned timestamp, in milliseconds-since-epoch")
@Default.InstanceFactory(DefaultToCurrentSystemTime.class)
Long getMinTimestampMillis();
void setMinTimestampMillis(Long value);
@Description("Maximum randomly assigned timestamp, in milliseconds-since-epoch")
@Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class)
Long getMaxTimestampMillis();
void setMaxTimestampMillis(Long value);
@Description("Fixed number of shards to produce per window, or null for runner-chosen sharding")
Integer getNumShards();
void setNumShards(Integer numShards);
}
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
final String output = options.getOutput();
final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
Pipeline pipeline = Pipeline.create(options);
/**
* Concept #1: the Beam SDK lets us run the same pipeline with either a bounded or
* unbounded input source.
*/
PCollection<String> input = pipeline
/** Read from the GCS file. */
.apply(TextIO.read().from(options.getInputFile()))
// Concept #2: Add an element timestamp, using an artificial time just to show windowing.
// See AddTimestampFn for more detail on this.
.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
/**
* Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1
* minute (you can change this with a command-line option). See the documentation for more
* information on how fixed windows work, and for information on the other types of windowing
* available (e.g., sliding windows).
*/
PCollection<String> windowedWords =
input.apply(
Window.<String>into(
FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
/**
* Concept #4: Re-use our existing CountWords transform that does not have knowledge of
* windows over a PCollection containing windowed values.
*/
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
/**
* Concept #5: Format the results and write to a sharded file partitioned by window, using a
* simple ParDo operation. Because there may be failures followed by retries, the
* writes must be idempotent, but the details of writing to files is elided here.
*/
wordCounts
.apply(MapElements.via(new WordCount.FormatAsTextFn()))
.apply(new WriteOneFilePerWindow(output, options.getNumShards()));
PipelineResult result = pipeline.run();
try {
result.waitUntilFinish();
} catch (Exception exc) {
result.cancel();
}
}
}