| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "id": "83acd0be", |
| "metadata": {}, |
| "source": [ |
| "<!--\n", |
| " Licensed to the Apache Software Foundation (ASF) under one\n", |
| " or more contributor license agreements. See the NOTICE file\n", |
| " distributed with this work for additional information\n", |
| " regarding copyright ownership. The ASF licenses this file\n", |
| " to you under the Apache License, Version 2.0 (the\n", |
| " \"License\"); you may not use this file except in compliance\n", |
| " with the License. You may obtain a copy of the License at\n", |
| "\n", |
| " http://www.apache.org/licenses/LICENSE-2.0\n", |
| "\n", |
| " Unless required by applicable law or agreed to in writing,\n", |
| " software distributed under the License is distributed on an\n", |
| " \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| " KIND, either express or implied. See the License for the\n", |
| " specific language governing permissions and limitations\n", |
| " under the License.\n", |
| "-->\n" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "5022179a", |
| "metadata": {}, |
| "source": [ |
| "# Run Beam SQL in notebooks\n", |
| "\n", |
| "[Beam SQL](https://beam.apache.org/documentation/dsls/sql/overview/) allows a Beam user to query PCollections with SQL statements. Currently, `InteractiveRunner` does not support `SqlTransform` due to [BEAM-10708](https://issues.apache.org/jira/browse/BEAM-10708). However, you can use the `beam_sql` magic to run Beam SQL in the notebook and introspect the result.\n", |
| "\n", |
| "`beam_sql` is an IPython [custom magic](https://ipython.readthedocs.io/en/stable/config/custommagics.html). If you're not familiar with magics, here are some [built-in examples](https://ipython.readthedocs.io/en/stable/interactive/magics.html). The `beam_sql` magic is a convenient way to validate your queries locally against known or test data sources when prototyping a Beam pipeline with SQL, before productionizing it on a remote cluster or service.\n", |
| "\n", |
| "First, let's load the `beam_sql` magic:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "c6b6e3c1", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%load_ext apache_beam.runners.interactive.sql.beam_sql_magics" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "a7c43b84", |
| "metadata": {}, |
| "source": [ |
| "Since SQL support in the Beam Python SDK is implemented through a cross-language (xLang) external transform, make sure you meet the following prerequisites:\n", |
| "- Have `docker` installed.\n", |
| "- Have jdk8 or jdk11 installed and `$JAVA_HOME` set." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "b280710a", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "!docker image list\n", |
| "!java -version\n", |
| "!echo $JAVA_HOME" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "28b1b320", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Optionally set the logging level to reduce distraction.\n", |
| "import logging\n", |
| "\n", |
| "logging.root.setLevel(logging.ERROR)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "f6b8789f", |
| "metadata": {}, |
| "source": [ |
| "**Important**: if you're using Beam built from your local source code, additionally:\n", |
| "\n", |
| "- Build the Java expansion service shadow JAR. From the root directory of your local Beam repo, execute:\n", |
| " `./gradlew :sdks:java:extensions:sql:expansion-service:shadowJar`.\n", |
| "- Based on your JDK version, pull the matching Docker image, for example `docker pull apache/beam_java11_sdk` (or the java17 or java21 equivalent).\n", |
| "- Then tag the image with your current Beam dev version. You can check the dev version with `apache_beam.version.__version__`. For example, if you're using jdk11 and the dev version is `x.x.x.dev`, execute `docker image tag apache/beam_java11_sdk:latest apache/beam_java11_sdk:x.x.x.dev`." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "14c8967d", |
| "metadata": {}, |
| "source": [ |
| "## Query#1 - A simple static query\n", |
| "\n", |
| "The `beam_sql` magic can be used as either a line magic or a cell magic.\n", |
| "You can check its usage by running:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "c212dd89", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%beam_sql -h" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "7914c1aa", |
| "metadata": {}, |
| "source": [ |
| "You can run a simple SQL query (in Apache Calcite SQL [syntax](https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/)) to create a [schema-aware PCollection](https://beam.apache.org/documentation/programming-guide/#schemas) from static values." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "895341fa", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%beam_sql -o query1_data\n", |
| "SELECT CAST(5 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 AS DOUBLE) AS `flt`" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "c394ead5", |
| "metadata": {}, |
| "source": [ |
| "The `beam_sql` magic shows you the result of the SQL query.\n", |
| "\n", |
| "It also creates and outputs a PCollection named `query1_data` with an `element_type` like `BeamSchema_...(id: int32, str: str, flt: float64)`.\n", |
| "\n", |
| "Note that you have **not** explicitly created a Beam pipeline. You get a PCollection because the `beam_sql` magic always **implicitly creates** a pipeline to execute your SQL query. To hold the elements with each field's type info, Beam automatically creates a schema as the `element_type` for the created PCollection." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "981b2cc9", |
| "metadata": {}, |
| "source": [ |
| "To introspect the data again with more options, you can use `show`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "e97caf83", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from apache_beam.runners.interactive import interactive_beam as ib\n", |
| "ib.show(query1_data)\n", |
| "# Uncomment below to set more args.\n", |
| "# ib.show(query1_data, visualize_data=True, include_window_info=True)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "f58b15a8", |
| "metadata": {}, |
| "source": [ |
| "To materialize the PCollection into a pandas [DataFrame](https://pandas.pydata.org/pandas-docs/stable/user_guide/dsintro.html#dataframe) object, you can use `collect`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "47b8da1a", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.collect(query1_data)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "09b4f24c", |
| "metadata": {}, |
| "source": [ |
| "You can also append transforms, such as writing the elements to a text file and printing them:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "9a650bbb", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "coder = beam.coders.registry.get_coder(query1_data.element_type)\n", |
| "print(coder)\n", |
| "query1_data | beam.io.textio.WriteToText('/tmp/query1_data', coder=coder)\n", |
| "query1_data | beam.Map(print)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "6cf89704", |
| "metadata": {}, |
| "source": [ |
| "Execute the pipeline as a normal pipeline running on DirectRunner and inspect the output file." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "d524e1a0", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "!rm -rf /tmp/query1_data*\n", |
| "query1_data.pipeline.run().wait_until_finish()\n", |
| "!ls /tmp/query1_data*\n", |
| "!cat /tmp/query1_data*" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "5600945a", |
| "metadata": {}, |
| "source": [ |
| "The coder in use is a `RowCoder`. The element is encoded and written to the text file. When inspecting it directly, it may display garbled strings. The file will be revisited later in Query#4." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "30aa1188", |
| "metadata": {}, |
| "source": [ |
| "### [Optional] Omit the `-o` option.\n", |
| "If the option is omitted, an output name is auto-generated based on the SQL query and PCollection (if any) it queries. Optionally, you can also use the `_[{execution_count}]` convention: `_` for last output and `_{execution_count}` for a specific cell execution output.\n", |
| "\n", |
| "However, explicitly naming the output is recommended for better notebook readability and to avoid unexpected errors.\n", |
| "\n", |
| "The example below outputs a PCollection with a name like `sql_output_...`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "b445e4f1", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%beam_sql\n", |
| "SELECT CAST(1 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 AS DOUBLE) AS `flt`" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "c7b9e4fb", |
| "metadata": {}, |
| "source": [ |
| "Now that you are familiar with the `beam_sql` magic, you can build more queries against PCollections.\n", |
| "\n", |
| "Let's install the `names` package to randomly generate some names." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "ef1ca0fc", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%pip install names" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "1c0d5739", |
| "metadata": {}, |
| "source": [ |
| "Import all modules needed for this example." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "20cdf3b9", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import names\n", |
| "import typing\n", |
| "\n", |
| "import apache_beam as beam\n", |
| "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n", |
| "from apache_beam.runners.interactive import interactive_beam as ib" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "00db1574", |
| "metadata": {}, |
| "source": [ |
| "Create a pipeline `p` with the `InteractiveRunner`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "24caeb60", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "p = beam.Pipeline(InteractiveRunner())" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "0a4ca6eb", |
| "metadata": {}, |
| "source": [ |
| "Then let's create a schema with `typing.NamedTuple`. Let's call it `Person` with a field `id` and a field `name`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "23910a9d", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "class Person(typing.NamedTuple):\n", |
| " id: int\n", |
| " name: str" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "5c626d63", |
| "metadata": {}, |
| "source": [ |
| "With the `beam_sql` magic, you can use any of the Beam I/O connectors as a source of data, then build a SQL query against all the data and check the output. Streaming is currently not supported because `DirectRunner` does not support streaming pipelines with `SqlTransform`. If needed, you can sink the output following the `WriteToText` example demonstrated above." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "2d892920", |
| "metadata": {}, |
| "source": [ |
| "## Query#2 - Querying a single PCollection\n", |
| "\n", |
| "Let's build a PCollection with 10 random `Person` typed elements." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "8a5fc9b9", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "persons = (p \n", |
| " | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(10)]))\n", |
| "ib.show(persons)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "84d64746", |
| "metadata": {}, |
| "source": [ |
| "You can look for all elements with `id < 5` in `persons` with the query below and assign the output to `persons_id_lt_5`. You can also enable the `-v` option to see more details about the execution." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "07db1116", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%beam_sql -o persons_id_lt_5 -v\n", |
| "SELECT * FROM persons WHERE id < 5" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "68afa962", |
| "metadata": {}, |
| "source": [ |
| "With `-v`, if it's the first time running this query, you might see a warning message like:\n", |
| "\n", |
| "```\n", |
| "Schema Person has not been registered to use a RowCoder. Automatically registering it by running: beam.coders.registry.register_coder(Person, beam.coders.RowCoder)\n", |
| "```\n", |
| "\n", |
| "Whenever the `beam_sql` magic finds a schema that you have defined and used, it registers a `RowCoder` for it. You can also explicitly run the same code to do so.\n", |
| "\n", |
| "Note the output element type is `Person(id: int, name: str)` instead of `BeamSchema_...` because you have selected all the fields from a single PCollection of the known type `Person(id: int, name: str)`." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "79587515", |
| "metadata": {}, |
| "source": [ |
| "## Query#3 - Joining multiple PCollections\n", |
| "\n", |
| "You can build a `persons_2` PCollection with a different range of `id`s and `name`s. " |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "c01fa39a", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "persons_2 = (p \n", |
| " | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(5, 15)]))\n", |
| "ib.show(persons_2)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "6904ff8e", |
| "metadata": {}, |
| "source": [ |
| "Then query for the `name`s in `persons` and `persons_2` that share the same `id` and assign the output to `persons_with_common_id`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "2a0a60ff", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%beam_sql -o persons_with_common_id -v\n", |
| "SELECT * FROM persons JOIN persons_2 USING (id)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "4bb4df8a", |
| "metadata": {}, |
| "source": [ |
| "Note the output element type is now some `BeamSchema_...(id: int64, name: str, name0: str)`. Because you have selected columns from both PCollections, there is no known schema to hold the result. Beam automatically creates a schema and disambiguates the conflicting field `name` by appending `0` to one of them.\n", |
| "\n", |
| "Since `Person` was already registered with a `RowCoder`, the registration warning no longer appears, even with `-v`." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "cfcfeb76", |
| "metadata": {}, |
| "source": [ |
| "## Query#4 - Joining multiple PCollections, including I/O" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "ce8abc3d", |
| "metadata": {}, |
| "source": [ |
| "Let's read the file written by Query#1 and join it with `persons` and `persons_2` to find the `name`s whose `id` is common to all three." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "d1dea37b", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Use the exact same coder that was used with WriteToText and explicitly set the output types.\n", |
| "query1_result_in_file = p | beam.io.ReadFromText(\n", |
| " '/tmp/query1_data*', coder=coder).with_output_types(\n", |
| " query1_data.element_type)\n", |
| "\n", |
| "# Check all the data sources.\n", |
| "ib.show(query1_result_in_file)\n", |
| "ib.show(persons)\n", |
| "ib.show(persons_2)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "4bf6c422", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%beam_sql -o entry_with_common_id\n", |
| "\n", |
| "SELECT query1_result_in_file.id, persons.name AS `name_1`, persons_2.name AS `name_2`\n", |
| "FROM query1_result_in_file JOIN persons ON query1_result_in_file.id = persons.id\n", |
| "JOIN persons_2 ON query1_result_in_file.id = persons_2.id" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "282f6173", |
| "metadata": {}, |
| "source": [ |
| "You can also chain another `beam_sql` magic to get just `name_1`:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "d858dd6c", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%beam_sql -o name_found\n", |
| "SELECT name_1 AS `name` FROM entry_with_common_id" |
| ] |
| } |
| ], |
| "metadata": { |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.7.11" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 5 |
| } |