blob: 693f0c4f44de07b44a73a6d999961da5edfe9532 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples.cookbook;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
 * An example that reads the public 'Shakespeare' data, and for each word in
 * the dataset that is over a given length, generates a string containing the
 * list of play names in which that word appears, and saves this information
 * to a BigQuery table.
 *
 * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
 * table.
 *
 * <p>To execute this pipeline locally, specify the BigQuery table for the output:
 * <pre>{@code
 *   --output=<project_id>:<dataset_id>.<table_id>
 * }</pre>
 *
 * <p>To change the runner, specify:
 * <pre>{@code
 *   --runner=<runner_name>
 * }
 * </pre>
 * See examples/java/ for instructions about how to configure different runners.
 *
 * <p>The BigQuery input table defaults to {@code publicdata:samples.shakespeare} and can
 * be overridden with {@code --input}.
 */
public class CombinePerKeyExamples {
// Use the shakespeare public BigQuery sample
private static final String SHAKESPEARE_TABLE =
// We'll track words >= this word length across all plays in the table.
private static final int MIN_WORD_LENGTH = 9;
* Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
* outputs word, play_name.
static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords");
public void processElement(ProcessContext c){
TableRow row = c.element();
String playName = (String) row.get("corpus");
String word = (String) row.get("word");
if (word.length() >= MIN_WORD_LENGTH) {
c.output(KV.of(word, playName));
} else {
// Track how many smaller words we're not including. This information will be
// visible in the Monitoring UI.;
* Prepares the data for writing to BigQuery by building a TableRow object
* containing a word with a string listing the plays in which it appeared.
static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
.set("word", c.element().getKey())
.set("all_plays", c.element().getValue());
* Reads the public 'Shakespeare' data, and for each word in the dataset
* over a given length, generates a string containing the list of play names
* in which that word appears. It does this via the Combine.perKey
* transform, with the ConcatWords combine function.
* <p>Combine.perKey is similar to a GroupByKey followed by a ParDo, but
* has more restricted semantics that allow it to be executed more
* efficiently. These records are then formatted as BQ table rows.
static class PlaysForWord
extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
public PCollection<TableRow> expand(PCollection<TableRow> rows) {
// row... => <word, play_name> ...
PCollection<KV<String, String>> words = rows.apply(
ParDo.of(new ExtractLargeWordsFn()));
// word, play_name => word, all_plays ...
PCollection<KV<String, String>> wordAllPlays =
words.apply(Combine.<String, String>perKey(
new ConcatWords()));
// <word, all_plays>... => row...
PCollection<TableRow> results = wordAllPlays.apply(
ParDo.of(new FormatShakespeareOutputFn()));
return results;
* A 'combine function' used with the Combine.perKey transform. Builds a
* comma-separated string of all input items. So, it will build a string
* containing all the different Shakespeare plays in which the given input
* word has appeared.
public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
public String apply(Iterable<String> input) {
StringBuilder all = new StringBuilder();
for (String item : input) {
if (!item.isEmpty()) {
if (all.length() == 0) {
} else {
return all.toString();
* Options supported by {@link CombinePerKeyExamples}.
* <p>Inherits standard configuration options.
private interface Options extends PipelineOptions {
@Description("Table to read from, specified as "
+ "<project_id>:<dataset_id>.<table_id>")
String getInput();
void setInput(String value);
@Description("Table to write to, specified as "
+ "<project_id>:<dataset_id>.<table_id>. "
+ "The dataset_id must already exist")
String getOutput();
void setOutput(String value);
public static void main(String[] args)
throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
// Build the table schema for the output table.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("word").setType("STRING"));
fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
.apply(new PlaysForWord())