{
"license": [
"Licensed to the Apache Software Foundation (ASF) under one",
"or more contributor license agreements. See the NOTICE file",
"distributed with this work for additional information",
"regarding copyright ownership. The ASF licenses this file",
"to you under the Apache License, Version 2.0 (the",
"\"License\"); you may not use this file except in compliance",
"with the License. You may obtain a copy of the License at",
"",
" http://www.apache.org/licenses/LICENSE-2.0",
"",
"Unless required by applicable law or agreed to in writing,",
"software distributed under the License is distributed on an",
"\"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY",
"KIND, either express or implied. See the License for the",
"specific language governing permissions and limitations",
"under the License."
],
"nbformat": 4,
"cells": [
{
"source": [
"<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/try-apache-beam-go.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "view-in-github"
}
},
{
"source": [
"# Try Apache Beam - Go\n",
"\n",
"In this notebook, we set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capatibility Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).\n",
"\n",
"To navigate through different sections, use the table of contents. From **View** drop-down list, select **Table of contents**.\n",
"\n",
"To run a code cell, you can click the **Run cell** button at the top left of the cell, or by select it and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.\n",
"\n",
"To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb)."
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "lNKIMlEDZ_Vw"
}
},
{
"source": [
"# Setup\n",
"\n",
"First, you need to set up your environment."
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "Fz6KSQ13_3Rr"
}
},
{
"source": [
"import os\n",
"\n",
"# Run and print a shell command.\n",
"def run(cmd):\n",
" print('>> {}'.format(cmd))\n",
" !{cmd}\n",
" print('')\n",
"\n",
"# Change directory to $HOME.\n",
"print(f\"Changing directory to $HOME: {os.environ['HOME']}\\n\")\n",
"os.chdir(os.environ['HOME'])\n",
"\n",
"# Copy the input file into the local filesystem.\n",
"run('mkdir -p data')\n",
"run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')"
],
"cell_type": "code",
"execution_count": 1,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Changing directory to $HOME: /root\n",
"\n",
">> mkdir -p data\n",
"\n",
">> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/\n",
"Copying gs://dataflow-samples/shakespeare/kinglear.txt...\n",
"/ [1 files][153.6 KiB/153.6 KiB] \n",
"Operation completed over 1 objects/153.6 KiB. \n",
"\n"
]
}
],
"metadata": {
"outputId": "54640caa-f322-4b1e-c124-bfb274a70b56",
"colab_type": "code",
"id": "GOOk81Jj_yUy",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 170
}
}
},
{
"source": [
"## Installing development tools\n",
"\n",
"Let's start by installing Go. This will take a while, so feel free to go for a walk or do some stretching."
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "bgSegMTHlSqb"
}
},
{
"source": [
"# Update and upgrade the system before installing anything else.\n",
"run('apt-get update > /dev/null')\n",
"run('apt-get upgrade > /dev/null')\n",
"\n",
"# Install the Go package.\n",
"run('apt-get install golang-go > /dev/null')\n",
"\n",
"# Check the Go version to see if everything is working well.\n",
"run('go version')\n",
"\n",
"# Finally, let's install the Apache Beam SDK for Go.\n",
"run('go get -u github.com/apache/beam/sdks/go/...')"
],
"cell_type": "code",
"execution_count": 2,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
">> apt-get update > /dev/null\n",
"\n",
">> apt-get upgrade > /dev/null\n",
"\n",
">> apt-get install golang-go > /dev/null\n",
"\n",
">> go version\n",
"go version go1.10.4 linux/amd64\n",
"\n",
">> go get -u github.com/apache/beam/sdks/go/...\n",
"\n"
]
}
],
"metadata": {
"outputId": "60d90f8a-a949-4584-c824-f671cd241d5d",
"colab_type": "code",
"id": "ibHfVnpolP3b",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 204
}
}
},
{
"source": [
"## Creating the directory structure\n",
"\n",
"Go requires all packages to be contained within the `GOPATH`. By default it is located in `$HOME/go`, you can check yours using the `go env GOPATH` command.\n",
"\n",
"Inside the `GOPATH` there should be a `src` directory that holds up all the packages, and a `bin` directory will be created containing all the compiled binaries.\n",
"\n",
"To learn more about Go's directory structure, see [How to Write Go Code](https://golang.org/doc/code.html)."
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "ungfz1aFqYnn"
}
},
{
"source": [
"# Get the GOPATH.\n",
"cmd_stdout = !go env GOPATH\n",
"GOPATH = cmd_stdout[0]\n",
"print(f\"GOPATH={GOPATH}\\n\")\n",
"\n",
"# Create our source code wordcount package.\n",
"run(f\"mkdir -p {GOPATH}/src/wordcount\")"
],
"cell_type": "code",
"execution_count": 3,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"GOPATH=/root/go\n",
"\n",
">> mkdir -p /root/go/src/wordcount\n",
"\n"
]
}
],
"metadata": {
"outputId": "8171ef7c-9498-4cb7-9b55-6a61fca3ed14",
"colab_type": "code",
"id": "yU3F1Snrt6Nv",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 85
}
}
},
{
"source": [
"# Minimal word count\n",
"\n",
"The following example is the \"Hello, World!\" of data processing, a basic implementation of word count. We're creating a simple data processing pipeline that reads a text file and counts the number of occurrences of every word.\n",
"\n",
"There are many scenarios where all the data does not fit in memory. Notice that the outputs of the pipeline go to the file system, which allows for large processing jobs in distributed environments."
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "cPvvFB19uXNw"
}
},
{
"source": [
"## wordcount.go"
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "3MUaWD4Dm5NB"
}
},
{
"source": [
"%%writefile go/src/wordcount/wordcount.go\n",
"\n",
"package main\n",
"\n",
"import (\n",
"\t\"context\"\n",
" \t\"flag\"\n",
"\t\"fmt\"\n",
"\t\"regexp\"\n",
"\n",
"\t\"github.com/apache/beam/sdks/go/pkg/beam\"\n",
"\t\"github.com/apache/beam/sdks/go/pkg/beam/io/textio\"\n",
"\t\"github.com/apache/beam/sdks/go/pkg/beam/runners/direct\"\n",
"\t\"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats\"\n",
"\n",
"\t_ \"github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local\"\n",
")\n",
"\n",
"var (\n",
"\tinput = flag.String(\"input\", \"data/*\", \"File(s) to read.\")\n",
"\toutput = flag.String(\"output\", \"outputs/wordcounts.txt\", \"Output filename.\")\n",
")\n",
"\n",
"var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)\n",
"\n",
"func main() {\n",
" flag.Parse()\n",
"\n",
"\tbeam.Init()\n",
"\n",
"\tpipeline := beam.NewPipeline()\n",
"\troot := pipeline.Root()\n",
"\n",
"\tlines := textio.Read(root, *input)\n",
"\twords := beam.ParDo(root, func(line string, emit func(string)) {\n",
"\t\tfor _, word := range wordRE.FindAllString(line, -1) {\n",
"\t\t\temit(word)\n",
"\t\t}\n",
"\t}, lines)\n",
"\tcounted := stats.Count(root, words)\n",
"\tformatted := beam.ParDo(root, func(word string, count int) string {\n",
"\t\treturn fmt.Sprintf(\"%s: %v\", word, count)\n",
"\t}, counted)\n",
"\ttextio.Write(root, *output, formatted)\n",
"\n",
"\tdirect.Execute(context.Background(), pipeline)\n",
"}"
],
"cell_type": "code",
"execution_count": 4,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Writing go/src/wordcount/wordcount.go\n"
]
}
],
"metadata": {
"outputId": "cdb8f492-e653-4daa-dd1d-e2c3c257dd0e",
"colab_type": "code",
"id": "oUqfqWyMuIfR",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 34
}
}
},
{
"source": [
"## Building and running\n",
"\n",
"Go allows us to run a program without having to explicitly compile it. Internally it will compile the source code into a binary and then run it."
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "to3rfuOhq0i3"
}
},
{
"source": [
"# Build and run the program.\n",
"run('rm -rf outputs/')\n",
"run(f\"go run {GOPATH}/src/wordcount/*.go\")\n",
"\n",
"# Sample the first 20 results, remember there are no ordering guarantees.\n",
"run('head -n 20 outputs/*')"
],
"cell_type": "code",
"execution_count": 5,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
">> rm -rf outputs/\n",
"\n",
">> go run /root/go/src/wordcount/*.go\n",
"2019/03/04 23:05:37 Executing pipeline with the direct runner.\n",
"2019/03/04 23:05:37 Pipeline:\n",
"2019/03/04 23:05:37 Nodes: {1: []uint8/bytes GLO}\n",
"{2: string/string[string] GLO}\n",
"{3: string/string[string] GLO}\n",
"{4: string/string[string] GLO}\n",
"{5: string/string[string] GLO}\n",
"{6: KV<string,int>/KV<string[string],int[varintz]> GLO}\n",
"{7: CoGBK<string,int>/CoGBK<string[string],int[varintz]> GLO}\n",
"{8: KV<string,int>/KV<string[string],int[varintz]> GLO}\n",
"{9: string/string[string] GLO}\n",
"{10: KV<int,string>/KV<int[varintz],string[string]> GLO}\n",
"{11: CoGBK<int,string>/CoGBK<int[varintz],string[string]> GLO}\n",
"Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]\n",
"2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string[string] GLO}]\n",
"3: ParDo [In(Main): string <- {2: string/string[string] GLO}] -> [Out: string -> {3: string/string[string] GLO}]\n",
"4: ParDo [In(Main): string <- {3: string/string[string] GLO}] -> [Out: string -> {4: string/string[string] GLO}]\n",
"5: ParDo [In(Main): string <- {4: string/string[string] GLO}] -> [Out: string -> {5: string/string[string] GLO}]\n",
"6: ParDo [In(Main): T <- {5: string/string[string] GLO}] -> [Out: KV<T,int> -> {6: KV<string,int>/KV<string[string],int[varintz]> GLO}]\n",
"7: CoGBK [In(Main): KV<string,int> <- {6: KV<string,int>/KV<string[string],int[varintz]> GLO}] -> [Out: CoGBK<string,int> -> {7: CoGBK<string,int>/CoGBK<string[string],int[varintz]> GLO}]\n",
"8: Combine [In(Main): int <- {7: CoGBK<string,int>/CoGBK<string[string],int[varintz]> GLO}] -> [Out: KV<string,int> -> {8: KV<string,int>/KV<string[string],int[varintz]> GLO}]\n",
"9: ParDo [In(Main): KV<string,int> <- {8: KV<string,int>/KV<string[string],int[varintz]> GLO}] -> [Out: string -> {9: string/string[string] GLO}]\n",
"10: ParDo [In(Main): T <- {9: string/string[string] GLO}] -> [Out: KV<int,T> -> {10: KV<int,string>/KV<int[varintz],string[string]> GLO}]\n",
"11: CoGBK [In(Main): KV<int,string> <- {10: KV<int,string>/KV<int[varintz],string[string]> GLO}] -> [Out: CoGBK<int,string> -> {11: CoGBK<int,string>/CoGBK<int[varintz],string[string]> GLO}]\n",
"12: ParDo [In(Main): CoGBK<int,string> <- {11: CoGBK<int,string>/CoGBK<int[varintz],string[string]> GLO}] -> []\n",
"2019/03/04 23:05:37 Plan[plan]:\n",
"14: Impulse[0]\n",
"1: ParDo[textio.writeFileFn] Out:[]\n",
"2: CoGBK. Out:1\n",
"3: Inject[0]. Out:2\n",
"4: ParDo[beam.addFixedKeyFn] Out:[3]\n",
"5: ParDo[main.main.func2] Out:[4]\n",
"6: Combine[stats.sumIntFn] Keyed:false Out:5\n",
"7: CoGBK. Out:6\n",
"8: Inject[0]. Out:7\n",
"9: ParDo[stats.mapFn] Out:[8]\n",
"10: ParDo[main.main.func1] Out:[9]\n",
"11: ParDo[textio.readFn] Out:[10]\n",
"12: ParDo[textio.expandFn] Out:[11]\n",
"13: ParDo[beam.createFn] Out:[12]\n",
"2019/03/04 23:05:37 Reading from data/kinglear.txt\n",
"2019/03/04 23:05:37 Writing to outputs/wordcounts.txt\n",
"\n",
">> head -n 20 outputs/*\n",
"breeding: 3\n",
"alas: 1\n",
"condition: 2\n",
"whole: 1\n",
"rarity: 1\n",
"hoping: 1\n",
"oath: 4\n",
"pretence: 2\n",
"beastly: 1\n",
"chide: 1\n",
"mile: 1\n",
"Villain: 1\n",
"preach: 1\n",
"rescue: 1\n",
"Alarum: 2\n",
"loath: 1\n",
"clotpoll: 1\n",
"shortly: 2\n",
"alack: 3\n",
"What: 75\n",
"\n"
]
}
],
"metadata": {
"outputId": "bb7e1169-e080-4d80-deb7-11af178a5230",
"colab_type": "code",
"id": "0FbGD-rpocgx",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 1193
}
}
},
{
"source": [
"# Word count with comments\n",
"\n",
"Below is mostly the same code as above, but with comments explaining every line in more detail."
],
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "k-HubCrk-h_G"
}
},
{
"source": [
"%%writefile go/src/wordcount/wordcount.go\n",
"\n",
"package main\n",
"\n",
"import (\n",
"\t\"context\"\n",
" \"flag\"\n",
"\t\"fmt\"\n",
"\t\"regexp\"\n",
"\n",
"\t\"github.com/apache/beam/sdks/go/pkg/beam\"\n",
"\t\"github.com/apache/beam/sdks/go/pkg/beam/io/textio\"\n",
"\t\"github.com/apache/beam/sdks/go/pkg/beam/runners/direct\"\n",
"\t\"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats\"\n",
"\n",
"\t_ \"github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local\"\n",
")\n",
"\n",
"var (\n",
"\tinput = flag.String(\"input\", \"data/*\", \"File(s) to read.\")\n",
"\toutput = flag.String(\"output\", \"outputs/wordcounts.txt\", \"Output filename.\")\n",
")\n",
"\n",
"var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)\n",
"\n",
"func main() {\n",
" flag.Parse()\n",
"\n",
"\tbeam.Init()\n",
"\n",
"\tpipeline := beam.NewPipeline()\n",
"\troot := pipeline.Root()\n",
"\n",
" // Read lines from a text file.\n",
"\tlines := textio.Read(root, *input)\n",
"\n",
" // Use a regular expression to iterate over all words in the line.\n",
"\twords := beam.ParDo(root, func(line string, emit func(string)) {\n",
"\t\tfor _, word := range wordRE.FindAllString(line, -1) {\n",
"\t\t\temit(word)\n",
"\t\t}\n",
"\t}, lines)\n",
"\n",
" // Count each unique word.\n",
"\tcounted := stats.Count(root, words)\n",
"\n",
" // Format the results into a string so we can write them to a file.\n",
"\tformatted := beam.ParDo(root, func(word string, count int) string {\n",
"\t\treturn fmt.Sprintf(\"%s: %v\", word, count)\n",
"\t}, counted)\n",
"\n",
" // Finally, write the results to a file.\n",
"\ttextio.Write(root, *output, formatted)\n",
"\n",
" // We have to explicitly run the pipeline, otherwise it's only a definition.\n",
"\tdirect.Execute(context.Background(), pipeline)\n",
"}"
],
"cell_type": "code",
"execution_count": 6,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Overwriting go/src/wordcount/wordcount.go\n"
]
}
],
"metadata": {
"outputId": "6242a213-effd-4cf9-aaba-caf3ebf72ef2",
"colab_type": "code",
"id": "x_D7sxUHFzUp",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 34
}
}
},
{
"source": [
"# Build and run the program.\n",
"run('rm -rf outputs/')\n",
"run('go run go/src/wordcount/*.go 2>/dev/null')\n",
"\n",
"# Sample the first 20 results, remember there are no ordering guarantees.\n",
"run('head -n 20 outputs/*')"
],
"cell_type": "code",
"execution_count": 7,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
">> rm -rf outputs/\n",
"\n",
">> go run go/src/wordcount/*.go 2>/dev/null\n",
"\n",
">> head -n 20 outputs/*\n",
"hawthorn: 2\n",
"With: 31\n",
"vain: 3\n",
"football: 1\n",
"showest: 1\n",
"rarest: 1\n",
"Acquaint: 1\n",
"Bids: 1\n",
"another: 9\n",
"tadpole: 1\n",
"Oppressed: 1\n",
"Revoke: 1\n",
"images: 1\n",
"lameness: 1\n",
"Instantly: 1\n",
"rages: 1\n",
"Neither: 1\n",
"quest: 1\n",
"mills: 1\n",
"weapons: 1\n",
"\n"
]
}
],
"metadata": {
"outputId": "78f66139-06df-4991-f19c-1d13803980bc",
"colab_type": "code",
"id": "H620PSl46wDK",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 459
}
}
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"colab": {
"include_colab_link": true,
"name": "Try Apache Beam - Go",
"toc_visible": true,
"provenance": [],
"collapsed_sections": [],
"version": "0.3.2"
}
},
"nbformat_minor": 0
}