{
"cells": [
{
"cell_type": "markdown",
"id": "83acd0be",
"metadata": {},
"source": [
"Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"<!--\n",
" Licensed to the Apache Software Foundation (ASF) under one\n",
" or more contributor license agreements. See the NOTICE file\n",
" distributed with this work for additional information\n",
" regarding copyright ownership. The ASF licenses this file\n",
" to you under the Apache License, Version 2.0 (the\n",
" \"License\"); you may not use this file except in compliance\n",
" with the License. You may obtain a copy of the License at\n",
"\n",
" http://www.apache.org/licenses/LICENSE-2.0\n",
"\n",
" Unless required by applicable law or agreed to in writing,\n",
" software distributed under the License is distributed on an\n",
" \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
" KIND, either express or implied. See the License for the\n",
" specific language governing permissions and limitations\n",
" under the License.\n",
"-->\n"
]
},
{
"cell_type": "markdown",
"id": "5022179a",
"metadata": {},
"source": [
"# Run Beam SQL in notebooks\n",
"\n",
"[Beam SQL](https://beam.apache.org/documentation/dsls/sql/overview/) allows a Beam user to query PCollections with SQL statements. Currently, `InteractiveRunner` does not support `SqlTransform` due to [BEAM-10708](https://issues.apache.org/jira/browse/BEAM-10708). However, you can use the `beam_sql` magic to run Beam SQL in the notebook and introspect the result.\n",
"\n",
"`beam_sql` is an IPython [custom magic](https://ipython.readthedocs.io/en/stable/config/custommagics.html). If you're not familiar with magics, here are some [built-in examples](https://ipython.readthedocs.io/en/stable/interactive/magics.html). The magic is a convenient way to validate your queries locally against known or test data sources while prototyping a Beam pipeline with SQL, before productionizing it on remote clusters or services.\n",
"\n",
"First, let's load the `beam_sql` magic:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c6b6e3c1",
"metadata": {},
"outputs": [],
"source": [
"%load_ext apache_beam.runners.interactive.sql.beam_sql_magics"
]
},
{
"cell_type": "markdown",
"id": "a7c43b84",
"metadata": {},
"source": [
"Because SQL support in the Beam Python SDK is implemented as a cross-language (xLang) external transform, make sure you have the following prerequisites:\n",
"- `docker` installed.\n",
"- jdk8 or jdk11 installed, with `$JAVA_HOME` set."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b280710a",
"metadata": {},
"outputs": [],
"source": [
"!docker image list\n",
"!java --version\n",
"!echo $JAVA_HOME"
]
},
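{
"cell_type": "markdown",
"id": "e3b7a1c4",
"metadata": {},
"source": [
"If you prefer to check the prerequisites listed above from Python, the following sketch uses only the standard library. `check_xlang_prerequisites` is a hypothetical helper, not part of Beam. It only looks at `PATH` and `$JAVA_HOME`; it does not verify the JDK version or whether the Docker daemon is running.\n",
"\n",
"```python\n",
"import os\n",
"import shutil\n",
"\n",
"\n",
"def check_xlang_prerequisites():\n",
"    # Hypothetical helper: reports whether the tools used by Beam SQL's\n",
"    # cross-language expansion are discoverable from this Python process.\n",
"    java_home = os.environ.get('JAVA_HOME')\n",
"    return {\n",
"        'docker_on_path': shutil.which('docker') is not None,\n",
"        'java_on_path': shutil.which('java') is not None,\n",
"        'java_home_set': bool(java_home),\n",
"        'java_home_is_dir': bool(java_home) and os.path.isdir(java_home),\n",
"    }\n",
"\n",
"\n",
"for check, ok in check_xlang_prerequisites().items():\n",
"    status = 'OK' if ok else 'MISSING'\n",
"    print(f'{check}: {status}')\n",
"```"
]
},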
{
"cell_type": "code",
"execution_count": null,
"id": "28b1b320",
"metadata": {},
"outputs": [],
"source": [
"# Optionally set the logging level to ERROR to reduce distraction.\n",
"import logging\n",
"\n",
"logging.root.setLevel(logging.ERROR)"
]
},
{
"cell_type": "markdown",
"id": "f6b8789f",
"metadata": {},
"source": [
"**Important**: if you're using Beam built from your local source code, you also need to:\n",
"\n",
"- Build the Java expansion service shadow jar. Go to the root directory of your local Beam repo and execute:\n",
" `./gradlew :sdks:java:extensions:sql:expansion-service:shadowJar`.\n",
"- Pull the Docker image that matches your JDK version, for example `docker pull apache/beam_java11_sdk` (or `apache/beam_java17_sdk`, `apache/beam_java21_sdk`).\n",
"- Tag the image with your current Beam dev version, which you can find in `apache_beam.version.__version__`. For example, if you're using jdk11 and the dev version is `x.x.x.dev`, execute `docker image tag apache/beam_java11_sdk:latest apache/beam_java11_sdk:x.x.x.dev`."
]
},
{
"cell_type": "markdown",
"id": "14c8967d",
"metadata": {},
"source": [
"## Query#1 - A simple static query\n",
"\n",
"The `beam_sql` magic can be used as either a line magic or a cell magic.\n",
"You can check its usage by running:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c212dd89",
"metadata": {},
"outputs": [],
"source": [
"%beam_sql -h"
]
},
{
"cell_type": "markdown",
"id": "7914c1aa",
"metadata": {},
"source": [
"You can run a simple SQL query (in Apache Calcite SQL [syntax](https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/)) to create a [schema-aware PCollection](https://beam.apache.org/documentation/programming-guide/#schemas) from static values."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "895341fa",
"metadata": {},
"outputs": [],
"source": [
"%%beam_sql -o query1_data\n",
"SELECT CAST(5 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 AS DOUBLE) AS `flt`"
]
},
{
"cell_type": "markdown",
"id": "c394ead5",
"metadata": {},
"source": [
"The `beam_sql` magic shows you the result of the SQL query.\n",
"\n",
"It also creates and outputs a PCollection named `query1_data` with an `element_type` like `BeamSchema_...(id: int32, str: str, flt: float64)`.\n",
"\n",
"Note that you have **not** explicitly created a Beam pipeline. You get a PCollection because the `beam_sql` magic always **implicitly creates** a pipeline to execute your SQL query. To hold the elements with each field's type info, Beam automatically creates a schema as the `element_type` for the created PCollection."
]
},
{
"cell_type": "markdown",
"id": "981b2cc9",
"metadata": {},
"source": [
"To introspect the data again with more options, you can use `show`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e97caf83",
"metadata": {},
"outputs": [],
"source": [
"from apache_beam.runners.interactive import interactive_beam as ib\n",
"ib.show(query1_data)\n",
"# Uncomment the line below to pass more options.\n",
"# ib.show(query1_data, visualize_data=True, include_window_info=True)"
]
},
{
"cell_type": "markdown",
"id": "f58b15a8",
"metadata": {},
"source": [
"To materialize the PCollection into a pandas [DataFrame](https://pandas.pydata.org/pandas-docs/stable/user_guide/dsintro.html#dataframe) object, you can use `collect`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "47b8da1a",
"metadata": {},
"outputs": [],
"source": [
"ib.collect(query1_data)"
]
},
{
"cell_type": "markdown",
"id": "09b4f24c",
"metadata": {},
"source": [
"You can also append more transforms, for example to write the elements to a text file and to print them:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9a650bbb",
"metadata": {},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"\n",
"coder = beam.coders.registry.get_coder(query1_data.element_type)\n",
"print(coder)\n",
"query1_data | beam.io.textio.WriteToText('/tmp/query1_data', coder=coder)\n",
"query1_data | beam.Map(print)"
]
},
{
"cell_type": "markdown",
"id": "6cf89704",
"metadata": {},
"source": [
"Run the pipeline as a normal pipeline on the `DirectRunner` and inspect the output file."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d524e1a0",
"metadata": {},
"outputs": [],
"source": [
"!rm -rf /tmp/query1_data*\n",
"query1_data.pipeline.run().wait_until_finish()\n",
"!ls /tmp/query1_data*\n",
"!cat /tmp/query1_data*"
]
},
{
"cell_type": "markdown",
"id": "5600945a",
"metadata": {},
"source": [
"The coder in use is a `RowCoder`, so each element is written to the text file in its encoded binary form. Inspecting the file directly may therefore show garbled characters. The file is read back later in Query#4."
]
},
{
"cell_type": "markdown",
"id": "30aa1188",
"metadata": {},
"source": [
"### [Optional] Omit the `-o` option.\n",
"If the option is omitted, an output name is auto-generated based on the SQL query and the PCollection (if any) it queries. You can also refer to the output with the `_[{execution_count}]` convention: `_` for the last output and `_{execution_count}` for the output of a specific cell execution.\n",
"\n",
"However, explicitly naming the output is recommended for better notebook readability and to avoid unexpected errors.\n",
"\n",
"The example below outputs a PCollection with a name like `sql_output_...`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b445e4f1",
"metadata": {},
"outputs": [],
"source": [
"%%beam_sql\n",
"SELECT CAST(1 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 AS DOUBLE) AS `flt`"
]
},
{
"cell_type": "markdown",
"id": "c7b9e4fb",
"metadata": {},
"source": [
"Now that you are familiar with the `beam_sql` magic, you can build more queries against PCollections.\n",
"\n",
"Let's install the `names` package to randomly generate some names."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ef1ca0fc",
"metadata": {},
"outputs": [],
"source": [
"%pip install names"
]
},
{
"cell_type": "markdown",
"id": "1c0d5739",
"metadata": {},
"source": [
"Import all modules needed for this example."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "20cdf3b9",
"metadata": {},
"outputs": [],
"source": [
"import names\n",
"import typing\n",
"\n",
"import apache_beam as beam\n",
"from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
"from apache_beam.runners.interactive import interactive_beam as ib"
]
},
{
"cell_type": "markdown",
"id": "00db1574",
"metadata": {},
"source": [
"Create a pipeline `p` with the `InteractiveRunner`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "24caeb60",
"metadata": {},
"outputs": [],
"source": [
"p = beam.Pipeline(InteractiveRunner())"
]
},
{
"cell_type": "markdown",
"id": "0a4ca6eb",
"metadata": {},
"source": [
"Then create a schema with `typing.NamedTuple`: a `Person` type with an `id` field and a `name` field."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "23910a9d",
"metadata": {},
"outputs": [],
"source": [
"class Person(typing.NamedTuple):\n",
"    id: int\n",
"    name: str"
]
},
{
"cell_type": "markdown",
"id": "5c626d63",
"metadata": {},
"source": [
"With the `beam_sql` magic, you can use any Beam I/O connector as a data source, build a SQL query against the data, and check the output. Streaming sources are currently not supported because the `DirectRunner` does not support streaming pipelines with `SqlTransform`. If needed, you can write the output to a sink following the `WriteToText` example above."
]
},
{
"cell_type": "markdown",
"id": "2d892920",
"metadata": {},
"source": [
"## Query#2 - Querying a single PCollection\n",
"\n",
"Let's build a PCollection of 10 `Person` elements with random names."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8a5fc9b9",
"metadata": {},
"outputs": [],
"source": [
"persons = (p \n",
" | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(10)]))\n",
"ib.show(persons)"
]
},
{
"cell_type": "markdown",
"id": "84d64746",
"metadata": {},
"source": [
"The query below selects all elements with `id < 5` from `persons` and assigns the output to `persons_id_lt_5`. The `-v` option prints more details about the execution."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07db1116",
"metadata": {},
"outputs": [],
"source": [
"%%beam_sql -o persons_id_lt_5 -v\n",
"SELECT * FROM persons WHERE id < 5"
]
},
{
"cell_type": "markdown",
"id": "68afa962",
"metadata": {},
"source": [
"With `-v`, the first time you run this query you might see a warning message like:\n",
"\n",
"```\n",
"Schema Person has not been registered to use a RowCoder. Automatically registering it by running: beam.coders.registry.register_coder(Person, beam.coders.RowCoder)\n",
"```\n",
"\n",
"The `beam_sql` magic registers a `RowCoder` for each schema you define and use whenever it finds one. You can also run the same code explicitly to do so yourself.\n",
"\n",
"Note that the output element type is `Person(id: int, name: str)` instead of `BeamSchema_...` because you selected all the fields from a single PCollection of the known type `Person(id: int, name: str)`."
]
},
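{
"cell_type": "markdown",
"id": "9d2f6e80",
"metadata": {},
"source": [
"To reason about what `persons_id_lt_5` should contain, the following standalone sketch computes the same filter in plain Python. It assumes hypothetical, deterministic names (`person_0`, `person_1`, ...) instead of the random ones from the `names` package, so it is a way to derive expected results rather than a replacement for the query. As with the SQL output, the filtered elements keep the `Person` type.\n",
"\n",
"```python\n",
"import typing\n",
"\n",
"\n",
"class Person(typing.NamedTuple):\n",
"    id: int\n",
"    name: str\n",
"\n",
"\n",
"# Hypothetical deterministic test data with the same shape as `persons`.\n",
"sample_persons = [Person(id=x, name=f'person_{x}') for x in range(10)]\n",
"\n",
"# Plain-Python equivalent of: SELECT * FROM persons WHERE id < 5\n",
"expected_lt_5 = [person for person in sample_persons if person.id < 5]\n",
"\n",
"print([person.id for person in expected_lt_5])  # prints [0, 1, 2, 3, 4]\n",
"```"
]
},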
{
"cell_type": "markdown",
"id": "79587515",
"metadata": {},
"source": [
"## Query#3 - Joining multiple PCollections\n",
"\n",
"You can build a `persons_2` PCollection with a different range of `id`s and new random `name`s."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c01fa39a",
"metadata": {},
"outputs": [],
"source": [
"persons_2 = (p \n",
" | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(5, 15)]))\n",
"ib.show(persons_2)"
]
},
{
"cell_type": "markdown",
"id": "6904ff8e",
"metadata": {},
"source": [
"Then query for all `name`s from `persons` and `persons_2` with the same `id`s and assign the output to `persons_with_common_id`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2a0a60ff",
"metadata": {},
"outputs": [],
"source": [
"%%beam_sql -o persons_with_common_id -v\n",
"SELECT * FROM persons JOIN persons_2 USING (id)"
]
},
{
"cell_type": "markdown",
"id": "4bb4df8a",
"metadata": {},
"source": [
"Note that the output element type is now a `BeamSchema_...(id: int64, name: str, name0: str)`. Because you selected columns from both PCollections, no known schema can hold the result. Beam automatically creates a schema and disambiguates the conflicting `name` fields by appending `0` to one of them.\n",
"\n",
"Because `Person` is already registered with a `RowCoder`, there is no registration warning this time, even with `-v`."
]
},
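{
"cell_type": "markdown",
"id": "b5c81a3f",
"metadata": {},
"source": [
"The following plain-Python sketch shows what the `JOIN ... USING (id)` query computes: an inner join on `id`, with the second `name` column carried as `name0`. `JoinedRow` and `join_using_id` are hypothetical names that only mirror the field layout described above; they are not the schema class Beam generates. The sample data uses fixed names so the expected result is easy to check.\n",
"\n",
"```python\n",
"import typing\n",
"\n",
"\n",
"class Person(typing.NamedTuple):\n",
"    id: int\n",
"    name: str\n",
"\n",
"\n",
"class JoinedRow(typing.NamedTuple):\n",
"    # Hypothetical stand-in for the auto-generated BeamSchema_... type.\n",
"    id: int\n",
"    name: str\n",
"    name0: str\n",
"\n",
"\n",
"def join_using_id(left, right):\n",
"    # Inner join on `id`: index the right side, then probe it for each left row.\n",
"    right_by_id = {}\n",
"    for row in right:\n",
"        right_by_id.setdefault(row.id, []).append(row)\n",
"    return [\n",
"        JoinedRow(id=left_row.id, name=left_row.name, name0=right_row.name)\n",
"        for left_row in left\n",
"        for right_row in right_by_id.get(left_row.id, [])\n",
"    ]\n",
"\n",
"\n",
"sample_persons = [Person(id=x, name=f'a_{x}') for x in range(10)]\n",
"sample_persons_2 = [Person(id=x, name=f'b_{x}') for x in range(5, 15)]\n",
"joined = join_using_id(sample_persons, sample_persons_2)\n",
"print([row.id for row in joined])  # prints [5, 6, 7, 8, 9]\n",
"```\n",
"\n",
"Only `id`s 5 through 9 fall in both `range(10)` and `range(5, 15)`, so the inner join keeps five rows, just as the query above does for `persons` and `persons_2`."
]
},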
{
"cell_type": "markdown",
"id": "cfcfeb76",
"metadata": {},
"source": [
"## Query#4 - Joining multiple PCollections, including I/O"
]
},
{
"cell_type": "markdown",
"id": "ce8abc3d",
"metadata": {},
"source": [
"Let's read the file written in Query#1 and join it with `persons` and `persons_2` to find the `name`s whose `id` appears in all three of them."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d1dea37b",
"metadata": {},
"outputs": [],
"source": [
"# Use the same coder that WriteToText used, and explicitly set the output type.\n",
"query1_result_in_file = p | beam.io.ReadFromText(\n",
" '/tmp/query1_data*', coder=coder).with_output_types(\n",
" query1_data.element_type)\n",
"\n",
"# Check all the data sources.\n",
"ib.show(query1_result_in_file)\n",
"ib.show(persons)\n",
"ib.show(persons_2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4bf6c422",
"metadata": {},
"outputs": [],
"source": [
"%%beam_sql -o entry_with_common_id\n",
"\n",
"SELECT query1_result_in_file.id, persons.name AS `name_1`, persons_2.name AS `name_2`\n",
"FROM query1_result_in_file JOIN persons ON query1_result_in_file.id = persons.id\n",
"JOIN persons_2 ON query1_result_in_file.id = persons_2.id"
]
},
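{
"cell_type": "markdown",
"id": "7a4e0d29",
"metadata": {},
"source": [
"As a cross-check, the following plain-Python sketch mirrors the three-way join above. It assumes the file from Query#1 contributes a single row with `id` 5 (the value that query selected) and uses hypothetical fixed names; `EntryWithCommonId`, `index_names_by_id`, and `three_way_join` are illustrative names, not Beam APIs. Only `id`s present in all three inputs survive an inner join, so the expected result is a single entry.\n",
"\n",
"```python\n",
"import typing\n",
"\n",
"\n",
"class Person(typing.NamedTuple):\n",
"    id: int\n",
"    name: str\n",
"\n",
"\n",
"class EntryWithCommonId(typing.NamedTuple):\n",
"    id: int\n",
"    name_1: str\n",
"    name_2: str\n",
"\n",
"\n",
"def index_names_by_id(rows):\n",
"    # Map each id to all names that carry it (ids need not be unique).\n",
"    names_by_id = {}\n",
"    for row in rows:\n",
"        names_by_id.setdefault(row.id, []).append(row.name)\n",
"    return names_by_id\n",
"\n",
"\n",
"def three_way_join(ids, persons_1, persons_2):\n",
"    # Inner join of a list of ids with two Person lists, all on `id`.\n",
"    names_1 = index_names_by_id(persons_1)\n",
"    names_2 = index_names_by_id(persons_2)\n",
"    return [\n",
"        EntryWithCommonId(id=i, name_1=n1, name_2=n2)\n",
"        for i in ids\n",
"        for n1 in names_1.get(i, [])\n",
"        for n2 in names_2.get(i, [])\n",
"    ]\n",
"\n",
"\n",
"query1_ids = [5]  # Query#1 selected CAST(5 AS INT) AS `id`.\n",
"sample_persons = [Person(id=x, name=f'a_{x}') for x in range(10)]\n",
"sample_persons_2 = [Person(id=x, name=f'b_{x}') for x in range(5, 15)]\n",
"entries = three_way_join(query1_ids, sample_persons, sample_persons_2)\n",
"print(entries)\n",
"```"
]
},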
{
"cell_type": "markdown",
"id": "282f6173",
"metadata": {},
"source": [
"You can also chain another `beam_sql` magic to get just `name_1`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d858dd6c",
"metadata": {},
"outputs": [],
"source": [
"%%beam_sql -o name_found\n",
"SELECT name_1 AS `name` FROM entry_with_common_id"
]
}
],
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}