"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "Reading and writing data -- Tour of Beam",
"provenance": [],
"collapsed_sections": [],
"toc_visible": true
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
"cells": [
"cell_type": "code",
"metadata": {
"cellView": "form",
"id": "upmJn_DjcThx"
"source": [
"#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
"# Licensed to the Apache Software Foundation (ASF) under one\n",
"# or more contributor license agreements. See the NOTICE file\n",
"# distributed with this work for additional information\n",
"# regarding copyright ownership. The ASF licenses this file\n",
"# to you under the Apache License, Version 2.0 (the\n",
"# \"License\"); you may not use this file except in compliance\n",
"# with the License. You may obtain a copy of the License at\n",
"# Unless required by applicable law or agreed to in writing,\n",
"# software distributed under the License is distributed on an\n",
"# KIND, either express or implied. See the License for the\n",
"# specific language governing permissions and limitations\n",
"# under the License."
"execution_count": null,
"outputs": []
"cell_type": "markdown",
"metadata": {
"id": "5UC_aGanx6oE"
"source": [
"# Reading and writing data -- _Tour of Beam_\n",
"So far we've learned some of the basic transforms like\n",
"[`Combine`](, and\n",
"These allow us to transform data in any way, but so far we've used\n",
"to get data from an in-memory\n",
"[`iterable`](, like a `list`.\n",
"This works well for experimenting with small datasets. For larger datasets we can use `Source` transforms to read data and `Sink` transforms to write data.\n",
"If there are no built-in `Source` or `Sink` transforms, we can also easily create our custom I/O transforms.\n",
"Let's create some data files and see how we can read them in Beam."
"cell_type": "code",
"metadata": {
"id": "R_Yhoc6N_Flg",
"outputId": "94144efc-6b65-4eb7-ea72-23fe3e192428",
"colab": {
"base_uri": "https://localhost:8080/"
"source": [
"# Install apache-beam with pip.\n",
"!pip install --quiet apache-beam\n",
"# Create a directory for our data files.\n",
"!mkdir -p data"
"execution_count": 1,
"outputs": [
"output_type": "stream",
"text": [
"\u001b[K |████████████████████████████████| 9.0MB 7.4MB/s \n",
"\u001b[K |████████████████████████████████| 153kB 40.0MB/s \n",
"\u001b[K |████████████████████████████████| 17.7MB 236kB/s \n",
"\u001b[K |████████████████████████████████| 61kB 7.0MB/s \n",
"\u001b[K |████████████████████████████████| 61kB 6.9MB/s \n",
"\u001b[K |████████████████████████████████| 2.3MB 40.2MB/s \n",
"\u001b[K |████████████████████████████████| 829kB 40.3MB/s \n",
"\u001b[K |████████████████████████████████| 112kB 44.7MB/s \n",
"\u001b[?25h Building wheel for dill ( ... \u001b[?25l\u001b[?25hdone\n",
" Building wheel for avro-python3 ( ... \u001b[?25l\u001b[?25hdone\n",
" Building wheel for future ( ... \u001b[?25l\u001b[?25hdone\n",
"\u001b[31mERROR: multiprocess has requirement dill>=0.3.3, but you'll have dill which is incompatible.\u001b[0m\n",
"\u001b[31mERROR: google-colab 1.0.0 has requirement requests~=2.23.0, but you'll have requests 2.25.1 which is incompatible.\u001b[0m\n",
"\u001b[31mERROR: datascience 0.10.6 has requirement folium==0.2.1, but you'll have folium 0.8.3 which is incompatible.\u001b[0m\n"
"name": "stdout"
"cell_type": "code",
"metadata": {
"id": "sQUUi4H9s-g2",
"outputId": "da02ce82-61ce-43e2-a5f0-21b06df76bd6",
"colab": {
"base_uri": "https://localhost:8080/"
"source": [
"%%writefile data/my-text-file-1.txt\n",
"This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
"Each line in the file is one element in the PCollection."
"execution_count": 2,
"outputs": [
"output_type": "stream",
"text": [
"Writing data/my-text-file-1.txt\n"
"name": "stdout"
"cell_type": "code",
"metadata": {
"id": "BWVVeTSOlKug",
"outputId": "2f0ad045-0af3-4ca7-f5a3-1f348b5c1517",
"colab": {
"base_uri": "https://localhost:8080/"
"source": [
"%%writefile data/my-text-file-2.txt\n",
"There are no guarantees on the order of the elements.\n",
"execution_count": 3,
"outputs": [
"output_type": "stream",
"text": [
"Writing data/my-text-file-2.txt\n"
"name": "stdout"
"cell_type": "code",
"metadata": {
"id": "NhCws6ncbDJG",
"outputId": "0b5e0dc6-43ee-4786-c1d5-984fd422e445",
"colab": {
"base_uri": "https://localhost:8080/"
"source": [
"%%writefile data/penguins.csv\n",
"execution_count": 4,
"outputs": [
"output_type": "stream",
"text": [
"Writing data/penguins.csv\n"
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "_OkWHiAvpWDZ"
"source": [
"# Reading from text files\n",
"We can use the\n",
"transform to read text files into `str` elements.\n",
"It takes a\n",
"[_glob pattern_](\n",
"as an input, and reads all the files that match that pattern.\n",
"It returns one element for each line in the file.\n",
"For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "xDXdE9uysriw",
"outputId": "3ef9f7a9-8291-42a1-be03-71e50de266f5"
"source": [
"import apache_beam as beam\n",
"input_files = 'data/*.txt'\n",
"with beam.Pipeline() as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Read files' >>\n",
" | 'Print contents' >> beam.Map(print)\n",
" )"
"execution_count": 8,
"outputs": [
"output_type": "stream",
"text": [
"This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
"Each line in the file is one element in the PCollection.\n",
"There are no guarantees on the order of the elements.\n",
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "9-2wmzEWsdrb"
"source": [
"# Writing to text files\n",
"We can use the\n",
"[`WriteToText`]( transform to write `str` elements into text files.\n",
"It takes a _file path prefix_ as an input, and it writes the all `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
"cell_type": "code",
"metadata": {
"id": "nkPlfoTfz61I"
"source": [
"import apache_beam as beam\n",
"output_file_name_prefix = 'outputs/file'\n",
"with beam.Pipeline() as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Create file lines' >> beam.Create([\n",
" 'Each element must be a string.',\n",
" 'It writes one element per line.',\n",
" 'There are no guarantees on the line order.',\n",
" 'The data might be written into multiple files.',\n",
" ])\n",
" | 'Write to files' >>\n",
" output_file_name_prefix,\n",
" file_name_suffix='.txt')\n",
" )"
"execution_count": 6,
"outputs": []
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "8au0yJSd1itt",
"outputId": "4822458b-2724-42e9-c71f-280a82d505d6"
"source": [
"# Lets look at the output files and contents.\n",
"!head outputs/file*.txt"
"execution_count": 7,
"outputs": [
"output_type": "stream",
"text": [
"Each element must be a string.\n",
"It writes one element per line.\n",
"There are no guarantees on the line order.\n",
"The data might be written into multiple files.\n"
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "21CCdZispqYK"
"source": [
"# Reading data\n",
"Your data might reside in various input formats. Take a look at the\n",
"[Built-in I/O Transforms](\n",
"page for a list of all the available I/O transforms in Beam.\n",
"If none of those work for you, you might need to create your own input transform.\n",
"> ℹ️ For a more in-depth guide, take a look at the\n",
"[Developing a new I/O connector]( page."
"cell_type": "markdown",
"metadata": {
"id": "7dQEym1QRG4y"
"source": [
"## Reading from an `iterable`\n",
"The easiest way to create elements is using\n",
"A common way is having a [`generator`]( function. This could take an input and _expand_ it into a large amount of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`, they simply\n",
"elements as they process them.\n",
"For example, let's define a `generator` called `count`, that `yield`s the numbers from `0` to `n`. We use `Create` for the initial `n` value(s) and then exapand them with `FlatMap`."
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "wR6WY6wOMVhb",
"outputId": "f90ae0a1-0cb4-4f25-fa93-10f9de856a95"
"source": [
"import apache_beam as beam\n",
"from typing import Iterable\n",
"def count(n: int) -> Iterable[int]:\n",
" for i in range(n):\n",
" yield i\n",
"n = 5\n",
"with beam.Pipeline() as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Create inputs' >> beam.Create([n])\n",
" | 'Generate elements' >> beam.FlatMap(count)\n",
" | 'Print elements' >> beam.Map(print)\n",
" )"
"execution_count": 9,
"outputs": [
"output_type": "stream",
"text": [
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "G4fw7NE1RQNf"
"source": [
"## Creating an input transform\n",
"For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
"We need a new class that inherits from `beam.PTransform`. We can do this more conveniently with the\n",
"[`beam.ptransform_fn`]( decorator.\n",
"The `PTransform` function takes the input `PCollection` as the first argument, and any other inputs from the generator function, like `n`, can be arguments to the `PTransform` as well. The original generator function can be defined locally within the `PTransform`.\n",
"Finally, we apply the `Create` and `FlatMap` transforms and return a new `PCollection`.\n",
"We can also, optionally, add type hints with the [`with_input_types`]( and [`with_output_types`]( decorators. They serve both as documentation, and are a way to ensure your data types are consistent throughout your pipeline. This becomes more useful as the complexity grows.\n",
"Since our `PTransform` is expected to be the first transform in the pipeline, it doesn't receive any inputs. We can mark it as the beginning with the [`PBegin`]( type hint.\n",
"Finally, to enable type checking, you can pass `--type_check_additional=all` when running your pipeline. Alternatively, you can also pass it directly to `PipelineOptions` if you want them enabled by default. To learn more about pipeline options, see [Configuring pipeline options]("
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "m8iXqE1CRnn5",
"outputId": "d77fd363-76eb-49ce-8729-82d6cd38cfda"
"source": [
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"from typing import Iterable\n",
"def Count(pbegin: beam.pvalue.PBegin, n: int) -> beam.PCollection[int]:\n",
" def count(n: int) -> Iterable[int]:\n",
" for i in range(n):\n",
" yield i\n",
" return (\n",
" pbegin\n",
" | 'Create inputs' >> beam.Create([n])\n",
" | 'Generate elements' >> beam.FlatMap(count)\n",
" )\n",
"n = 5\n",
"options = PipelineOptions(flags=[], type_check_additional='all')\n",
"with beam.Pipeline(options=options) as pipeline:\n",
" (\n",
" pipeline\n",
" | f'Count to {n}' >> Count(n)\n",
" | 'Print elements' >> beam.Map(print)\n",
" )"
"execution_count": 15,
"outputs": [
"output_type": "stream",
"text": [
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "e02_vFmUg-mK"
"source": [
"## Example: Reading CSV files\n",
"Lets say we want to read CSV files to get elements as Python dictionaries. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",
"We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns, and then, for each file name it reads each row as a `dict` using the\n",
"[`csv.DictReader`]( module.\n",
"We could use the [`open`]( function to open a local file, but Beam already supports several different file systems besides local files.\n",
"To leverage that, we can use the [``]( module.\n",
"> ℹ️ The [`open`](\n",
"> function from the Beam filesystem reads bytes,\n",
"> it's roughly equivalent to opening a file in `rb` mode.\n",
"> To write a file, you would use\n",
"> [`create`]( instead."
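 [`create`](">
The bytes-versus-strings point above can be tried without Beam. The following is a minimal sketch that uses an in-memory `io.BytesIO` object as a stand-in for a file opened in binary mode; the CSV content is made up for illustration.

```python
import codecs
import csv
import io

# Stand-in for a file opened in binary mode; the CSV content is made up.
raw = io.BytesIO(b'species,body_mass_g\n0,0.29\n1,0.22\n')

# codecs.iterdecode lazily decodes each line of bytes into a str,
# which is what csv.DictReader expects to iterate over.
rows = [dict(row) for row in csv.DictReader(codecs.iterdecode(raw, 'utf-8'))]
print(rows)
```

Every value comes back as a `str`, so numeric columns would still need an explicit conversion afterwards.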
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "ywVbJxegaZbo",
"outputId": "8dd0fdf3-43e8-47db-8442-ed9e88ef6c95"
"source": [
"import apache_beam as beam\n",
"from import FileSystems as beam_fs\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"import codecs\n",
"import csv\n",
"from typing import Dict, Iterable, List\n",
"@beam.typehints.with_output_types(Dict[str, str])\n",
"def ReadCsvFiles(pbegin: beam.pvalue.PBegin, file_patterns: List[str]) -> beam.PCollection[Dict[str, str]]:\n",
" def expand_pattern(pattern: str) -> Iterable[str]:\n",
" for match_result in beam_fs.match([pattern])[0].metadata_list:\n",
" yield match_result.path\n",
" def read_csv_lines(file_name: str) -> Iterable[Dict[str, str]]:\n",
" with as f:\n",
" # Beam reads files as bytes, but csv expects strings,\n",
" # so we need to decode the bytes into utf-8 strings.\n",
" for row in csv.DictReader(codecs.iterdecode(f, 'utf-8')):\n",
" yield dict(row)\n",
" return (\n",
" pbegin\n",
" | 'Create file patterns' >> beam.Create(file_patterns)\n",
" | 'Expand file patterns' >> beam.FlatMap(expand_pattern)\n",
" | 'Read CSV lines' >> beam.FlatMap(read_csv_lines)\n",
" )\n",
"input_patterns = ['data/*.csv']\n",
"options = PipelineOptions(flags=[], type_check_additional='all')\n",
"with beam.Pipeline(options=options) as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Read CSV files' >> ReadCsvFiles(input_patterns)\n",
" | 'Print elements' >> beam.Map(print)\n",
" )"
"execution_count": 16,
"outputs": [
"output_type": "stream",
"text": [
"{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n",
"{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n",
"{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n",
"{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n",
"{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n",
"{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n"
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "ZyzB_RO9Vs1D"
"source": [
"## Example: Reading from a SQLite database\n",
"Lets begin by creating a small SQLite local database file.\n",
"Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want."
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "EJ58A0AoV02o",
"cellView": "form",
"outputId": "f932e834-8d65-4ddc-a4f8-1fc825c30b41"
"source": [
"#@title Creating the SQLite database\n",
"import sqlite3\n",
"database_file = \"moon-phases.db\" #@param {type:\"string\"}\n",
"with sqlite3.connect(database_file) as db:\n",
" cursor = db.cursor()\n",
" # Create the moon_phases table.\n",
" cursor.execute('''\n",
" CREATE TABLE IF NOT EXISTS moon_phases (\n",
" phase_emoji TEXT NOT NULL,\n",
" peak_datetime DATETIME NOT NULL,\n",
" phase TEXT NOT NULL)''')\n",
" # Truncate the table if it's already populated.\n",
" cursor.execute('DELETE FROM moon_phases')\n",
" # Insert some sample data.\n",
" insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n",
" cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n",
" cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n",
" cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n",
" cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n",
" cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n",
" cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n",
" cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n",
" cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n",
" cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n",
" # Query for the data in the table to make sure it's populated.\n",
" cursor.execute('SELECT * FROM moon_phases')\n",
" for row in cursor.fetchall():\n",
" print(row)"
"execution_count": 21,
"outputs": [
"output_type": "stream",
"text": [
"(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n",
"(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n",
"(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n",
"(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n",
"(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n",
"(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n",
"(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n",
"(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n",
"(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n"
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "8y-bRhPVWai6"
"source": [
"We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n",
"It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n",
"We can use a\n",
"[custom `DoFn` transform](\n",
"for this. It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n",
"> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes using the same connection, but only one process should be _writing_ at once."
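 ℹ️ It should be safe">
To illustrate the "open once, query many times" idea without Beam, here is a rough sketch of a plain Python class that mimics the `setup`, `process`, and `teardown` lifecycle. The class name `SelectRows`, the in-memory database, and its rows are all hypothetical; a real `DoFn` would have these methods called by the runner rather than by hand.

```python
import sqlite3

class SelectRows:
    """Plain-Python stand-in for a DoFn; this is not a Beam class."""

    def __init__(self):
        self.connection = None
        self.connections_opened = 0

    def setup(self):
        # Open the connection once. An in-memory database with made-up
        # rows is used here purely for illustration.
        self.connection = sqlite3.connect(':memory:')
        self.connections_opened += 1
        self.connection.execute('CREATE TABLE phases (phase TEXT)')
        self.connection.executemany(
            'INSERT INTO phases VALUES (?)', [('Full Moon',), ('New Moon',)])

    def process(self, query):
        # Every query reuses the connection opened in setup().
        yield from self.connection.execute(query)

    def teardown(self):
        self.connection.close()

fn = SelectRows()
fn.setup()
results = []
for query in ['SELECT phase FROM phases ORDER BY rowid',
              'SELECT COUNT(*) FROM phases']:
    results.extend(fn.process(query))
fn.teardown()
print(results, fn.connections_opened)
```

Both queries run against the single connection, so `connections_opened` stays at `1` no matter how many queries are processed.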
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "Bnpwqr-NV5DF",
"outputId": "5f69a99c-c711-47cf-f13a-c780de57f3e6"
"source": [
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"import sqlite3\n",
"from typing import Iterable, List, Tuple\n",
"class SQLiteSelect(beam.DoFn):\n",
" def __init__(self, database_file: str):\n",
" self.database_file = database_file\n",
" self.connection = None\n",
" def setup(self):\n",
" self.connection = sqlite3.connect(self.database_file)\n",
" def process(self, query: Tuple[str, List[str]]) -> Iterable[Dict[str, str]]:\n",
" table, columns = query\n",
" cursor = self.connection.cursor()\n",
" cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n",
" for row in cursor.fetchall():\n",
" yield dict(zip(columns, row))\n",
" def teardown(self):\n",
" self.connection.close()\n",
"@beam.typehints.with_output_types(Dict[str, str])\n",
"def SelectFromSQLite(\n",
" pbegin: beam.pvalue.PBegin,\n",
" database_file: str,\n",
" queries: List[Tuple[str, List[str]]],\n",
") -> beam.PCollection[Dict[str, str]]:\n",
" return (\n",
" pbegin\n",
" | 'Create None' >> beam.Create(queries)\n",
" | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(database_file))\n",
" )\n",
"queries = [\n",
" # (table_name, [column1, column2, ...])\n",
" ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n",
" ('moon_phases', ['phase_emoji', 'phase']),\n",
"options = PipelineOptions(flags=[], type_check_additional='all')\n",
"with beam.Pipeline(options=options) as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n",
" | 'Print rows' >> beam.Map(print)\n",
" )"
"execution_count": 25,
"outputs": [
"output_type": "stream",
"text": [
"{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n",
"{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n",
"{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n",
"{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n",
"{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n",
"{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n",
"{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n",
"{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n",
"{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n",
"{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
"{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
"{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
"{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
"{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
"{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
"{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
"{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
"{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n"
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "C5Mx_pfNpu_q"
"source": [
"# Writing data\n",
"Your might want to write your data in various output formats. Take a look at the\n",
"[Built-in I/O Transforms](\n",
"page for a list of all the available I/O transforms in Beam.\n",
"If none of those work for you, you might need to create your own output transform.\n",
"> ℹ️ For a more in-depth guide, take a look at the\n",
"[Developing a new I/O connector]( page."
"cell_type": "markdown",
"metadata": {
"id": "FpM368NEhc-q"
"source": [
"## Creating an output transform\n",
"The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n",
"Instead, most data services are optimized to write _batches_ of elements at a time. Batch writes only connects to the service once, and can load many elements at a time.\n",
"Here, we discuss two common ways of batching elements for optimized writes: _fixed-sized batches_, and\n",
"of elements_."
"cell_type": "markdown",
"metadata": {
"id": "5gypFFh4hM48"
"source": [
"## Writing fixed-sized batches\n",
"If the order of the elements _is not_ important, we can simply create fixed-sized batches and write those independently.\n",
"We can use\n",
"to get fixed-sized batches. Note that it expects `(key, value)` pairs. Since `GroupIntoBatches` is an _aggregation_, all the elements in a batch _must_ fit into memory for each worker.\n",
"> ℹ️ `GroupIntoBatches` requires a `(key, value)` pair. For simplicity, this example uses a placeholder `None` key and discards it later. Depending on your data, there might be a key that makes more sense. Using a _balanced_ key, where each key contains around the same number of elements, may help parallelize the batching process.\n",
"Let's create something similar to `WriteToText` but keep it simple with a unique identifier in the file name instead of the file count.\n",
"To write a file using the Beam `filesystems` module, we need to use [`create`](, which writes `bytes` into the file.\n",
"> ℹ️ To read a file instead, use the [`open`](\n",
"> function instead.\n",
"For the output type hint, we can use [`PDone`]( to indicate this is the last transform in a given pipeline."
"cell_type": "code",
"metadata": {
"id": "LcRHXwyT8Rrj"
"source": [
"import apache_beam as beam\n",
"from import FileSystems as beam_fs\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"import os\n",
"import uuid\n",
"from typing import Iterable\n",
"def WriteBatchesToFiles(\n",
" pcollection: beam.PCollection[str],\n",
" file_name_prefix: str,\n",
" file_name_suffix: str = '.txt',\n",
" batch_size: int = 100,\n",
") -> beam.pvalue.PDone:\n",
" def expand_pattern(pattern):\n",
" for match_result in beam_fs.match([pattern])[0].metadata_list:\n",
" yield match_result.path\n",
" def write_file(lines: Iterable[str]):\n",
" file_name = f\"{file_name_prefix}-{uuid.uuid4().hex}{file_name_suffix}\"\n",
" with beam_fs.create(file_name) as f:\n",
" for line in lines:\n",
" f.write(f\"{line}\\n\".encode('utf-8'))\n",
" # Remove existing files matching the output file_name pattern.\n",
" for path in expand_pattern(f\"{file_name_prefix}*{file_name_suffix}\"):\n",
" os.remove(path)\n",
" return (\n",
" pcollection\n",
" # For simplicity we key with `None` and discard it.\n",
" | 'Key with None' >> beam.WithKeys(lambda _: None)\n",
" | 'Group into batches' >> beam.GroupIntoBatches(batch_size)\n",
" | 'Discard key' >> beam.Values()\n",
" | 'Write file' >> beam.Map(write_file)\n",
" )\n",
"output_file_name_prefix = 'outputs/batch'\n",
"options = PipelineOptions(flags=[], type_check_additional='all')\n",
"with beam.Pipeline(options=options) as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Create file lines' >> beam.Create([\n",
" 'Each element must be a string.',\n",
" 'It writes one element per line.',\n",
" 'There are no guarantees on the line order.',\n",
" 'The data might be written into multiple files.',\n",
" ])\n",
" | 'Write batches to files' >> WriteBatchesToFiles(\n",
" file_name_prefix=output_file_name_prefix,\n",
" file_name_suffix='.txt',\n",
" batch_size=3,\n",
" )\n",
" )"
"execution_count": null,
"outputs": []
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "CUklk4JtEbft",
"outputId": "adddbd9f-e66d-4def-ba59-1eafccdbe793"
"source": [
"# Lets look at the output files and contents.\n",
"!head outputs/batch*.txt"
"execution_count": 27,
"outputs": [
"output_type": "stream",
"text": [
"==> outputs/batch-30d399fb3f24430db193e8130f439cb0.txt <==\n",
"Each element must be a string.\n",
"It writes one element per line.\n",
"There are no guarantees on the line order.\n",
"==> outputs/batch-ab16a5c2018e4c32b01a5acaa2671fd0.txt <==\n",
"The data might be written into multiple files.\n"
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "hbmPT317hP5K"
"source": [
"## Writing windows of elements\n",
"If the order of the elements _is_ important, we could batch the elements by windows. This could be useful in _streaming_ pipelines, where we have an indefinite number of incoming elements and we would like to write windows as they are being processed.\n",
 ℹ️ For more information">
"> ℹ️ For more information about windows and triggers, check the Windowing page.\n",
"We use a\n",
"custom `DoFn` transform\n",
"to extract the window start time and end time.\n",
"We use this for the file names of the output files."
"cell_type": "code",
"metadata": {
"id": "v_qK300FG9js"
"source": [
"import apache_beam as beam\n",
"from import FileSystems as beam_fs\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"from datetime import datetime\n",
"import time\n",
"from typing import Any, Dict\n",
"def unix_time(time_str: str) -> int:\n",
" return time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))\n",
"class WithWindowInfo(beam.DoFn):\n",
" def process(self, element: Any, window=beam.DoFn.WindowParam) -> Iterable[Dict[str, Any]]:\n",
" yield {\n",
" 'element': element,\n",
" 'window_start': window.start.to_utc_datetime(),\n",
" 'window_end': window.end.to_utc_datetime(),\n",
" }\n",
"def WriteWindowsToFiles(\n",
" pcollection: beam.PCollection[str],\n",
" file_name_prefix: str,\n",
" file_name_suffix: str = '.txt',\n",
") -> beam.pvalue.PDone:\n",
" def write_file(batch: Dict[str, Any]):\n",
" start_date = batch['window_start'].date()\n",
" start_time = batch['window_start'].time()\n",
" end_time = batch['window_end'].time()\n",
" file_name = f\"{file_name_prefix}-{start_date}-{start_time}-{end_time}{file_name_suffix}\"\n",
" with beam_fs.create(file_name) as f:\n",
" for x in batch['element']:\n",
" f.write(f\"{x}\\n\".encode('utf-8'))\n",
" return (\n",
" pcollection\n",
" | 'Group all per window' >> beam.GroupBy(lambda _: None)\n",
" | 'Discard key' >> beam.Values()\n",
" | 'Get window info' >> beam.ParDo(WithWindowInfo())\n",
" | 'Write files' >> beam.Map(write_file)\n",
" )\n",
"output_file_name_prefix = 'outputs/window'\n",
"window_size_sec = 5 * 60 # 5 minutes\n",
"options = PipelineOptions(flags=[], type_check_additional='all')\n",
"with beam.Pipeline(options=options) as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Create elements' >> beam.Create([\n",
" {'timestamp': unix_time('2020-03-19 08:49:00'), 'event': 'login'},\n",
" {'timestamp': unix_time('2020-03-19 08:49:20'), 'event': 'view_account'},\n",
" {'timestamp': unix_time('2020-03-19 08:50:00'), 'event': 'view_orders'},\n",
" {'timestamp': unix_time('2020-03-19 08:51:00'), 'event': 'track_order'},\n",
" {'timestamp': unix_time('2020-03-19 09:00:00'), 'event': 'logout'},\n",
" ])\n",
" | 'With timestamps' >> beam.Map(\n",
" lambda x: beam.window.TimestampedValue(x, x['timestamp']))\n",
" | 'Fixed-sized windows' >> beam.WindowInto(\n",
" beam.window.FixedWindows(window_size_sec))\n",
" | 'To string' >> beam.Map(\n",
" lambda x: f\"{datetime.fromtimestamp(x['timestamp'])}: {x['event']}\")\n",
" | 'Write windows to files' >> WriteWindowsToFiles(\n",
" file_name_prefix=output_file_name_prefix,\n",
" file_name_suffix='.txt',\n",
" )\n",
" )"
"execution_count": 28,
"outputs": []
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
"id": "4QXKKVawTJ2_",
"outputId": "96a84b29-3fd2-46f4-b21b-d3f07daa928b"
"source": [
"# Lets look at the output files and contents.\n",
"!head outputs/window*.txt"
"execution_count": 29,
"outputs": [
"output_type": "stream",
"text": [
"==> outputs/window-2020-03-19-08:45:00-08:50:00.txt <==\n",
"2020-03-19 08:49:00: login\n",
"2020-03-19 08:49:20: view_account\n",
"==> outputs/window-2020-03-19-08:50:00-08:55:00.txt <==\n",
"2020-03-19 08:50:00: view_orders\n",
"2020-03-19 08:51:00: track_order\n",
"==> outputs/window-2020-03-19-09:00:00-09:05:00.txt <==\n",
"2020-03-19 09:00:00: logout\n"
"name": "stdout"
"cell_type": "markdown",
"metadata": {
"id": "gnoz_mWtxSjW"
"source": [
"# What's next?\n",
"* [Programming guide]( -- learn about all the Apache Beam concepts in more depth.\n",
"* [Transform catalog]( -- check out all the available transforms.\n",
"* [Mobile gaming example]( -- learn more about windowing, triggers, and streaming through a complete example pipeline.\n",
"* [Runners]( -- check the available runners, their capabilities, and how to run your pipeline in them."