blob: fc038b47982e79ee79a0c4a7ecd63523815c1711 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import * as beam from "../../src/apache_beam";
import { PCollection } from "../../src/apache_beam";
import * as combiners from "../../src/apache_beam/transforms/combiners";
import * as pardo from "../../src/apache_beam/transforms/pardo";
import { assertDeepEqual } from "../../src/apache_beam/testing/assert";
describe("Programming Guide Tested Samples", function () {
describe("Pipelines", function () {
it("pipelines_constructing_creating", async function () {
// [START pipelines_constructing_creating]
await beam.createRunner().run(function pipeline(root) {
// Use root to build a pipeline.
// [END pipelines_constructing_creating]
it("pipeline_options", async function () {
const yargs = { argv: {} };
// [START pipeline_options]
const pipeline_options = {
runner: "default",
project: "my_project",
const runner = beam.createRunner(pipeline_options);
const runnerFromCommandLineOptions = beam.createRunner(yargs.argv);
// [END pipeline_options]
it("pipeline_options_define_custom", async function () {
const yargs = { argv: {} };
// [START pipeline_options_define_custom]
const options = yargs.argv; // Or an alternative command-line parsing library.
// Use options.input and options.output during pipeline construction.
// [END pipeline_options_define_custom]
it("pipelines_constructing_reading", async function () {
const textio = require("../../src/apache_beam/io/textio");
// [START pipelines_constructing_reading]
async function pipeline(root: beam.Root) {
// Note that textio.ReadFromText is an AsyncPTransform.
const pcoll: PCollection<string> = await root.applyAsync(
// [END pipelines_constructing_reading]
describe("PCollections", function () {
it("create_pcollection", async function () {
// [START create_pcollection]
function pipeline(root: beam.Root) {
const pcoll = root.apply(
"To be, or not to be: that is the question: ",
"Whether 'tis nobler in the mind to suffer ",
"The slings and arrows of outrageous fortune, ",
"Or to take arms against a sea of troubles, ",
// [END create_pcollection]
await beam.createRunner().run(pipeline);
describe("Transforms", function () {
it("model_pardo", async function () {
// [START model_pardo_pardo]
function computeWordLengthFn(): beam.DoFn<string, number> {
return {
process: function* (element) {
yield element.length;
// [END model_pardo_pardo]
await beam.createRunner().run((root: beam.Root) => {
const words = root.apply(beam.create(["a", "bb", "cccc"]));
// [START model_pardo_apply]
const result = words.apply(beam.parDo(computeWordLengthFn()));
// [END model_pardo_apply]
result.apply(assertDeepEqual([1, 2, 4]));
it("model_pardo_using_flatmap", async function () {
await beam.createRunner().run((root: beam.Root) => {
const words = root.apply(beam.create(["a", "bb", "cccc"]));
// [START model_pardo_using_flatmap]
const result = words.flatMap((word) => [word.length]);
// [END model_pardo_using_flatmap]
result.apply(assertDeepEqual([1, 2, 4]));
it("model_pardo_using_map", async function () {
await beam.createRunner().run((root: beam.Root) => {
const words = root.apply(beam.create(["a", "bb", "cccc"]));
// [START model_pardo_using_map]
const result = => word.length);
// [END model_pardo_using_map]
result.apply(assertDeepEqual([1, 2, 4]));
it("groupby", async function () {
await beam.createRunner().run((root: beam.Root) => {
const cats = [
{ word: "cat", score: 1 },
{ word: "cat", score: 5 },
const dogs = [{ word: "dog", score: 5 }];
function sortValues({ key, value }) {
return { key, value: value.sort() };
const scores = root.apply(beam.create(cats.concat(dogs)));
// [START groupby]
// This will produce a PCollection with elements like
// {key: "cat", value: [{ word: "cat", score: 1 },
// { word: "cat", score: 5 }, ...]}
// {key: "dog", value: [{ word: "dog", score: 5 }, ...]}
const grouped_by_word = scores.apply(beam.groupBy("word"));
// This will produce a PCollection with elements like
// {key: 3, value: [{ word: "cat", score: 1 },
// { word: "dog", score: 5 },
// { word: "cat", score: 5 }, ...]}
const by_word_length = scores.apply(beam.groupBy((x) => x.word.length));
// [END groupby]
// XXX Should it be {key, values} rather than {key, value}?
grouped_by_word //
{ key: "cat", value: cats },
{ key: "dog", value: dogs },
assertDeepEqual([{ key: 3, value: cats.concat(dogs).sort() }])
it("cogroupbykey", async function () {
await beam.createRunner().run((root: beam.Root) => {
// [START cogroupbykey_inputs]
const emails_list = [
{ name: "amy", email: "" },
{ name: "carl", email: "" },
{ name: "julia", email: "" },
{ name: "carl", email: "" },
const phones_list = [
{ name: "amy", phone: "111-222-3333" },
{ name: "james", phone: "222-333-4444" },
{ name: "amy", phone: "333-444-5555" },
{ name: "carl", phone: "444-555-6666" },
const emails = root.apply(beam.create(emails_list));
const phones = root.apply(beam.create(phones_list));
// [END cogroupbykey_inputs]
// [START cogroupbykey_raw_outputs]
const results = [
name: "amy",
values: {
emails: [{ name: "amy", email: "" }],
phones: [
{ name: "amy", phone: "111-222-3333" },
{ name: "amy", phone: "333-444-5555" },
name: "carl",
values: {
emails: [
{ name: "carl", email: "" },
{ name: "carl", email: "" },
phones: [{ name: "carl", phone: "444-555-6666" }],
name: "james",
values: {
emails: [],
phones: [{ name: "james", phone: "222-333-4444" }],
name: "julia",
values: {
emails: [{ name: "julia", email: "" }],
phones: [],
// [END cogroupbykey_raw_outputs]
// [START cogroupbykey]
const formatted_results_pcoll = beam
.P({ emails, phones })
.map(function formatResults({ key, values }) {
const emails = =>;
const phones = =>;
return `${key}; [${emails}]; [${phones}]`;
// [END cogroupbykey]
// [START cogroupbykey_formatted_outputs]
const formatted_results = [
"amy; []; [111-222-3333,333-444-5555]",
"carl; [,]; [444-555-6666]",
"james; []; [222-333-4444]",
"julia; []; []",
// [END cogroupbykey_formatted_outputs]
it("combine_simple_sum", async function () {
// prettier-ignore
await beam.createRunner().run((root: beam.Root) => {
// [START combine_simple_sum]
const pcoll = root.apply(beam.create([1, 10, 100, 1000]));
const result = pcoll.apply(
.combining((c) => c, (x, y) => x + y, "sum")
.combining((c) => c, (x, y) => x * y, "product")
const expected = { sum: 1111, product: 1000000 }
// [END combine_simple_sum]
it("combine_custom_average", async function () {
await beam.createRunner().run((root: beam.Root) => {
// [START combine_custom_average]
const meanCombineFn: beam.CombineFn<number, [number, number], number> =
createAccumulator: () => [0, 0],
addInput: ([sum, count]: [number, number], i: number) => [
sum + i,
count + 1,
mergeAccumulators: (accumulators: [number, number][]) =>
accumulators.reduce(([sum0, count0], [sum1, count1]) => [
sum0 + sum1,
count0 + count1,
extractOutput: ([sum, count]: [number, number]) => sum / count,
// [END combine_custom_average]
// [START combine_global_average]
const pcoll = root.apply(beam.create([4, 5, 6]));
const result = pcoll.apply(
beam.groupGlobally().combining((c) => c, meanCombineFn, "mean")
// [END combine_global_average]
result.apply(assertDeepEqual([{ mean: 5 }]));
it("combine_per_key", async function () {
await beam.createRunner().run((root: beam.Root) => {
// [START combine_per_key]
const pcoll = root.apply(
{ player: "alice", accuracy: 1.0 },
{ player: "bob", accuracy: 0.99 },
{ player: "eve", accuracy: 0.5 },
{ player: "eve", accuracy: 0.25 },
const result = pcoll.apply(
.combining("accuracy", combiners.mean, "mean")
.combining("accuracy", combiners.max, "max")
const expected = [
{ player: "alice", mean: 1.0, max: 1.0 },
{ player: "bob", mean: 0.99, max: 0.99 },
{ player: "eve", mean: 0.375, max: 0.5 },
// [END combine_per_key]
it("model_multiple_pcollections_flatten", async function () {
await beam.createRunner().run((root: beam.Root) => {
// [START model_multiple_pcollections_flatten]
const fib = root.apply(beam.create([1, 1, 2, 3, 5, 8]));
const pow = root.apply(beam.create([1, 2, 4, 8, 16, 32]));
const result = beam.P([fib, pow]).apply(beam.flatten());
// [END model_multiple_pcollections_flatten]
result.apply(assertDeepEqual([1, 1, 1, 2, 2, 3, 4, 5, 8, 8, 16, 32]));
it("model_multiple_pcollections_partition", async function () {
await beam.createRunner().run((root: beam.Root) => {
type Student = number;
function getPercentile(s: Student) {
return s;
const students = root.apply(beam.create([1, 5, 50, 98, 99]));
// [START model_multiple_pcollections_partition]
const deciles: PCollection<Student>[] = students.apply(
(student, numPartitions) =>
Math.floor((getPercentile(student) / 100) * numPartitions),
const topDecile: PCollection<Student> = deciles[9];
// [END model_multiple_pcollections_partition]
topDecile.apply(assertDeepEqual([98, 99]));
it("model_pardo_side_input", async function () {
await beam.createRunner().run((root: beam.Root) => {
const words = root.apply(beam.create(["a", "bb", "cccc"]));
// [START model_pardo_side_input]
// meanLengthPColl will contain a single number whose value is the
// average length of the words
const meanLengthPColl: PCollection<number> = words
.combining((word) => word.length, combiners.mean, "mean")
.map(({ mean }) => mean);
// Now we use this as a side input to yield only words that are
// smaller than average.
const smallWords = words.flatMap(
// This is the function, taking context as a second argument.
function* keepSmall(word, context) {
if (word.length < context.meanLength.lookup()) {
yield word;
// This is the context that will be passed as a second argument.
{ meanLength: pardo.singletonSideInput(meanLengthPColl) }
// [END model_pardo_side_input]
smallWords.apply(assertDeepEqual(["a", "bb"]));
it("model_multiple_output", async function () {
await beam.createRunner().run((root: beam.Root) => {
const words = root.apply(beam.create(["a", "bb", "cccccc", "xx"]));
function isMarkedWord(word) {
return word.includes("x");
// [START model_multiple_output_dofn]
const to_split = words.flatMap(function* (word) {
if (word.length < 5) {
yield { below: word };
} else {
yield { above: word };
if (isMarkedWord(word)) {
yield { marked: word };
// [END model_multiple_output_dofn]
// [START model_multiple_output]
const { below, above, marked } = to_split.apply(
beam.split(["below", "above", "marked"])
// [END model_multiple_output]
below.apply(assertDeepEqual(["a", "bb", "xx"]));
it("other_params", async function () {
await beam.createRunner().run((root: beam.Root) => {
const pcoll = root.apply(beam.create(["a", "b", "c"]));
// [START timestamp_param]
function processFn(element, context) {
return context.timestamp.lookup();
}, { timestamp: pardo.timestampParam() });
// [END timestamp_param]
function processFn(element, context) {}
// [START window_param], { timestamp: pardo.windowParam() });
// [END window_param]
// [START pane_info_param], { timestamp: pardo.paneInfoParam() });
// [END pane_info_param]
it("countwords_composite", async function () {
await beam.createRunner().run((root: beam.Root) => {
const lines = root.apply(beam.create(["a b", "b c c c"]));
// [START countwords_composite]
function countWords(lines: PCollection<string>) {
return lines //
.map((s: string) => s.toLowerCase())
.flatMap(function* splitWords(line: string) {
yield* line.split(/[^a-z]+/);
const counted = lines.apply(countWords);
// [END countwords_composite]
{ element: "a", count: 1 },
{ element: "b", count: 2 },
{ element: "c", count: 3 },