| { |
| "license": [ |
| "Licensed to the Apache Software Foundation (ASF) under one", |
| "or more contributor license agreements. See the NOTICE file", |
| "distributed with this work for additional information", |
| "regarding copyright ownership. The ASF licenses this file", |
| "to you under the Apache License, Version 2.0 (the", |
| "\"License\"); you may not use this file except in compliance", |
| "with the License. You may obtain a copy of the License at", |
| "", |
| " http://www.apache.org/licenses/LICENSE-2.0", |
| "", |
| "Unless required by applicable law or agreed to in writing,", |
| "software distributed under the License is distributed on an", |
| "\"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY", |
| "KIND, either express or implied. See the License for the", |
| "specific language governing permissions and limitations", |
| "under the License." |
| ], |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-in-github", |
| "colab_type": "text" |
| }, |
| "source": [ |
| "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/try-apache-beam-yaml.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "cellView": "form", |
| "id": "dLvdnuWTdHOv" |
| }, |
| "outputs": [], |
| "source": [ |
| "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", |
| "\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "lNKIMlEDZ_Vw" |
| }, |
| "source": [ |
| "# Try Apache Beam - YAML\n", |
| "\n", |
| "While Beam provides powerful APIs for authoring sophisticated data processing pipelines, it still has a high barrier for getting started and authoring simple pipelines. Even setting up the environment, installing the dependencies, and setting up the project can be a challenge.\n", |
| "\n", |
| "Here we provide a simple declarative syntax for describing pipelines that does not require coding experience or learning how to use an SDK; any text editor will do. Some installation may be required to actually *execute* a pipeline, but we envision various services (such as Dataflow) accepting YAML pipelines directly, obviating the need for even that in the future. We also anticipate the ability to generate code directly from these higher-level YAML descriptions, should one want to graduate to a full Beam SDK (and possibly in the other direction as well, as far as possible).\n", |
| "\n", |
| "Everything here is still under development, but any features already included are considered stable. Feedback is welcome at dev@beam.apache.org.\n", |
| "\n", |
| "In this notebook, you set up your development environment and write a simple pipeline using YAML. Then you run it locally, using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).\n", |
| "\n", |
| "To navigate through different sections, use the table of contents. From the **View** drop-down list, select **Table of contents**.\n", |
| "\n", |
| "To run a code cell, click the **Run cell** button at the top left of the cell, or select it and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.\n", |
| "\n", |
| "To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb)." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "Fz6KSQ13_3Rr" |
| }, |
| "source": [ |
| "# Setup\n", |
| "\n", |
| "First, you need to set up your environment. The following code installs `apache-beam` and creates directories for your data, pipelines and results." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "GOOk81Jj_yUy" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Install apache-beam\n", |
| "%pip install --quiet apache-beam[yaml]\n", |
| "\n", |
| "# Create directories for storing data, pipelines and results\n", |
| "! mkdir -p data\n", |
| "! mkdir -p pipelines\n", |
| "! mkdir -p results" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "AAdV13NpdHOx" |
| }, |
| "source": [ |
| "We'll also create an artificial dataset that represents a simple database. The CSV data contains information about different people. Each line represents a single person, with their details separated by commas. The file contains 5 columns: id, firstname, age, country and profession." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "UBD4GqHRdHOx", |
| "outputId": "536c94fb-732a-4b6c-8c25-c68ffa2be185" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "Overwriting data/people.csv\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'data/people.csv'\n", |
| "id,firstname,age,country,profession\n", |
| "1,Reeba,58,Belgium,unemployed\n", |
| "2,Maud,45,Spain,firefighter\n", |
| "3,Meg,11,France,unemployed\n", |
| "4,Rani,53,Spain,doctor\n", |
| "5,Natka,26,France,doctor\n", |
| "6,Aurore,32,Italy,police officer\n", |
| "7,Elvira,39,Italy,doctor\n", |
| "8,Asia,10,Belgium,doctor\n", |
| "9,Lesly,35,Spain,firefighter\n", |
| "10,Orelia,31,Germany,police officer\n", |
| "11,Theodora,16,Italy,unemployed\n", |
| "12,Viviene,44,Germany,police officer\n", |
| "13,Teriann,50,Belgium,police officer\n", |
| "14,Carol-Jean,23,Germany,unemployed\n", |
| "15,Drucie,15,Spain,police officer\n", |
| "16,Elie,10,Italy,doctor\n", |
| "17,Shaylyn,34,Belgium,worker\n", |
| "18,Fayre,33,Spain,police officer\n", |
| "19,Sabina,52,Germany,police officer\n", |
| "20,Aryn,20,Belgium,police officer\n", |
| "21,Darlleen,49,Spain,worker\n", |
| "22,Jere,18,Italy,worker\n", |
| "23,Candi,60,Germany,police officer\n", |
| "24,Sindee,40,Germany,firefighter\n", |
| "25,Selma,20,Spain,worker\n", |
| "26,Vonny,35,Germany,doctor\n", |
| "27,Kate,53,Spain,worker\n", |
| "28,Annabela,48,Belgium,worker\n", |
| "29,Jenilee,55,Germany,police officer\n", |
| "30,Gusella,44,France,police officer\n", |
| "31,Fawne,35,Spain,worker\n", |
| "32,Karolina,39,Spain,police officer\n", |
| "33,Sadie,58,Germany,firefighter\n", |
| "34,Clo,10,Italy,police officer\n", |
| "35,Beth,46,Spain,firefighter\n", |
| "36,Adore,18,Italy,firefighter\n", |
| "37,Tarra,29,Spain,doctor\n", |
| "38,Jessamyn,36,France,police officer\n", |
| "39,Deedee,24,Germany,unemployed\n", |
| "40,Patricia,45,Italy,doctor\n", |
| "41,Wileen,39,Spain,police officer\n", |
| "42,Paola,55,Italy,worker\n", |
| "43,Gwyneth,37,Italy,worker\n", |
| "44,Stacey,36,Spain,worker\n", |
| "45,Camile,60,Germany,unemployed\n", |
| "46,Sheree,10,Spain,unemployed\n", |
| "47,Albertina,53,France,police officer\n", |
| "48,Jinny,30,Spain,firefighter\n", |
| "49,Kayla,57,Italy,firefighter\n", |
| "50,Jaime,55,France,doctor" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "RcgxZfvXdHOx" |
| }, |
| "source": [ |
| "Let's check that the file was created correctly. You should see the first few lines of the generated file; verify that they match the content declared above." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "y5394CsSdHOx", |
| "outputId": "8eb21a73-63e0-433a-98ef-46acf6fa5917" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "id,firstname,age,country,profession\n", |
| "1,Reeba,58,Belgium,unemployed\n", |
| "2,Maud,45,Spain,firefighter\n", |
| "3,Meg,11,France,unemployed\n", |
| "4,Rani,53,Spain,doctor\n", |
| "5,Natka,26,France,doctor\n", |
| "6,Aurore,32,Italy,police officer\n", |
| "7,Elvira,39,Italy,doctor\n", |
| "8,Asia,10,Belgium,doctor\n", |
| "9,Lesly,35,Spain,firefighter\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! head data/people.csv" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "zkluLENwdHOx" |
| }, |
| "source": [ |
| "# Your first YAML pipelines\n", |
| "\n", |
| "In this section we'll walk through the basic structure of a YAML pipeline and introduce some of the available transforms.\n", |
| "Below is a simple pipeline that reads data from the CSV file we've just created and logs the elements for debugging purposes.\n", |
| "\n", |
| "The `LogForTesting` transform lets us log the data when developing a pipeline. Remember, it is not advised to use this transform in production.\n", |
| "\n", |
| "Let's define the pipeline and save it to a file:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "90o2G7R8dHOx", |
| "outputId": "4eb76a3f-cc4b-4a73-b2f8-3cc4be07dfb5" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "Writing pipelines/pipeline-01.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-01.yaml'\n", |
| "pipeline:\n", |
| "  type: chain\n", |
| "  transforms:\n", |
| "    - type: ReadFromCsv\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    - type: LogForTesting" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "collapsed": false, |
| "id": "8YSE99x2dHOx" |
| }, |
| "source": [ |
| "We can verify the contents of this file by running:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "q82oCs2HdHOx", |
| "outputId": "9f2c48f3-1a60-4190-ab21-5e5edaadf859" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "pipeline:\n", |
| "  type: chain\n", |
| "  transforms:\n", |
| "    - type: ReadFromCsv\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    - type: LogForTesting\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! cat pipelines/pipeline-01.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "collapsed": false, |
| "id": "x2o6I1Z-dHOx" |
| }, |
| "source": [ |
| "Now we can execute the YAML pipeline by passing this file as an argument to the following command:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "irAgfW28dHOy", |
| "outputId": "a21ef548-1f35-4670-8fdc-0c1111a4c80e" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "WARNING:apache_beam.options.pipeline_options:Bucket gs://derrickaw-test/temp used as temp_location has soft-delete policy enabled. To avoid being billed for unnecessary storage costs, turn off the soft delete feature on buckets that your Dataflow jobs use for temporary and staging storage. For more information, see https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.\n", |
| "Building pipeline...\n", |
| "2026-01-20 19:44:15.913679: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 19:44:15.918901: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 19:44:15.934463: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768938255.959552 38575 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768938255.966385 38575 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768938255.984019 38575 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768938255.984067 38575 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768938255.984072 38575 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768938255.984076 38575 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 19:44:15.989514: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "/usr/local/lib/python3.12/dist-packages/google/cloud/aiplatform/models.py:52: FutureWarning: Support for google-cloud-storage < 3.0.0 will be removed in a future version of google-cloud-aiplatform. Please upgrade to google-cloud-storage >= 3.0.0.\n", |
| " from google.cloud.aiplatform.utils import gcs_utils\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_134004179629504], ComputedExpression[astype_Series_134004179618080], ComputedExpression[set_column_DataFrame_134004179625952], ComputedExpression[get_column_Series_134004179627632], ComputedExpression[astype_Series_134004179621440], ComputedExpression[set_column_DataFrame_134004179368608], ComputedExpression[get_column_Series_134004179362080], ComputedExpression[astype_Series_134004179367408], ComputedExpression[set_column_DataFrame_134004179362128]]:134004179857920]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_134004179631040]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_134004179629504], ComputedExpression[astype_Series_134004179618080], ComputedExpression[set_column_DataFrame_134004179625952], ComputedExpression[get_column_Series_134004179627632], ComputedExpression[astype_Series_134004179621440], ComputedExpression[set_column_DataFrame_134004179368608], ComputedExpression[get_column_Series_134004179362080], ComputedExpression[astype_Series_134004179367408], ComputedExpression[set_column_DataFrame_134004179362128]], outputs={PlaceholderExpression[placeholder_DataFrame_134004179631040], ComputedExpression[set_column_DataFrame_134004179362128]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 7 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Pipeline has additional dependencies to be installed in SDK worker container, consider using the SDK container image pre-building workflow to avoid repetitive installations. Learn more on https://cloud.google.com/dataflow/docs/guides/using-custom-containers#prebuild\n", |
| "/usr/local/lib/python3.12/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py:1170: UserWarning: A non-standard version of Beam SDK detected: 2.71.0rc3. Dataflow runner will use container image tag 2.71.0. This use case is not supported.\n", |
| " warnings.warn(\n", |
| "/usr/local/lib/python3.12/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py:1170: UserWarning: A non-standard version of Beam SDK detected: 2.71.0rc3. Dataflow runner will use container image tag 2.71.0. This use case is not supported.\n", |
| " warnings.warn(\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120194432-803993-ymqls1zn.1768938272.804163/pickled_main_session...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120194432-803993-ymqls1zn.1768938272.804163/pickled_main_session in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120194432-803993-ymqls1zn.1768938272.804163/submission_environment_dependencies.txt...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120194432-803993-ymqls1zn.1768938272.804163/submission_environment_dependencies.txt in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120194432-803993-ymqls1zn.1768938272.804163/pipeline.pb...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120194432-803993-ymqls1zn.1768938272.804163/pipeline.pb in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job\n", |
| " clientRequestId: '20260120194432805038-1417'\n", |
| " createTime: '2026-01-20T19:44:34.587036Z'\n", |
| " currentStateTime: '1970-01-01T00:00:00Z'\n", |
| " id: '2026-01-20_11_44_34-2209544548871949506'\n", |
| " location: 'us-central1'\n", |
| " name: 'beamapp-root-0120194432-803993-ymqls1zn'\n", |
| " projectId: 'google.com:clouddfe'\n", |
| " stageStates: []\n", |
| " startTime: '2026-01-20T19:44:34.587036Z'\n", |
| " steps: []\n", |
| " tempFiles: []\n", |
| " type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2026-01-20_11_44_34-2209544548871949506]\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2026-01-20_11_44_34-2209544548871949506\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/2026-01-20_11_44_34-2209544548871949506?project=google.com:clouddfe\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_11_44_34-2209544548871949506 is in state JOB_STATE_PENDING\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:44:38.071Z: JOB_MESSAGE_BASIC: Worker configuration: e2-standard-2 in us-central1-a.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:44:39.652Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:44:39.707Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-a...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:44:39.803Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:44:39.930Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Create/Impulse+ReadFromCsv/_ReadFromPandas/Create/FlatMap(<lambda at core.py:4094>)+ReadFromCsv/_ReadFromPandas/Create/Map(decode)+ReadFromCsv/_ReadFromPandas/MatchAll/ParDo(_MatchAllFn)+ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:306>)+ReadFromCsv/_ReadFromPandas/Reshuffle/AddRandomKeys+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/Map(reify_metadata_default_window)+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Reify+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_11_44_34-2209544548871949506 is in state JOB_STATE_RUNNING\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:45:13.186Z: JOB_MESSAGE_BASIC: Your project already contains 100 Dataflow-created metric descriptors, so new user metrics of the form custom.googleapis.com/* will not be created. However, all user metrics are also available in the metric dataflow.googleapis.com/job/user_counter. If you rely on the custom metrics, you can delete old / unused metric descriptors. See https://developers.google.com/apis-explorer/#p/monitoring/v3/monitoring.projects.metricDescriptors.list and https://developers.google.com/apis-explorer/#p/monitoring/v3/monitoring.projects.metricDescriptors.delete\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:31.534Z: JOB_MESSAGE_BASIC: All workers have finished the startup processes and began to receive work requests.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:32.596Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Create/Impulse+ReadFromCsv/_ReadFromPandas/Create/FlatMap(<lambda at core.py:4094>)+ReadFromCsv/_ReadFromPandas/Create/Map(decode)+ReadFromCsv/_ReadFromPandas/MatchAll/ParDo(_MatchAllFn)+ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:306>)+ReadFromCsv/_ReadFromPandas/Reshuffle/AddRandomKeys+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/Map(reify_metadata_default_window)+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Reify+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:32.665Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:32.688Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:301>)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:32.707Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:32.742Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:301>)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:32.827Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/DoOnce/Impulse+ReadFromCsv/_ReadFromPandas/DoOnce/FlatMap(<lambda at core.py:4094>)+ReadFromCsv/_ReadFromPandas/DoOnce/Map(decode)+ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:301>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:34.489Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/DoOnce/Impulse+ReadFromCsv/_ReadFromPandas/DoOnce/FlatMap(<lambda at core.py:4094>)+ReadFromCsv/_ReadFromPandas/DoOnce/Map(decode)+ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:301>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:34.579Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:34.623Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:34.717Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Read+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/GroupByWindow+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/FlatMap(restore_metadata_default_window)+ReadFromCsv/_ReadFromPandas/Reshuffle/RemoveRandomKeys+ReadFromCsv/_ReadFromPandas/ReadMatches/ParDo(_ReadMatchesFn)+ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/PairWithRestriction+ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/SplitWithSizing\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:36.438Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Read+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/GroupByWindow+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/FlatMap(restore_metadata_default_window)+ReadFromCsv/_ReadFromPandas/Reshuffle/RemoveRandomKeys+ReadFromCsv/_ReadFromPandas/ReadMatches/ParDo(_ReadMatchesFn)+ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/PairWithRestriction+ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/SplitWithSizing\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:36.524Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/ProcessElementAndRestrictionWithSizing+ReadFromCsv/ToPCollection(df)/[ComputedExpression[get_column_Series_134004179629504], ComputedExpression[astype_Series_134004179618080], ComputedExpression[set_column_DataFrame_134004179625952], ComputedExpression[get_column_Series_134004179627632], ComputedExpression[astype_Series_134004179621440], ComputedExpression[set_column_DataFrame_134004179368608], ComputedExpression[get_column_Series_134004179362080], ComputedExpression[astype_Series_134004179367408], ComputedExpression[set_column_DataFrame_134004179362128]]:134004179857920/Map(<lambda at transforms.py:239>)+ReadFromCsv/ToPCollection(df)/[ComputedExpression[get_column_Series_134004179629504], ComputedExpression[astype_Series_134004179618080], ComputedExpression[set_column_DataFrame_134004179625952], ComputedExpression[get_column_Series_134004179627632], ComputedExpression[astype_Series_134004179621440], ComputedExpression[set_column_DataFrame_134004179368608], ComputedExpression[get_column_Series_134004179362080], ComputedExpression[astype_Series_134004179367408], ComputedExpression[set_column_DataFrame_134004179362128]]:134004179857920/FlatMap(evaluate)/FlatMap(evaluate)+ReadFromCsv/Unbatch 'set_column_DataFrame_134004179362128'+log_for_testing/LogForTesting\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:37.157Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/ProcessElementAndRestrictionWithSizing+ReadFromCsv/ToPCollection(df)/[ComputedExpression[get_column_Series_134004179629504], ComputedExpression[astype_Series_134004179618080], ComputedExpression[set_column_DataFrame_134004179625952], ComputedExpression[get_column_Series_134004179627632], ComputedExpression[astype_Series_134004179621440], ComputedExpression[set_column_DataFrame_134004179368608], ComputedExpression[get_column_Series_134004179362080], ComputedExpression[astype_Series_134004179367408], ComputedExpression[set_column_DataFrame_134004179362128]]:134004179857920/Map(<lambda at transforms.py:239>)+ReadFromCsv/ToPCollection(df)/[ComputedExpression[get_column_Series_134004179629504], ComputedExpression[astype_Series_134004179618080], ComputedExpression[set_column_DataFrame_134004179625952], ComputedExpression[get_column_Series_134004179627632], ComputedExpression[astype_Series_134004179621440], ComputedExpression[set_column_DataFrame_134004179368608], ComputedExpression[get_column_Series_134004179362080], ComputedExpression[astype_Series_134004179367408], ComputedExpression[set_column_DataFrame_134004179362128]]:134004179857920/FlatMap(evaluate)/FlatMap(evaluate)+ReadFromCsv/Unbatch 'set_column_DataFrame_134004179362128'+log_for_testing/LogForTesting\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:47:37.535Z: JOB_MESSAGE_BASIC: Stopping worker pool...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T19:49:59.850Z: JOB_MESSAGE_BASIC: Worker pool stopped.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_11_44_34-2209544548871949506 is in state JOB_STATE_DONE\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-01.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "6LP7PJ7ddHOy" |
| }, |
| "source": [ |
| "Here we use Python and the `apache_beam` package to execute the pipeline, but we envision various services (such as Dataflow) accepting YAML pipelines directly, obviating the need for that in the future.\n", |
| "\n", |
| "If you scroll through the output logs, you'll find entries such as:\n", |
| "```\n", |
| "INFO:root:BeamSchema_edf39b51_91da_418a_b28e_af04c9bae811(id=1, firstname='Reeba', age=58, country='Belgium', profession='unemployed')\n", |
| "INFO:root:BeamSchema_edf39b51_91da_418a_b28e_af04c9bae811(id=2, firstname='Maud', age=45, country='Spain', profession='firefighter')\n", |
| "INFO:root:BeamSchema_edf39b51_91da_418a_b28e_af04c9bae811(id=3, firstname='Meg', age=11, country='France', profession='unemployed')\n", |
| "INFO:root:BeamSchema_edf39b51_91da_418a_b28e_af04c9bae811(id=4, firstname='Rani', age=53, country='Spain', profession='doctor')\n", |
| "```\n", |
| "This is a representation of records from the input dataset." |
| ] |
| }, |
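| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The pipeline above uses `type: chain`, which feeds each transform's output into the next one in the list. As a rough sketch of the alternative, the same pipeline can be written without `type: chain`; each transform then names its upstream step explicitly through `input`. The name `ReadPeople` is a hypothetical label chosen for illustration and does not appear elsewhere in this notebook.\n", |
| "\n", |
| "```yaml\n", |
| "pipeline:\n", |
| "  transforms:\n", |
| "    # Hypothetical name, used only so the next step can refer to this one.\n", |
| "    - type: ReadFromCsv\n", |
| "      name: ReadPeople\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    # Without a chain, the upstream transform is given explicitly.\n", |
| "    - type: LogForTesting\n", |
| "      input: ReadPeople\n", |
| "```\n", |
| "\n", |
| "The explicit form is mainly useful once a pipeline branches or merges; for a purely linear pipeline like this one, `type: chain` is the shorter way to say the same thing." |
| ] |
| }, |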
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "QZ6xKTbgdHOy" |
| }, |
| "source": [ |
| "Let's add a transform: `Filter`. To use this transform, you need to specify a `keep` condition and the language the condition is written in. Below you'll find an example with a condition written in Python.\n", |
| "This pipeline filters out records of people younger than 18, so only records representing adults reach the next transform. Verify this by scrolling to the bottom of the output logs." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "wguDJ3RcdHOy", |
| "outputId": "a7a47553-bf6c-4012-db9a-5e90834dd5f8" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Overwriting pipelines/pipeline-filter-01.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-filter-01.yaml'\n", |
| "pipeline:\n", |
| "  type: chain\n", |
| "  transforms:\n", |
| "    - type: ReadFromCsv\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    - type: Filter\n", |
| "      config:\n", |
| "        language: python\n", |
| "        keep: \"age >= 18\"\n", |
| "    - type: LogForTesting" |
| ] |
| }, |
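| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The `keep` condition above is a plain Python expression over the record's fields. As a hedged sketch, and assuming your Beam version lets `Filter` accept the `callable` form that the Beam YAML documentation describes for Python mapping functions, the same step could be written with an explicit lambda that receives the whole row:\n", |
| "```yaml\n", |
| "# Illustrative fragment only: it stands in for the Filter entry in the transforms list.\n", |
| "- type: Filter\n", |
| "  config:\n", |
| "    language: python\n", |
| "    keep:\n", |
| "      # Assumption: fields are read as attributes of the row object.\n", |
| "      callable: \"lambda row: row.age >= 18\"\n", |
| "```\n", |
| "The expression form is shorter for simple comparisons; a callable may be easier to read once the condition needs more logic." |
| ] |
| }, |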
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "TYbh3q0qdHOy", |
| "outputId": "337a8549-10d4-48e6-9180-65d8015fd58b" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "Building pipeline...\n", |
| "2026-01-20 19:51:23.984074: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 19:51:23.990548: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 19:51:24.020437: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768938684.075062 40284 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768938684.090512 40284 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768938684.133356 40284 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768938684.133444 40284 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768938684.133450 40284 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768938684.133456 40284 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 19:51:24.145964: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "/usr/local/lib/python3.12/dist-packages/google/cloud/aiplatform/models.py:52: FutureWarning: Support for google-cloud-storage < 3.0.0 will be removed in a future version of google-cloud-aiplatform. Please upgrade to google-cloud-storage >= 3.0.0.\n", |
| " from google.cloud.aiplatform.utils import gcs_utils\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_138354359966784], ComputedExpression[astype_Series_138354359961888], ComputedExpression[set_column_DataFrame_138354359973792], ComputedExpression[get_column_Series_138354359961168], ComputedExpression[astype_Series_138354359974128], ComputedExpression[set_column_DataFrame_138354359967072], ComputedExpression[get_column_Series_138354359963424], ComputedExpression[astype_Series_138354359963616], ComputedExpression[set_column_DataFrame_138354360454032]]:138354360571120]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_138354359973744]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_138354359966784], ComputedExpression[astype_Series_138354359961888], ComputedExpression[set_column_DataFrame_138354359973792], ComputedExpression[get_column_Series_138354359961168], ComputedExpression[astype_Series_138354359974128], ComputedExpression[set_column_DataFrame_138354359967072], ComputedExpression[get_column_Series_138354359963424], ComputedExpression[astype_Series_138354359963616], ComputedExpression[set_column_DataFrame_138354360454032]], outputs={PlaceholderExpression[placeholder_DataFrame_138354359973744], ComputedExpression[set_column_DataFrame_138354360454032]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"Filter\" at line 7 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 11 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:44779\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary/zip from /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64.zip for https://github.com/apache/beam/releases/download/v2.71.0-RC3/apache_beam-v2.71.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary from /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64 for /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Prism binary path resolved to: /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64\n", |
| "INFO:apache_beam.utils.subprocess_server:Starting service with ('/root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64' '--job_port' '58229' '--log_level' 'info' '--log_kind' 'json' '--serve_http=false')\n", |
| "INFO:PrismRunner:Serving JobManagement (endpoint='localhost:58229')\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Environment \"LOOPBACK\" has started a component necessary for the execution. Be sure to run the pipeline using\n", |
| " with Pipeline() as p:\n", |
| " p.apply(..)\n", |
| "This ensures that the pipeline finishes before this program exits.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED\n", |
| "INFO:root:starting job-001[job]\n", |
| "INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:58229.\n", |
| "INFO:root:running job-001[job]\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Control channel established.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:58229.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:State channel established.\n", |
| "INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:58229\n", |
| "INFO:apache_beam.runners.worker.data_plane:Data channel established.\n", |
| "INFO:root:{\"id\": 1, \"firstname\": \"Reeba\", \"age\": 58, \"country\": \"Belgium\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 2, \"firstname\": \"Maud\", \"age\": 45, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 4, \"firstname\": \"Rani\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 5, \"firstname\": \"Natka\", \"age\": 26, \"country\": \"France\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 6, \"firstname\": \"Aurore\", \"age\": 32, \"country\": \"Italy\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 7, \"firstname\": \"Elvira\", \"age\": 39, \"country\": \"Italy\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 9, \"firstname\": \"Lesly\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 10, \"firstname\": \"Orelia\", \"age\": 31, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 12, \"firstname\": \"Viviene\", \"age\": 44, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 13, \"firstname\": \"Teriann\", \"age\": 50, \"country\": \"Belgium\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 14, \"firstname\": \"Carol-Jean\", \"age\": 23, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 17, \"firstname\": \"Shaylyn\", \"age\": 34, \"country\": \"Belgium\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 18, \"firstname\": \"Fayre\", \"age\": 33, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 19, \"firstname\": \"Sabina\", \"age\": 52, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 20, \"firstname\": \"Aryn\", \"age\": 20, \"country\": \"Belgium\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 21, \"firstname\": \"Darlleen\", \"age\": 49, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 22, \"firstname\": \"Jere\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 23, \"firstname\": \"Candi\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 24, \"firstname\": \"Sindee\", \"age\": 40, \"country\": \"Germany\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 25, \"firstname\": \"Selma\", \"age\": 20, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 26, \"firstname\": \"Vonny\", \"age\": 35, \"country\": \"Germany\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 27, \"firstname\": \"Kate\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 28, \"firstname\": \"Annabela\", \"age\": 48, \"country\": \"Belgium\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 29, \"firstname\": \"Jenilee\", \"age\": 55, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 30, \"firstname\": \"Gusella\", \"age\": 44, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 31, \"firstname\": \"Fawne\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 32, \"firstname\": \"Karolina\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 33, \"firstname\": \"Sadie\", \"age\": 58, \"country\": \"Germany\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 35, \"firstname\": \"Beth\", \"age\": 46, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 36, \"firstname\": \"Adore\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 37, \"firstname\": \"Tarra\", \"age\": 29, \"country\": \"Spain\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 38, \"firstname\": \"Jessamyn\", \"age\": 36, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 39, \"firstname\": \"Deedee\", \"age\": 24, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 40, \"firstname\": \"Patricia\", \"age\": 45, \"country\": \"Italy\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 41, \"firstname\": \"Wileen\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 42, \"firstname\": \"Paola\", \"age\": 55, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 43, \"firstname\": \"Gwyneth\", \"age\": 37, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 44, \"firstname\": \"Stacey\", \"age\": 36, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 45, \"firstname\": \"Camile\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 47, \"firstname\": \"Albertina\", \"age\": 53, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 48, \"firstname\": \"Jinny\", \"age\": 30, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 49, \"firstname\": \"Kayla\", \"age\": 57, \"country\": \"Italy\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 50, \"firstname\": \"Jaime\", \"age\": 55, \"country\": \"France\", \"profession\": \"doctor\"}\n", |
| "INFO:PrismRunner:pipeline done! (job='job-001[job]')\n", |
| "INFO:root:pipeline completed job-001[job]\n", |
| "INFO:root:terminating job-001[job]\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to DONE\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete\n", |
| "INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-filter-01.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "u9iSlbt8dHOy" |
| }, |
| "source": [ |
| "Similarly, we can write the condition in another language, for example SQL. In this example we filter out people who are younger than 18 and have a profession other than 'unemployed'." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "w8OgXr4XdHOy", |
| "outputId": "e05512d0-fe7e-496a-a0c4-f71ec999a7f4" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "Overwriting pipelines/pipeline-filter-02.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-filter-02.yaml'\n", |
| "pipeline:\n", |
| "  type: chain\n", |
| "  transforms:\n", |
| "    - type: ReadFromCsv\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    - type: Filter\n", |
| "      config:\n", |
| "        language: sql\n", |
| "        keep: \"age >= 18 or (age < 18 and profession = 'unemployed')\"\n", |
| "    - type: LogForTesting" |
| ] |
| }, |
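| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The SQL condition above spells out both branches explicitly. Any record that fails `age >= 18` already satisfies `age < 18`, so the second age check is redundant as long as every record has an `age` value (an assumption about the data; SQL comparisons against NULL behave differently). A shorter condition that should keep the same records is sketched here:\n", |
| "```yaml\n", |
| "# Illustrative fragment only: it stands in for the Filter entry in the transforms list.\n", |
| "- type: Filter\n", |
| "  config:\n", |
| "    language: sql\n", |
| "    # Equivalent to the longer condition when age is never NULL.\n", |
| "    keep: \"age >= 18 or profession = 'unemployed'\"\n", |
| "```\n", |
| "The longer form used in this notebook makes the intent more visible, which can be preferable in a tutorial." |
| ] |
| }, |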
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "fWEom8K2dHOy", |
| "outputId": "71007ecd-6f22-474e-b1c4-102ad8bb7c90" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.\n", |
| "Building pipeline...\n", |
| "2026-01-30 14:52:57.098945: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-30 14:52:57.106287: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-30 14:52:57.128412: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1769784777.167048 13179 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1769784777.178735 13179 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1769784777.206410 13179 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1769784777.206464 13179 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1769784777.206471 13179 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1769784777.206480 13179 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-30 14:52:57.214702: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_135482933162160], ComputedExpression[astype_Series_135482933161248], ComputedExpression[set_column_DataFrame_135482933161536], ComputedExpression[get_column_Series_135482933155392], ComputedExpression[astype_Series_135482933151888], ComputedExpression[set_column_DataFrame_135482932381408], ComputedExpression[get_column_Series_135482932395808], ComputedExpression[astype_Series_135482932392016], ComputedExpression[set_column_DataFrame_135482932395520]]:135482933228032]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_135482933156208]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_135482933162160], ComputedExpression[astype_Series_135482933161248], ComputedExpression[set_column_DataFrame_135482933161536], ComputedExpression[get_column_Series_135482933155392], ComputedExpression[astype_Series_135482933151888], ComputedExpression[set_column_DataFrame_135482932381408], ComputedExpression[get_column_Series_135482932395808], ComputedExpression[astype_Series_135482932392016], ComputedExpression[set_column_DataFrame_135482932395520]], outputs={PlaceholderExpression[placeholder_DataFrame_135482933156208], ComputedExpression[set_column_DataFrame_135482932395520]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"Filter\" at line 7 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 11 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.direct.direct_runner:Running pipeline with PrismRunner.\n", |
| "INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:45067\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Installing prism from local source into \"/root/.apache_beam/cache/prism/bin\".\n", |
| "INFO:apache_beam.runners.direct.direct_runner:Exception with PrismRunner:\n", |
| " [Errno 2] No such file or directory: 'go'\n", |
| "\n", |
| "INFO:apache_beam.runners.direct.direct_runner:Falling back to DirectRunner\n", |
| "INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600\n", |
| "INFO:root:{\"id\": 1, \"firstname\": \"Reeba\", \"age\": 58, \"country\": \"Belgium\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 2, \"firstname\": \"Maud\", \"age\": 45, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 3, \"firstname\": \"Meg\", \"age\": 11, \"country\": \"France\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 4, \"firstname\": \"Rani\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 5, \"firstname\": \"Natka\", \"age\": 26, \"country\": \"France\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 6, \"firstname\": \"Aurore\", \"age\": 32, \"country\": \"Italy\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 7, \"firstname\": \"Elvira\", \"age\": 39, \"country\": \"Italy\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 9, \"firstname\": \"Lesly\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 10, \"firstname\": \"Orelia\", \"age\": 31, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 11, \"firstname\": \"Theodora\", \"age\": 16, \"country\": \"Italy\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 12, \"firstname\": \"Viviene\", \"age\": 44, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 13, \"firstname\": \"Teriann\", \"age\": 50, \"country\": \"Belgium\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 14, \"firstname\": \"Carol-Jean\", \"age\": 23, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 17, \"firstname\": \"Shaylyn\", \"age\": 34, \"country\": \"Belgium\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 18, \"firstname\": \"Fayre\", \"age\": 33, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 19, \"firstname\": \"Sabina\", \"age\": 52, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 20, \"firstname\": \"Aryn\", \"age\": 20, \"country\": \"Belgium\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 21, \"firstname\": \"Darlleen\", \"age\": 49, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 22, \"firstname\": \"Jere\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 23, \"firstname\": \"Candi\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 24, \"firstname\": \"Sindee\", \"age\": 40, \"country\": \"Germany\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 25, \"firstname\": \"Selma\", \"age\": 20, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 26, \"firstname\": \"Vonny\", \"age\": 35, \"country\": \"Germany\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 27, \"firstname\": \"Kate\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 28, \"firstname\": \"Annabela\", \"age\": 48, \"country\": \"Belgium\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 29, \"firstname\": \"Jenilee\", \"age\": 55, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 30, \"firstname\": \"Gusella\", \"age\": 44, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 31, \"firstname\": \"Fawne\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 32, \"firstname\": \"Karolina\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 33, \"firstname\": \"Sadie\", \"age\": 58, \"country\": \"Germany\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 35, \"firstname\": \"Beth\", \"age\": 46, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 36, \"firstname\": \"Adore\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 37, \"firstname\": \"Tarra\", \"age\": 29, \"country\": \"Spain\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 38, \"firstname\": \"Jessamyn\", \"age\": 36, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 39, \"firstname\": \"Deedee\", \"age\": 24, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 40, \"firstname\": \"Patricia\", \"age\": 45, \"country\": \"Italy\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 41, \"firstname\": \"Wileen\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 42, \"firstname\": \"Paola\", \"age\": 55, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 43, \"firstname\": \"Gwyneth\", \"age\": 37, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 44, \"firstname\": \"Stacey\", \"age\": 36, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 45, \"firstname\": \"Camile\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 46, \"firstname\": \"Sheree\", \"age\": 10, \"country\": \"Spain\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 47, \"firstname\": \"Albertina\", \"age\": 53, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 48, \"firstname\": \"Jinny\", \"age\": 30, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 49, \"firstname\": \"Kayla\", \"age\": 57, \"country\": \"Italy\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 50, \"firstname\": \"Jaime\", \"age\": 55, \"country\": \"France\", \"profession\": \"doctor\"}\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-filter-02.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "pvTQqx75dHOy" |
| }, |
| "source": [ |
| "Depending on your environment, the output of this pipeline may be in a different format than the previous one. That happens because this pipeline uses a SQL Filter transform, an example of a [multi-language transform](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines). Multi-language pipelines are an important feature of Beam, but in this notebook we'll focus on YAML.\n", |
| "\n", |
| "To find the output of this pipeline, look for lines that begin with the `message` keyword and whose associated `transform_id` starts with `LogForTesting`.\n", |
| "Example:\n", |
| "```\n", |
| "message: \"{\\\"id\\\":49,\\\"firstname\\\":\\\"Kayla\\\",\\\"age\\\":57,\\\"country\\\":\\\"Italy\\\",\\\"profession\\\":\\\"firefighter\\\"}\"\n", |
| "instruction_id: \"bundle_6\"\n", |
| "transform_id: \"LogForTesting/beam:schematransform:org.apache.beam:yaml:log_for_testing:v1/LogAsJson/ParMultiDo(Anonymous)\"\n", |
| "```\n", |
| "Each log entry represents one element from the output data." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "collapsed": false, |
| "id": "ZQApEwwLdHOy" |
| }, |
| "source": [ |
| "Another useful transform is `MapToFields`. This transform lets us manipulate the fields of a record. For example, we can add a boolean field to our records that specifies whether the person is an adult." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "N9-tGJ7YdHOy", |
| "outputId": "16f016cc-4937-4a05-e736-d0347280c76a", |
| "scrolled": true |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Overwriting pipelines/pipeline-map-01.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-map-01.yaml'\n", |
| "pipeline:\n", |
| "  type: chain\n", |
| "  transforms:\n", |
| "    - type: ReadFromCsv\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    - type: MapToFields\n", |
| "      config:\n", |
| "        language: python\n", |
| "        append: true\n", |
| "        fields:\n", |
| "          is_adult: \"age >= 18\"\n", |
| "    - type: LogForTesting\n" |
| ] |
| }, |
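| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "With `append: true`, the new field is added alongside the existing ones. As a sketch of the alternative, and assuming the behavior described in the Beam YAML documentation where omitting `append` emits only the listed fields, the step below would produce records with just two fields:\n", |
| "```yaml\n", |
| "# Illustrative fragment only: it stands in for the MapToFields entry in the transforms list.\n", |
| "- type: MapToFields\n", |
| "  config:\n", |
| "    language: python\n", |
| "    fields:\n", |
| "      # Pass an existing field through unchanged by referencing it by name.\n", |
| "      firstname: firstname\n", |
| "      is_adult: \"age >= 18\"\n", |
| "```\n", |
| "Listing fields explicitly is useful when later transforms need only a few columns of the input." |
| ] |
| }, |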
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "pmMBhzICdHOy", |
| "outputId": "b8f04f68-501d-4583-8861-159a00ee44ce" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "WARNING:apache_beam.options.pipeline_options:Bucket gs://derrickaw-test/temp used as temp_location has soft-delete policy enabled. To avoid being billed for unnecessary storage costs, turn off the soft delete feature on buckets that your Dataflow jobs use for temporary and staging storage. For more information, see https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.\n", |
| "Building pipeline...\n", |
| "2026-01-20 19:59:24.204834: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 19:59:24.210121: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 19:59:24.223802: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768939164.246723 42734 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768939164.253449 42734 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768939164.271032 42734 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768939164.271081 42734 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768939164.271086 42734 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768939164.271090 42734 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 19:59:24.276797: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "/usr/local/lib/python3.12/dist-packages/google/cloud/aiplatform/models.py:52: FutureWarning: Support for google-cloud-storage < 3.0.0 will be removed in a future version of google-cloud-aiplatform. Please upgrade to google-cloud-storage >= 3.0.0.\n", |
| " from google.cloud.aiplatform.utils import gcs_utils\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_137262476974448], ComputedExpression[astype_Series_137262476972480], ComputedExpression[set_column_DataFrame_137262476980352], ComputedExpression[get_column_Series_137262476976320], ComputedExpression[astype_Series_137262476972624], ComputedExpression[set_column_DataFrame_137262476846304], ComputedExpression[get_column_Series_137262476846448], ComputedExpression[astype_Series_137262476852544], ComputedExpression[set_column_DataFrame_137262476846640]]:137262477220304]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_137262476979680]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_137262476974448], ComputedExpression[astype_Series_137262476972480], ComputedExpression[set_column_DataFrame_137262476980352], ComputedExpression[get_column_Series_137262476976320], ComputedExpression[astype_Series_137262476972624], ComputedExpression[set_column_DataFrame_137262476846304], ComputedExpression[get_column_Series_137262476846448], ComputedExpression[astype_Series_137262476852544], ComputedExpression[set_column_DataFrame_137262476846640]], outputs={ComputedExpression[set_column_DataFrame_137262476846640], PlaceholderExpression[placeholder_DataFrame_137262476979680]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"MapToFields\" at line 7 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 13 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Pipeline has additional dependencies to be installed in SDK worker container, consider using the SDK container image pre-building workflow to avoid repetitive installations. Learn more on https://cloud.google.com/dataflow/docs/guides/using-custom-containers#prebuild\n", |
| "/usr/local/lib/python3.12/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py:1170: UserWarning: A non-standard version of Beam SDK detected: 2.71.0rc3. Dataflow runner will use container image tag 2.71.0. This use case is not supported.\n", |
| " warnings.warn(\n", |
| "/usr/local/lib/python3.12/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py:1170: UserWarning: A non-standard version of Beam SDK detected: 2.71.0rc3. Dataflow runner will use container image tag 2.71.0. This use case is not supported.\n", |
| " warnings.warn(\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120195944-598177-wpyv9jas.1768939184.598369/pickled_main_session...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120195944-598177-wpyv9jas.1768939184.598369/pickled_main_session in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120195944-598177-wpyv9jas.1768939184.598369/submission_environment_dependencies.txt...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120195944-598177-wpyv9jas.1768939184.598369/submission_environment_dependencies.txt in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120195944-598177-wpyv9jas.1768939184.598369/pipeline.pb...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120195944-598177-wpyv9jas.1768939184.598369/pipeline.pb in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job\n", |
| " clientRequestId: '20260120195944599317-1882'\n", |
| " createTime: '2026-01-20T19:59:46.383307Z'\n", |
| " currentStateTime: '1970-01-01T00:00:00Z'\n", |
| " id: '2026-01-20_11_59_45-3493881273162880179'\n", |
| " location: 'us-central1'\n", |
| " name: 'beamapp-root-0120195944-598177-wpyv9jas'\n", |
| " projectId: 'google.com:clouddfe'\n", |
| " stageStates: []\n", |
| " startTime: '2026-01-20T19:59:46.383307Z'\n", |
| " steps: []\n", |
| " tempFiles: []\n", |
| " type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2026-01-20_11_59_45-3493881273162880179]\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2026-01-20_11_59_45-3493881273162880179\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/2026-01-20_11_59_45-3493881273162880179?project=google.com:clouddfe\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_11_59_45-3493881273162880179 is in state JOB_STATE_PENDING\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:00:01.709Z: JOB_MESSAGE_BASIC: Worker configuration: e2-standard-2 in us-central1-a.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:00:03.962Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:00:04.060Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-a...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:00:04.217Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:00:04.350Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Create/Impulse+ReadFromCsv/_ReadFromPandas/Create/FlatMap(<lambda at core.py:4094>)+ReadFromCsv/_ReadFromPandas/Create/Map(decode)+ReadFromCsv/_ReadFromPandas/MatchAll/ParDo(_MatchAllFn)+ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:306>)+ReadFromCsv/_ReadFromPandas/Reshuffle/AddRandomKeys+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/Map(reify_metadata_default_window)+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Reify+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_11_59_45-3493881273162880179 is in state JOB_STATE_RUNNING\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:00:22.374Z: JOB_MESSAGE_BASIC: Your project already contains 100 Dataflow-created metric descriptors, so new user metrics of the form custom.googleapis.com/* will not be created. However, all user metrics are also available in the metric dataflow.googleapis.com/job/user_counter. If you rely on the custom metrics, you can delete old / unused metric descriptors. See https://developers.google.com/apis-explorer/#p/monitoring/v3/monitoring.projects.metricDescriptors.list and https://developers.google.com/apis-explorer/#p/monitoring/v3/monitoring.projects.metricDescriptors.delete\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:55.137Z: JOB_MESSAGE_BASIC: All workers have finished the startup processes and began to receive work requests.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:56.323Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Create/Impulse+ReadFromCsv/_ReadFromPandas/Create/FlatMap(<lambda at core.py:4094>)+ReadFromCsv/_ReadFromPandas/Create/Map(decode)+ReadFromCsv/_ReadFromPandas/MatchAll/ParDo(_MatchAllFn)+ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:306>)+ReadFromCsv/_ReadFromPandas/Reshuffle/AddRandomKeys+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/Map(reify_metadata_default_window)+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Reify+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:56.400Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:56.448Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:301>)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:56.451Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:56.525Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:301>)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:56.628Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/DoOnce/Impulse+ReadFromCsv/_ReadFromPandas/DoOnce/FlatMap(<lambda at core.py:4094>)+ReadFromCsv/_ReadFromPandas/DoOnce/Map(decode)+ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:301>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:58.379Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/DoOnce/Impulse+ReadFromCsv/_ReadFromPandas/DoOnce/FlatMap(<lambda at core.py:4094>)+ReadFromCsv/_ReadFromPandas/DoOnce/Map(decode)+ReadFromCsv/_ReadFromPandas/Map(<lambda at io.py:301>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:58.492Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:58.546Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:02:58.644Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Read+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/GroupByWindow+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/FlatMap(restore_metadata_default_window)+ReadFromCsv/_ReadFromPandas/Reshuffle/RemoveRandomKeys+ReadFromCsv/_ReadFromPandas/ReadMatches/ParDo(_ReadMatchesFn)+ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/PairWithRestriction+ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/SplitWithSizing\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:03:00.366Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Read+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/GroupByWindow+ReadFromCsv/_ReadFromPandas/Reshuffle/ReshufflePerKey/FlatMap(restore_metadata_default_window)+ReadFromCsv/_ReadFromPandas/Reshuffle/RemoveRandomKeys+ReadFromCsv/_ReadFromPandas/ReadMatches/ParDo(_ReadMatchesFn)+ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/PairWithRestriction+ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/SplitWithSizing\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:03:00.490Z: JOB_MESSAGE_BASIC: Executing operation ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/ProcessElementAndRestrictionWithSizing+ReadFromCsv/ToPCollection(df)/[ComputedExpression[get_column_Series_137262476974448], ComputedExpression[astype_Series_137262476972480], ComputedExpression[set_column_DataFrame_137262476980352], ComputedExpression[get_column_Series_137262476976320], ComputedExpression[astype_Series_137262476972624], ComputedExpression[set_column_DataFrame_137262476846304], ComputedExpression[get_column_Series_137262476846448], ComputedExpression[astype_Series_137262476852544], ComputedExpression[set_column_DataFrame_137262476846640]]:137262477220304/Map(<lambda at transforms.py:239>)+ReadFromCsv/ToPCollection(df)/[ComputedExpression[get_column_Series_137262476974448], ComputedExpression[astype_Series_137262476972480], ComputedExpression[set_column_DataFrame_137262476980352], ComputedExpression[get_column_Series_137262476976320], ComputedExpression[astype_Series_137262476972624], ComputedExpression[set_column_DataFrame_137262476846304], ComputedExpression[get_column_Series_137262476846448], ComputedExpression[astype_Series_137262476852544], ComputedExpression[set_column_DataFrame_137262476846640]]:137262477220304/FlatMap(evaluate)/FlatMap(evaluate)+ReadFromCsv/Unbatch 'set_column_DataFrame_137262476846640'+MapToFields/ToRows(id, firstname, age, country, profession, is_adult)/Map(<lambda at core.py:3605>)+log_for_testing/LogForTesting\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:03:01.159Z: JOB_MESSAGE_BASIC: Finished operation ReadFromCsv/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/ProcessElementAndRestrictionWithSizing+ReadFromCsv/ToPCollection(df)/[ComputedExpression[get_column_Series_137262476974448], ComputedExpression[astype_Series_137262476972480], ComputedExpression[set_column_DataFrame_137262476980352], ComputedExpression[get_column_Series_137262476976320], ComputedExpression[astype_Series_137262476972624], ComputedExpression[set_column_DataFrame_137262476846304], ComputedExpression[get_column_Series_137262476846448], ComputedExpression[astype_Series_137262476852544], ComputedExpression[set_column_DataFrame_137262476846640]]:137262477220304/Map(<lambda at transforms.py:239>)+ReadFromCsv/ToPCollection(df)/[ComputedExpression[get_column_Series_137262476974448], ComputedExpression[astype_Series_137262476972480], ComputedExpression[set_column_DataFrame_137262476980352], ComputedExpression[get_column_Series_137262476976320], ComputedExpression[astype_Series_137262476972624], ComputedExpression[set_column_DataFrame_137262476846304], ComputedExpression[get_column_Series_137262476846448], ComputedExpression[astype_Series_137262476852544], ComputedExpression[set_column_DataFrame_137262476846640]]:137262477220304/FlatMap(evaluate)/FlatMap(evaluate)+ReadFromCsv/Unbatch 'set_column_DataFrame_137262476846640'+MapToFields/ToRows(id, firstname, age, country, profession, is_adult)/Map(<lambda at core.py:3605>)+log_for_testing/LogForTesting\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:03:01.474Z: JOB_MESSAGE_BASIC: Stopping worker pool...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:05:24.129Z: JOB_MESSAGE_BASIC: Worker pool stopped.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_11_59_45-3493881273162880179 is in state JOB_STATE_DONE\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-map-01.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "HYSLONiCdHOy" |
| }, |
| "source": [ |
| "Beam tries to infer the output types of the mapping expressions, but this is not always possible. In those cases, you can declare the expected type explicitly with `output_type`, e.g." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "YeBO2xJ8dHOy", |
| "outputId": "2e5ea097-933b-4650-ac12-94309064b77c", |
| "scrolled": true |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Writing pipelines/pipeline-map-02.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-map-02.yaml'\n", |
| "pipeline:\n", |
| "  type: chain\n", |
| "  transforms:\n", |
| "    - type: ReadFromCsv\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    - type: MapToFields\n", |
| "      config:\n", |
| "        language: python\n", |
| "        append: true\n", |
| "        fields:\n", |
| "          is_adult:\n", |
| "            expression: \"age >= 18\"\n", |
| "            output_type: boolean\n", |
| "    - type: LogForTesting" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "0o-UbRsCdHOy", |
| "outputId": "b3b27ab9-b87d-46af-a173-b8c31fabf5a9" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "Building pipeline...\n", |
| "2026-01-20 20:09:28.226903: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 20:09:28.234402: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 20:09:28.256140: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768939768.293682 45220 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768939768.304832 45220 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768939768.334826 45220 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768939768.334876 45220 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768939768.334885 45220 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768939768.334893 45220 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 20:09:28.344084: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "/usr/local/lib/python3.12/dist-packages/google/cloud/aiplatform/models.py:52: FutureWarning: Support for google-cloud-storage < 3.0.0 will be removed in a future version of google-cloud-aiplatform. Please upgrade to google-cloud-storage >= 3.0.0.\n", |
| " from google.cloud.aiplatform.utils import gcs_utils\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_135805658476672], ComputedExpression[astype_Series_135805658464528], ComputedExpression[set_column_DataFrame_135805658471968], ComputedExpression[get_column_Series_135805658469136], ComputedExpression[astype_Series_135805658470480], ComputedExpression[set_column_DataFrame_135805658467600], ComputedExpression[get_column_Series_135805658467696], ComputedExpression[astype_Series_135805658465824], ComputedExpression[set_column_DataFrame_135805659000000]]:135805659147936]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_135805658480560]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_135805658476672], ComputedExpression[astype_Series_135805658464528], ComputedExpression[set_column_DataFrame_135805658471968], ComputedExpression[get_column_Series_135805658469136], ComputedExpression[astype_Series_135805658470480], ComputedExpression[set_column_DataFrame_135805658467600], ComputedExpression[get_column_Series_135805658467696], ComputedExpression[astype_Series_135805658465824], ComputedExpression[set_column_DataFrame_135805659000000]], outputs={ComputedExpression[set_column_DataFrame_135805659000000], PlaceholderExpression[placeholder_DataFrame_135805658480560]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"MapToFields\" at line 7 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 15 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:38505\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary/zip from /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64.zip for https://github.com/apache/beam/releases/download/v2.71.0-RC3/apache_beam-v2.71.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary from /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64 for /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Prism binary path resolved to: /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64\n", |
| "INFO:apache_beam.utils.subprocess_server:Starting service with ('/root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64' '--job_port' '53783' '--log_level' 'info' '--log_kind' 'json' '--serve_http=false')\n", |
| "INFO:PrismRunner:Serving JobManagement (endpoint='localhost:53783')\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Environment \"LOOPBACK\" has started a component necessary for the execution. Be sure to run the pipeline using\n", |
| " with Pipeline() as p:\n", |
| " p.apply(..)\n", |
| "This ensures that the pipeline finishes before this program exits.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED\n", |
| "INFO:root:starting job-001[job]\n", |
| "INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:53783.\n", |
| "INFO:root:running job-001[job]\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Control channel established.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:53783.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:State channel established.\n", |
| "INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:53783\n", |
| "INFO:apache_beam.runners.worker.data_plane:Data channel established.\n", |
| "INFO:root:{\"id\": 1, \"firstname\": \"Reeba\", \"age\": 58, \"country\": \"Belgium\", \"profession\": \"unemployed\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 2, \"firstname\": \"Maud\", \"age\": 45, \"country\": \"Spain\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 3, \"firstname\": \"Meg\", \"age\": 11, \"country\": \"France\", \"profession\": \"unemployed\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 4, \"firstname\": \"Rani\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 5, \"firstname\": \"Natka\", \"age\": 26, \"country\": \"France\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 6, \"firstname\": \"Aurore\", \"age\": 32, \"country\": \"Italy\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 7, \"firstname\": \"Elvira\", \"age\": 39, \"country\": \"Italy\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 8, \"firstname\": \"Asia\", \"age\": 10, \"country\": \"Belgium\", \"profession\": \"doctor\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 9, \"firstname\": \"Lesly\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 10, \"firstname\": \"Orelia\", \"age\": 31, \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 11, \"firstname\": \"Theodora\", \"age\": 16, \"country\": \"Italy\", \"profession\": \"unemployed\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 12, \"firstname\": \"Viviene\", \"age\": 44, \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 13, \"firstname\": \"Teriann\", \"age\": 50, \"country\": \"Belgium\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 14, \"firstname\": \"Carol-Jean\", \"age\": 23, \"country\": \"Germany\", \"profession\": \"unemployed\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 15, \"firstname\": \"Drucie\", \"age\": 15, \"country\": \"Spain\", \"profession\": \"police officer\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 16, \"firstname\": \"Elie\", \"age\": 10, \"country\": \"Italy\", \"profession\": \"doctor\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 17, \"firstname\": \"Shaylyn\", \"age\": 34, \"country\": \"Belgium\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 18, \"firstname\": \"Fayre\", \"age\": 33, \"country\": \"Spain\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 19, \"firstname\": \"Sabina\", \"age\": 52, \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 20, \"firstname\": \"Aryn\", \"age\": 20, \"country\": \"Belgium\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 21, \"firstname\": \"Darlleen\", \"age\": 49, \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 22, \"firstname\": \"Jere\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 23, \"firstname\": \"Candi\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 24, \"firstname\": \"Sindee\", \"age\": 40, \"country\": \"Germany\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 25, \"firstname\": \"Selma\", \"age\": 20, \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 26, \"firstname\": \"Vonny\", \"age\": 35, \"country\": \"Germany\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 27, \"firstname\": \"Kate\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 28, \"firstname\": \"Annabela\", \"age\": 48, \"country\": \"Belgium\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 29, \"firstname\": \"Jenilee\", \"age\": 55, \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 30, \"firstname\": \"Gusella\", \"age\": 44, \"country\": \"France\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 31, \"firstname\": \"Fawne\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 32, \"firstname\": \"Karolina\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 33, \"firstname\": \"Sadie\", \"age\": 58, \"country\": \"Germany\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 34, \"firstname\": \"Clo\", \"age\": 10, \"country\": \"Italy\", \"profession\": \"police officer\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 35, \"firstname\": \"Beth\", \"age\": 46, \"country\": \"Spain\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 36, \"firstname\": \"Adore\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 37, \"firstname\": \"Tarra\", \"age\": 29, \"country\": \"Spain\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 38, \"firstname\": \"Jessamyn\", \"age\": 36, \"country\": \"France\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 39, \"firstname\": \"Deedee\", \"age\": 24, \"country\": \"Germany\", \"profession\": \"unemployed\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 40, \"firstname\": \"Patricia\", \"age\": 45, \"country\": \"Italy\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 41, \"firstname\": \"Wileen\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 42, \"firstname\": \"Paola\", \"age\": 55, \"country\": \"Italy\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 43, \"firstname\": \"Gwyneth\", \"age\": 37, \"country\": \"Italy\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 44, \"firstname\": \"Stacey\", \"age\": 36, \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 45, \"firstname\": \"Camile\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"unemployed\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 46, \"firstname\": \"Sheree\", \"age\": 10, \"country\": \"Spain\", \"profession\": \"unemployed\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 47, \"firstname\": \"Albertina\", \"age\": 53, \"country\": \"France\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 48, \"firstname\": \"Jinny\", \"age\": 30, \"country\": \"Spain\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 49, \"firstname\": \"Kayla\", \"age\": 57, \"country\": \"Italy\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 50, \"firstname\": \"Jaime\", \"age\": 55, \"country\": \"France\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:PrismRunner:pipeline done! (job='job-001[job]')\n", |
| "INFO:root:pipeline completed job-001[job]\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete\n", |
| "INFO:root:terminating job-001[job]\n", |
| "INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to DONE\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-map-02.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "eYBJyrp0dHOy" |
| }, |
| "source": [ |
| "When `append` is set to `true`, you can also remove input fields from the output by listing them under `drop`, for example:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "M4Fw0BZodHOy", |
| "outputId": "3ce0ca08-88f3-4451-d0dd-2cf5e193d31d", |
| "scrolled": true |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Writing pipelines/pipeline-map-03.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-map-03.yaml'\n", |
| "pipeline:\n", |
| "  type: chain\n", |
| "  transforms:\n", |
| "    - type: ReadFromCsv\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    - type: MapToFields\n", |
| "      config:\n", |
| "        language: python\n", |
| "        append: true\n", |
| "        fields:\n", |
| "          is_adult: \"age >= 18\"\n", |
| "        drop:\n", |
| "          - age\n", |
| "    - type: LogForTesting" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "uIIr5xsVdHOy", |
| "outputId": "5dd65222-ce0d-4179-8e37-0053aedcefdd" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.\n", |
| "Building pipeline...\n", |
| "2026-01-20 18:04:32.570256: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 18:04:32.575158: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 18:04:32.588710: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768932272.611562 13383 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768932272.619408 13383 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768932272.636749 13383 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932272.636793 13383 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932272.636798 13383 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932272.636803 13383 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 18:04:32.642311: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "WARNING:apache_beam.transforms.core:Using yield and return in the process method of <class 'apache_beam.io.fileio._ReadMatchesFn'> can lead to unexpected behavior, see:https://github.com/apache/beam/issues/22969.\n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_20636135056992], ComputedExpression[astype_Series_20636135058192], ComputedExpression[set_column_DataFrame_20636135049072], ComputedExpression[get_column_Series_20636135056800], ComputedExpression[astype_Series_20636135056416], ComputedExpression[set_column_DataFrame_20636136617312], ComputedExpression[get_column_Series_20636136607952], ComputedExpression[astype_Series_20636136617264], ComputedExpression[set_column_DataFrame_20636136606320]]:20636136619136]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_20636135062032]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_20636135056992], ComputedExpression[astype_Series_20636135058192], ComputedExpression[set_column_DataFrame_20636135049072], ComputedExpression[get_column_Series_20636135056800], ComputedExpression[astype_Series_20636135056416], ComputedExpression[set_column_DataFrame_20636136617312], ComputedExpression[get_column_Series_20636136607952], ComputedExpression[astype_Series_20636136617264], ComputedExpression[set_column_DataFrame_20636136606320]], outputs={ComputedExpression[set_column_DataFrame_20636136606320], PlaceholderExpression[placeholder_DataFrame_20636135062032]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"MapToFields\" at line 7 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 15 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.direct.direct_runner:Running pipeline with PrismRunner.\n", |
| "INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:33695\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary/zip from /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64.zip for https://github.com/apache/beam/releases/download/v2.70.0/apache_beam-v2.70.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary from /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64 for /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Prism binary path resolved to: /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64\n", |
| "INFO:apache_beam.utils.subprocess_server:Starting service with ('/root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64' '--job_port' '45693' '--log_level' 'info' '--log_kind' 'json' '--serve_http=false')\n", |
| "INFO:PrismRunner:Serving JobManagement (endpoint='localhost:45693')\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Environment \"LOOPBACK\" has started a component necessary for the execution. Be sure to run the pipeline using\n", |
| " with Pipeline() as p:\n", |
| " p.apply(..)\n", |
| "This ensures that the pipeline finishes before this program exits.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED\n", |
| "INFO:root:starting job-001[job]\n", |
| "INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:45693.\n", |
| "INFO:root:running job-001[job]\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Control channel established.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:45693.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:State channel established.\n", |
| "INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:45693\n", |
| "INFO:apache_beam.runners.worker.data_plane:Data channel established.\n", |
| "INFO:root:{\"id\": 1, \"firstname\": \"Reeba\", \"country\": \"Belgium\", \"profession\": \"unemployed\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 2, \"firstname\": \"Maud\", \"country\": \"Spain\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 3, \"firstname\": \"Meg\", \"country\": \"France\", \"profession\": \"unemployed\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 4, \"firstname\": \"Rani\", \"country\": \"Spain\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 5, \"firstname\": \"Natka\", \"country\": \"France\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 6, \"firstname\": \"Aurore\", \"country\": \"Italy\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 7, \"firstname\": \"Elvira\", \"country\": \"Italy\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 8, \"firstname\": \"Asia\", \"country\": \"Belgium\", \"profession\": \"doctor\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 9, \"firstname\": \"Lesly\", \"country\": \"Spain\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 10, \"firstname\": \"Orelia\", \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 11, \"firstname\": \"Theodora\", \"country\": \"Italy\", \"profession\": \"unemployed\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 12, \"firstname\": \"Viviene\", \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 13, \"firstname\": \"Teriann\", \"country\": \"Belgium\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 14, \"firstname\": \"Carol-Jean\", \"country\": \"Germany\", \"profession\": \"unemployed\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 15, \"firstname\": \"Drucie\", \"country\": \"Spain\", \"profession\": \"police officer\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 16, \"firstname\": \"Elie\", \"country\": \"Italy\", \"profession\": \"doctor\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 17, \"firstname\": \"Shaylyn\", \"country\": \"Belgium\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 18, \"firstname\": \"Fayre\", \"country\": \"Spain\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 19, \"firstname\": \"Sabina\", \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 20, \"firstname\": \"Aryn\", \"country\": \"Belgium\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 21, \"firstname\": \"Darlleen\", \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 22, \"firstname\": \"Jere\", \"country\": \"Italy\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 23, \"firstname\": \"Candi\", \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 24, \"firstname\": \"Sindee\", \"country\": \"Germany\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 25, \"firstname\": \"Selma\", \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 26, \"firstname\": \"Vonny\", \"country\": \"Germany\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 27, \"firstname\": \"Kate\", \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 28, \"firstname\": \"Annabela\", \"country\": \"Belgium\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 29, \"firstname\": \"Jenilee\", \"country\": \"Germany\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 30, \"firstname\": \"Gusella\", \"country\": \"France\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 31, \"firstname\": \"Fawne\", \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 32, \"firstname\": \"Karolina\", \"country\": \"Spain\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 33, \"firstname\": \"Sadie\", \"country\": \"Germany\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 34, \"firstname\": \"Clo\", \"country\": \"Italy\", \"profession\": \"police officer\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 35, \"firstname\": \"Beth\", \"country\": \"Spain\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 36, \"firstname\": \"Adore\", \"country\": \"Italy\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 37, \"firstname\": \"Tarra\", \"country\": \"Spain\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 38, \"firstname\": \"Jessamyn\", \"country\": \"France\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 39, \"firstname\": \"Deedee\", \"country\": \"Germany\", \"profession\": \"unemployed\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 40, \"firstname\": \"Patricia\", \"country\": \"Italy\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 41, \"firstname\": \"Wileen\", \"country\": \"Spain\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 42, \"firstname\": \"Paola\", \"country\": \"Italy\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 43, \"firstname\": \"Gwyneth\", \"country\": \"Italy\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 44, \"firstname\": \"Stacey\", \"country\": \"Spain\", \"profession\": \"worker\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 45, \"firstname\": \"Camile\", \"country\": \"Germany\", \"profession\": \"unemployed\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 46, \"firstname\": \"Sheree\", \"country\": \"Spain\", \"profession\": \"unemployed\", \"is_adult\": false}\n", |
| "INFO:root:{\"id\": 47, \"firstname\": \"Albertina\", \"country\": \"France\", \"profession\": \"police officer\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 48, \"firstname\": \"Jinny\", \"country\": \"Spain\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 49, \"firstname\": \"Kayla\", \"country\": \"Italy\", \"profession\": \"firefighter\", \"is_adult\": true}\n", |
| "INFO:root:{\"id\": 50, \"firstname\": \"Jaime\", \"country\": \"France\", \"profession\": \"doctor\", \"is_adult\": true}\n", |
| "INFO:PrismRunner:pipeline done! (job='job-001[job]')\n", |
| "INFO:root:pipeline completed job-001[job]\n", |
| "INFO:root:terminating job-001[job]\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete\n", |
| "INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to DONE\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-map-03.yaml" |
| ] |
| }, |
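| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "For comparison, here is a minimal sketch of an alternative that should produce the same records without `append` and `drop`, assuming `data/people.csv` has exactly the columns shown in the output above. When `append` is not set, `MapToFields` emits only the fields listed under `fields`, so every field to keep is named explicitly. This fragment is illustrative and is not one of the notebook's pipeline files.\n", |
| "\n", |
| "```yaml\n", |
| "- type: MapToFields\n", |
| "  config:\n", |
| "    language: python\n", |
| "    fields:\n", |
| "      # Pass the kept fields through unchanged.\n", |
| "      id: id\n", |
| "      firstname: firstname\n", |
| "      country: country\n", |
| "      profession: profession\n", |
| "      # Derived field; `age` itself is not listed, so it is not emitted.\n", |
| "      is_adult: \"age >= 18\"\n", |
| "```\n", |
| "\n", |
| "Listing fields explicitly is practical for narrow records; `append` with `drop` is shorter when most fields are kept." |
| ] |
| }, |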
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "3rlGG1DbdHOy" |
| }, |
| "source": [ |
| "We can also create simple UDFs (user-defined functions) using Python or other languages. In the example below we add a field `random_number` whose value is a random non-negative integer strictly less than the person's age." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "0JYaBqi9dHOy", |
| "outputId": "59250be7-0142-42c7-d9e2-aba7b1917225", |
| "scrolled": true |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Writing pipelines/pipeline-map-04.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-map-04.yaml'\n", |
| "pipeline:\n", |
| "  type: chain\n", |
| "  transforms:\n", |
| "    - type: ReadFromCsv\n", |
| "      config:\n", |
| "        path: data/people.csv\n", |
| "    - type: MapToFields\n", |
| "      config:\n", |
| "        language: python\n", |
| "        append: true\n", |
| "        fields:\n", |
| "          random_number:\n", |
| "            callable: |\n", |
| "              import random\n", |
| "              def my_mapping(row):\n", |
| "                return random.randrange(row.age)\n", |
| "    - type: LogForTesting" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "4O66q6pAdHOy", |
| "outputId": "e70a0102-8ad0-4dd5-80ed-05f4b9cbe7cf" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.\n", |
| "Building pipeline...\n", |
| "2026-01-20 18:06:01.099794: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 18:06:01.107670: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 18:06:01.137814: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768932361.175862 13789 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768932361.185337 13789 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768932361.217535 13789 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932361.217584 13789 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932361.217589 13789 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932361.217596 13789 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 18:06:01.227451: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "WARNING:apache_beam.transforms.core:Using yield and return in the process method of <class 'apache_beam.io.fileio._ReadMatchesFn'> can lead to unexpected behavior, see:https://github.com/apache/beam/issues/22969.\n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_19759301598560], ComputedExpression[astype_Series_19759301595200], ComputedExpression[set_column_DataFrame_19759301607392], ComputedExpression[get_column_Series_19759301607056], ComputedExpression[astype_Series_19759301594432], ComputedExpression[set_column_DataFrame_19759301604224], ComputedExpression[get_column_Series_19759301788736], ComputedExpression[astype_Series_19759301600096], ComputedExpression[set_column_DataFrame_19759301795936]]:19759305663616]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_19759301595872]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_19759301598560], ComputedExpression[astype_Series_19759301595200], ComputedExpression[set_column_DataFrame_19759301607392], ComputedExpression[get_column_Series_19759301607056], ComputedExpression[astype_Series_19759301594432], ComputedExpression[set_column_DataFrame_19759301604224], ComputedExpression[get_column_Series_19759301788736], ComputedExpression[astype_Series_19759301600096], ComputedExpression[set_column_DataFrame_19759301795936]], outputs={PlaceholderExpression[placeholder_DataFrame_19759301595872], ComputedExpression[set_column_DataFrame_19759301795936]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"MapToFields\" at line 7 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 17 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.direct.direct_runner:Running pipeline with PrismRunner.\n", |
| "INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:34373\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary/zip from /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64.zip for https://github.com/apache/beam/releases/download/v2.70.0/apache_beam-v2.70.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary from /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64 for /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Prism binary path resolved to: /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64\n", |
| "INFO:apache_beam.utils.subprocess_server:Starting service with ('/root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64' '--job_port' '44267' '--log_level' 'info' '--log_kind' 'json' '--serve_http=false')\n", |
| "INFO:PrismRunner:Serving JobManagement (endpoint='localhost:44267')\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Environment \"LOOPBACK\" has started a component necessary for the execution. Be sure to run the pipeline using\n", |
| " with Pipeline() as p:\n", |
| " p.apply(..)\n", |
| "This ensures that the pipeline finishes before this program exits.\n", |
| "INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:44267.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Control channel established.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.\n", |
| "INFO:root:starting job-001[job]\n", |
| "INFO:root:running job-001[job]\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:44267.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:State channel established.\n", |
| "INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:44267\n", |
| "INFO:apache_beam.runners.worker.data_plane:Data channel established.\n", |
| "INFO:root:{\"id\": 1, \"firstname\": \"Reeba\", \"age\": 58, \"country\": \"Belgium\", \"profession\": \"unemployed\", \"random_number\": 31}\n", |
| "INFO:root:{\"id\": 2, \"firstname\": \"Maud\", \"age\": 45, \"country\": \"Spain\", \"profession\": \"firefighter\", \"random_number\": 16}\n", |
| "INFO:root:{\"id\": 3, \"firstname\": \"Meg\", \"age\": 11, \"country\": \"France\", \"profession\": \"unemployed\", \"random_number\": 2}\n", |
| "INFO:root:{\"id\": 4, \"firstname\": \"Rani\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"doctor\", \"random_number\": 18}\n", |
| "INFO:root:{\"id\": 5, \"firstname\": \"Natka\", \"age\": 26, \"country\": \"France\", \"profession\": \"doctor\", \"random_number\": 2}\n", |
| "INFO:root:{\"id\": 6, \"firstname\": \"Aurore\", \"age\": 32, \"country\": \"Italy\", \"profession\": \"police officer\", \"random_number\": 28}\n", |
| "INFO:root:{\"id\": 7, \"firstname\": \"Elvira\", \"age\": 39, \"country\": \"Italy\", \"profession\": \"doctor\", \"random_number\": 32}\n", |
| "INFO:root:{\"id\": 8, \"firstname\": \"Asia\", \"age\": 10, \"country\": \"Belgium\", \"profession\": \"doctor\", \"random_number\": 3}\n", |
| "INFO:root:{\"id\": 9, \"firstname\": \"Lesly\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"firefighter\", \"random_number\": 27}\n", |
| "INFO:root:{\"id\": 10, \"firstname\": \"Orelia\", \"age\": 31, \"country\": \"Germany\", \"profession\": \"police officer\", \"random_number\": 22}\n", |
| "INFO:root:{\"id\": 11, \"firstname\": \"Theodora\", \"age\": 16, \"country\": \"Italy\", \"profession\": \"unemployed\", \"random_number\": 5}\n", |
| "INFO:root:{\"id\": 12, \"firstname\": \"Viviene\", \"age\": 44, \"country\": \"Germany\", \"profession\": \"police officer\", \"random_number\": 28}\n", |
| "INFO:root:{\"id\": 13, \"firstname\": \"Teriann\", \"age\": 50, \"country\": \"Belgium\", \"profession\": \"police officer\", \"random_number\": 17}\n", |
| "INFO:root:{\"id\": 14, \"firstname\": \"Carol-Jean\", \"age\": 23, \"country\": \"Germany\", \"profession\": \"unemployed\", \"random_number\": 12}\n", |
| "INFO:root:{\"id\": 15, \"firstname\": \"Drucie\", \"age\": 15, \"country\": \"Spain\", \"profession\": \"police officer\", \"random_number\": 13}\n", |
| "INFO:root:{\"id\": 16, \"firstname\": \"Elie\", \"age\": 10, \"country\": \"Italy\", \"profession\": \"doctor\", \"random_number\": 4}\n", |
| "INFO:root:{\"id\": 17, \"firstname\": \"Shaylyn\", \"age\": 34, \"country\": \"Belgium\", \"profession\": \"worker\", \"random_number\": 17}\n", |
| "INFO:root:{\"id\": 18, \"firstname\": \"Fayre\", \"age\": 33, \"country\": \"Spain\", \"profession\": \"police officer\", \"random_number\": 10}\n", |
| "INFO:root:{\"id\": 19, \"firstname\": \"Sabina\", \"age\": 52, \"country\": \"Germany\", \"profession\": \"police officer\", \"random_number\": 15}\n", |
| "INFO:root:{\"id\": 20, \"firstname\": \"Aryn\", \"age\": 20, \"country\": \"Belgium\", \"profession\": \"police officer\", \"random_number\": 15}\n", |
| "INFO:root:{\"id\": 21, \"firstname\": \"Darlleen\", \"age\": 49, \"country\": \"Spain\", \"profession\": \"worker\", \"random_number\": 30}\n", |
| "INFO:root:{\"id\": 22, \"firstname\": \"Jere\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"worker\", \"random_number\": 4}\n", |
| "INFO:root:{\"id\": 23, \"firstname\": \"Candi\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"police officer\", \"random_number\": 27}\n", |
| "INFO:root:{\"id\": 24, \"firstname\": \"Sindee\", \"age\": 40, \"country\": \"Germany\", \"profession\": \"firefighter\", \"random_number\": 24}\n", |
| "INFO:root:{\"id\": 25, \"firstname\": \"Selma\", \"age\": 20, \"country\": \"Spain\", \"profession\": \"worker\", \"random_number\": 5}\n", |
| "INFO:root:{\"id\": 26, \"firstname\": \"Vonny\", \"age\": 35, \"country\": \"Germany\", \"profession\": \"doctor\", \"random_number\": 21}\n", |
| "INFO:root:{\"id\": 27, \"firstname\": \"Kate\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"worker\", \"random_number\": 6}\n", |
| "INFO:root:{\"id\": 28, \"firstname\": \"Annabela\", \"age\": 48, \"country\": \"Belgium\", \"profession\": \"worker\", \"random_number\": 28}\n", |
| "INFO:root:{\"id\": 29, \"firstname\": \"Jenilee\", \"age\": 55, \"country\": \"Germany\", \"profession\": \"police officer\", \"random_number\": 36}\n", |
| "INFO:root:{\"id\": 30, \"firstname\": \"Gusella\", \"age\": 44, \"country\": \"France\", \"profession\": \"police officer\", \"random_number\": 14}\n", |
| "INFO:root:{\"id\": 31, \"firstname\": \"Fawne\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"worker\", \"random_number\": 18}\n", |
| "INFO:root:{\"id\": 32, \"firstname\": \"Karolina\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\", \"random_number\": 18}\n", |
| "INFO:root:{\"id\": 33, \"firstname\": \"Sadie\", \"age\": 58, \"country\": \"Germany\", \"profession\": \"firefighter\", \"random_number\": 0}\n", |
| "INFO:root:{\"id\": 34, \"firstname\": \"Clo\", \"age\": 10, \"country\": \"Italy\", \"profession\": \"police officer\", \"random_number\": 1}\n", |
| "INFO:root:{\"id\": 35, \"firstname\": \"Beth\", \"age\": 46, \"country\": \"Spain\", \"profession\": \"firefighter\", \"random_number\": 38}\n", |
| "INFO:root:{\"id\": 36, \"firstname\": \"Adore\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"firefighter\", \"random_number\": 4}\n", |
| "INFO:root:{\"id\": 37, \"firstname\": \"Tarra\", \"age\": 29, \"country\": \"Spain\", \"profession\": \"doctor\", \"random_number\": 18}\n", |
| "INFO:root:{\"id\": 38, \"firstname\": \"Jessamyn\", \"age\": 36, \"country\": \"France\", \"profession\": \"police officer\", \"random_number\": 10}\n", |
| "INFO:root:{\"id\": 39, \"firstname\": \"Deedee\", \"age\": 24, \"country\": \"Germany\", \"profession\": \"unemployed\", \"random_number\": 18}\n", |
| "INFO:root:{\"id\": 40, \"firstname\": \"Patricia\", \"age\": 45, \"country\": \"Italy\", \"profession\": \"doctor\", \"random_number\": 41}\n", |
| "INFO:root:{\"id\": 41, \"firstname\": \"Wileen\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\", \"random_number\": 19}\n", |
| "INFO:root:{\"id\": 42, \"firstname\": \"Paola\", \"age\": 55, \"country\": \"Italy\", \"profession\": \"worker\", \"random_number\": 19}\n", |
| "INFO:root:{\"id\": 43, \"firstname\": \"Gwyneth\", \"age\": 37, \"country\": \"Italy\", \"profession\": \"worker\", \"random_number\": 33}\n", |
| "INFO:root:{\"id\": 44, \"firstname\": \"Stacey\", \"age\": 36, \"country\": \"Spain\", \"profession\": \"worker\", \"random_number\": 11}\n", |
| "INFO:root:{\"id\": 45, \"firstname\": \"Camile\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"unemployed\", \"random_number\": 16}\n", |
| "INFO:root:{\"id\": 46, \"firstname\": \"Sheree\", \"age\": 10, \"country\": \"Spain\", \"profession\": \"unemployed\", \"random_number\": 2}\n", |
| "INFO:root:{\"id\": 47, \"firstname\": \"Albertina\", \"age\": 53, \"country\": \"France\", \"profession\": \"police officer\", \"random_number\": 43}\n", |
| "INFO:root:{\"id\": 48, \"firstname\": \"Jinny\", \"age\": 30, \"country\": \"Spain\", \"profession\": \"firefighter\", \"random_number\": 12}\n", |
| "INFO:root:{\"id\": 49, \"firstname\": \"Kayla\", \"age\": 57, \"country\": \"Italy\", \"profession\": \"firefighter\", \"random_number\": 41}\n", |
| "INFO:root:{\"id\": 50, \"firstname\": \"Jaime\", \"age\": 55, \"country\": \"France\", \"profession\": \"doctor\", \"random_number\": 46}\n", |
| "INFO:PrismRunner:pipeline done! (job='job-001[job]')\n", |
| "INFO:root:pipeline completed job-001[job]\n", |
| "INFO:root:terminating job-001[job]\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete\n", |
| "INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to DONE\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-map-04.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "894BDApDdHOy" |
| }, |
| "source": [ |
| "Beam YAML can aggregate records by grouping them and combining values within each group. This is accomplished via the `Combine` transform type.\n", |
| "\n", |
| "In this example we'll aggregate our records based on the `is_adult` classification and calculate the average age for each group. The pipeline stores that average in an output field named `total`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "6R02-hrPdHOy", |
| "outputId": "ab08a334-84cb-407c-e652-9bed6d2f6f50", |
| "scrolled": true |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Writing pipelines/pipeline-combine-01.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-combine-01.yaml'\n", |
| "pipeline:\n", |
| " type: chain\n", |
| " transforms:\n", |
| " - type: ReadFromCsv\n", |
| " config:\n", |
| " path: data/people.csv\n", |
| " - type: MapToFields\n", |
| " config:\n", |
| " language: python\n", |
| " append: true\n", |
| " fields:\n", |
| " is_adult: \"age >= 18\"\n", |
| " - type: Combine\n", |
| " config:\n", |
| " group_by: is_adult\n", |
| " combine:\n", |
| " total:\n", |
| " value: age\n", |
| " fn: mean\n", |
| " - type: LogForTesting" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "FpGLIyn3dHOy", |
| "outputId": "d274eae5-a1f0-4cf9-b41f-0a18c304277f" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.\n", |
| "Building pipeline...\n", |
| "2026-01-20 18:06:54.386725: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 18:06:54.391871: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 18:06:54.407244: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768932414.430450 14062 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768932414.437055 14062 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768932414.453914 14062 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932414.453958 14062 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932414.453962 14062 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768932414.453968 14062 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 18:06:54.459125: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "WARNING:apache_beam.transforms.core:Using yield and return in the process method of <class 'apache_beam.io.fileio._ReadMatchesFn'> can lead to unexpected behavior, see:https://github.com/apache/beam/issues/22969.\n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_14728419535968], ComputedExpression[astype_Series_14728419529008], ComputedExpression[set_column_DataFrame_14728421191808], ComputedExpression[get_column_Series_14728421190608], ComputedExpression[astype_Series_14728421188832], ComputedExpression[set_column_DataFrame_14728421189408], ComputedExpression[get_column_Series_14728421191088], ComputedExpression[astype_Series_14728421180000], ComputedExpression[set_column_DataFrame_14728421191712]]:14728421307264]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_14728419535920]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_14728419535968], ComputedExpression[astype_Series_14728419529008], ComputedExpression[set_column_DataFrame_14728421191808], ComputedExpression[get_column_Series_14728421190608], ComputedExpression[astype_Series_14728421188832], ComputedExpression[set_column_DataFrame_14728421189408], ComputedExpression[get_column_Series_14728421191088], ComputedExpression[astype_Series_14728421180000], ComputedExpression[set_column_DataFrame_14728421191712]], outputs={ComputedExpression[set_column_DataFrame_14728421191712], PlaceholderExpression[placeholder_DataFrame_14728419535920]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"MapToFields\" at line 7 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"Combine\" at line 13 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 20 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.direct.direct_runner:Running pipeline with PrismRunner.\n", |
| "INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:33183\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary/zip from /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64.zip for https://github.com/apache/beam/releases/download/v2.70.0/apache_beam-v2.70.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary from /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64 for /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Prism binary path resolved to: /root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64\n", |
| "INFO:apache_beam.utils.subprocess_server:Starting service with ('/root/.apache_beam/cache/prism/bin/apache_beam-v2.70.0-prism-linux-amd64' '--job_port' '43631' '--log_level' 'info' '--log_kind' 'json' '--serve_http=false')\n", |
| "INFO:PrismRunner:Serving JobManagement (endpoint='localhost:43631')\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Environment \"LOOPBACK\" has started a component necessary for the execution. Be sure to run the pipeline using\n", |
| " with Pipeline() as p:\n", |
| " p.apply(..)\n", |
| "This ensures that the pipeline finishes before this program exits.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED\n", |
| "INFO:root:starting job-001[job]\n", |
| "INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:43631.\n", |
| "INFO:root:running job-001[job]\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Control channel established.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:43631.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:State channel established.\n", |
| "INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:43631\n", |
| "INFO:apache_beam.runners.worker.data_plane:Data channel established.\n", |
| "INFO:root:{\"is_adult\": false, \"total\": 11.714285714285714}\n", |
| "INFO:root:{\"is_adult\": true, \"total\": 40.674418604651166}\n", |
| "INFO:PrismRunner:pipeline done! (job='job-001[job]')\n", |
| "INFO:root:pipeline completed job-001[job]\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete\n", |
| "INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.\n", |
| "INFO:root:terminating job-001[job]\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to DONE\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-combine-01.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "sqSHf4YmdHOz" |
| }, |
| "source": [ |
| "If everything ran correctly, you should see the following lines near the end of the output log, just before the runner's shutdown messages (the two lines may appear in either order):\n", |
| "```\n", |
| "INFO:root:{\"is_adult\": false, \"total\": 11.714285714285714}\n", |
| "INFO:root:{\"is_adult\": true, \"total\": 40.674418604651166}\n", |
| "```" |
| ] |
| }, |
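| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "A single `Combine` step can also compute more than one aggregate per group. The fragment below is a minimal sketch rather than part of the tutorial pipeline: it assumes `max` is accepted as a combine function in the same way `mean` is, and the output field names `mean_age` and `max_age` are illustrative.\n", |
| "```\n", |
| "# Sketch only: assumes `max` is a supported combine function.\n", |
| "- type: Combine\n", |
| "  config:\n", |
| "    group_by: is_adult\n", |
| "    combine:\n", |
| "      mean_age:\n", |
| "        value: age\n", |
| "        fn: mean\n", |
| "      max_age:\n", |
| "        value: age\n", |
| "        fn: max\n", |
| "```\n", |
| "Naming each output field after the aggregate it holds makes the logged records easier to read than a generic name such as `total`." |
| ] |
| }, |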
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "eClN1vSZdHOz" |
| }, |
| "source": [ |
| "All the previous pipelines were linear: the output of one transform was the input to the next. This is known as a `chain` pipeline, and it is designated in the top-level pipeline configuration, for example:\n", |
| "```\n", |
| "pipeline:\n", |
| " type: chain\n", |
| " transforms:\n", |
| " ...\n", |
| "```\n", |
| "In YAML we can also create nonlinear pipelines. To do this, specify `type: composite`, or omit the `type` line entirely (composite is the default pipeline type). In these pipelines, every transform that consumes the output of another transform must declare an `input`. The `input` is the name, or collection of names, of the transform(s) that feed into the receiving transform.\n", |
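| "\n", |
| "As a minimal sketch of the collection form of `input`, two branches could be merged back into one like this. It assumes a `Flatten` transform is available and that `input` accepts a list of names. The names `Doctors` and `OtherProfessions` refer to the filter steps defined in the next cell, and `AllPeople` is a hypothetical name that is not used elsewhere in this notebook:\n", |
| "```\n", |
| "# Assumption: Flatten accepts a list of input transform names.\n", |
| "- type: Flatten\n", |
| "  name: AllPeople\n", |
| "  input: [Doctors, OtherProfessions]\n", |
| "```\n", |
| "\n", |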
| "The specification below will create the following pipeline:\n", |
| "```\n", |
| " +----> Doctors -----------> SaveDoctors\n", |
| "InputData ---+\n", |
| " +----> OtherProfessions --> SaveOtherProfessions\n", |
| "```" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "aoz7gWDydHO1", |
| "outputId": "20483d96-c8a1-4ab7-a478-6646dd7082c9", |
| "scrolled": true |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Overwriting pipelines/pipeline-nonlinear-01.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile pipelines/pipeline-nonlinear-01.yaml\n", |
| "pipeline:\n", |
| " type: composite\n", |
| " transforms:\n", |
| " - type: ReadFromCsv\n", |
| " name: InputData\n", |
| " config:\n", |
| " path: data/people.csv\n", |
| " - type: Filter\n", |
| " name: Doctors\n", |
| " input: InputData\n", |
| " config:\n", |
| " language: python\n", |
| " keep: \"profession == 'doctor'\"\n", |
| " - type: Filter\n", |
| " name: OtherProfessions\n", |
| " input: InputData\n", |
| " config:\n", |
| " language: python\n", |
| " keep: \"profession != 'doctor'\"\n", |
| " - type: WriteToCsv\n", |
| " name: SaveDoctors\n", |
| " input: Doctors\n", |
| " config:\n", |
| " path: results/doctors\n", |
| " - type: WriteToCsv\n", |
| " name: SaveOtherProfessions\n", |
| " input: OtherProfessions\n", |
| " config:\n", |
| " path: results/other-professions" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "xY14uSlPdHO1", |
| "outputId": "00feb09e-7a9f-433f-ac8d-f8ba853d4b2d" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "WARNING:apache_beam.options.pipeline_options:Bucket gs://derrickaw-test/temp used as temp_location has soft-delete policy enabled. To avoid being billed for unnecessary storage costs, turn off the soft delete feature on buckets that your Dataflow jobs use for temporary and staging storage. For more information, see https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.\n", |
| "Building pipeline...\n", |
| "2026-01-20 20:26:42.923901: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 20:26:42.931428: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 20:26:42.945437: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768940802.970388 49594 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768940802.977827 49594 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768940802.996256 49594 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768940802.996307 49594 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768940802.996312 49594 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768940802.996318 49594 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 20:26:43.002370: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "/usr/local/lib/python3.12/dist-packages/google/cloud/aiplatform/models.py:52: FutureWarning: Support for google-cloud-storage < 3.0.0 will be removed in a future version of google-cloud-aiplatform. Please upgrade to google-cloud-storage >= 3.0.0.\n", |
| " from google.cloud.aiplatform.utils import gcs_utils\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"InputData\" at line 4 \n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_136694749903424], ComputedExpression[astype_Series_136694749895312], ComputedExpression[set_column_DataFrame_136694749904144], ComputedExpression[get_column_Series_136694749894496], ComputedExpression[astype_Series_136694749890320], ComputedExpression[set_column_DataFrame_136694749892384], ComputedExpression[get_column_Series_136694749895936], ComputedExpression[astype_Series_136694749891184], ComputedExpression[set_column_DataFrame_136694749896944]]:136694749484320]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_136694749894448]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_136694749903424], ComputedExpression[astype_Series_136694749895312], ComputedExpression[set_column_DataFrame_136694749904144], ComputedExpression[get_column_Series_136694749894496], ComputedExpression[astype_Series_136694749890320], ComputedExpression[set_column_DataFrame_136694749892384], ComputedExpression[get_column_Series_136694749895936], ComputedExpression[astype_Series_136694749891184], ComputedExpression[set_column_DataFrame_136694749896944]], outputs={PlaceholderExpression[placeholder_DataFrame_136694749894448], ComputedExpression[set_column_DataFrame_136694749896944]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"Doctors\" at line 8 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"OtherProfessions\" at line 14 \n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"SaveDoctors\" at line 20 \n", |
| "INFO:apache_beam.io.fileio:Added temporary directory gs://derrickaw-test/temp/.temp68b6e7b8-d318-4c87-a1eb-5631930fc0d9\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"SaveOtherProfessions\" at line 25 \n", |
| "INFO:apache_beam.io.fileio:Added temporary directory gs://derrickaw-test/temp/.temp107bfbe9-d493-4b7b-a56d-53976cea6a11\n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Pipeline has additional dependencies to be installed in SDK worker container, consider using the SDK container image pre-building workflow to avoid repetitive installations. Learn more on https://cloud.google.com/dataflow/docs/guides/using-custom-containers#prebuild\n", |
| "/usr/local/lib/python3.12/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py:1170: UserWarning: A non-standard version of Beam SDK detected: 2.71.0rc3. Dataflow runner will use container image tag 2.71.0. This use case is not supported.\n", |
| " warnings.warn(\n", |
| "/usr/local/lib/python3.12/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py:1170: UserWarning: A non-standard version of Beam SDK detected: 2.71.0rc3. Dataflow runner will use container image tag 2.71.0. This use case is not supported.\n", |
| " warnings.warn(\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120202703-720437-ibvhaw6u.1768940823.720644/pickled_main_session...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120202703-720437-ibvhaw6u.1768940823.720644/pickled_main_session in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120202703-720437-ibvhaw6u.1768940823.720644/submission_environment_dependencies.txt...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120202703-720437-ibvhaw6u.1768940823.720644/submission_environment_dependencies.txt in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://derrickaw-test/temp/beamapp-root-0120202703-720437-ibvhaw6u.1768940823.720644/pipeline.pb...\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://derrickaw-test/temp/beamapp-root-0120202703-720437-ibvhaw6u.1768940823.720644/pipeline.pb in 0 seconds.\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job\n", |
| " clientRequestId: '20260120202703721537-2025'\n", |
| " createTime: '2026-01-20T20:27:05.394578Z'\n", |
| " currentStateTime: '1970-01-01T00:00:00Z'\n", |
| " id: '2026-01-20_12_27_04-4951132020218181608'\n", |
| " location: 'us-central1'\n", |
| " name: 'beamapp-root-0120202703-720437-ibvhaw6u'\n", |
| " projectId: 'google.com:clouddfe'\n", |
| " stageStates: []\n", |
| " startTime: '2026-01-20T20:27:05.394578Z'\n", |
| " steps: []\n", |
| " tempFiles: []\n", |
| " type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2026-01-20_12_27_04-4951132020218181608]\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2026-01-20_12_27_04-4951132020218181608\n", |
| "INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/2026-01-20_12_27_04-4951132020218181608?project=google.com:clouddfe\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_12_27_04-4951132020218181608 is in state JOB_STATE_PENDING\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:09.900Z: JOB_MESSAGE_BASIC: Worker configuration: e2-standard-2 in us-central1-a.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:12.703Z: JOB_MESSAGE_BASIC: Executing operation InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:12.726Z: JOB_MESSAGE_BASIC: Executing operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupRecordsByDestinationAndShard/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:12.754Z: JOB_MESSAGE_BASIC: Executing operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:12.766Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-a...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:12.780Z: JOB_MESSAGE_BASIC: Executing operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupRecordsByDestinationAndShard/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:12.804Z: JOB_MESSAGE_BASIC: Executing operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:12.894Z: JOB_MESSAGE_BASIC: Finished operation InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:12.998Z: JOB_MESSAGE_BASIC: Executing operation InputData/_ReadFromPandas/Create/Impulse+InputData/_ReadFromPandas/Create/FlatMap(<lambda at core.py:4094>)+InputData/_ReadFromPandas/Create/Map(decode)+InputData/_ReadFromPandas/MatchAll/ParDo(_MatchAllFn)+InputData/_ReadFromPandas/Map(<lambda at io.py:306>)+InputData/_ReadFromPandas/Reshuffle/AddRandomKeys+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/Map(reify_metadata_default_window)+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Reify+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_12_27_04-4951132020218181608 is in state JOB_STATE_RUNNING\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:21.911Z: JOB_MESSAGE_BASIC: Finished operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupRecordsByDestinationAndShard/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:21.911Z: JOB_MESSAGE_BASIC: Finished operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:21.911Z: JOB_MESSAGE_BASIC: Finished operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:21.912Z: JOB_MESSAGE_BASIC: Finished operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupRecordsByDestinationAndShard/Create\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:27:46.574Z: JOB_MESSAGE_BASIC: Your project already contains 100 Dataflow-created metric descriptors, so new user metrics of the form custom.googleapis.com/* will not be created. However, all user metrics are also available in the metric dataflow.googleapis.com/job/user_counter. If you rely on the custom metrics, you can delete old / unused metric descriptors. See https://developers.google.com/apis-explorer/#p/monitoring/v3/monitoring.projects.metricDescriptors.list and https://developers.google.com/apis-explorer/#p/monitoring/v3/monitoring.projects.metricDescriptors.delete\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:22.558Z: JOB_MESSAGE_BASIC: All workers have finished the startup processes and began to receive work requests.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:23.770Z: JOB_MESSAGE_BASIC: Finished operation InputData/_ReadFromPandas/Create/Impulse+InputData/_ReadFromPandas/Create/FlatMap(<lambda at core.py:4094>)+InputData/_ReadFromPandas/Create/Map(decode)+InputData/_ReadFromPandas/MatchAll/ParDo(_MatchAllFn)+InputData/_ReadFromPandas/Map(<lambda at io.py:306>)+InputData/_ReadFromPandas/Reshuffle/AddRandomKeys+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/Map(reify_metadata_default_window)+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Reify+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:23.841Z: JOB_MESSAGE_BASIC: Executing operation InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:23.872Z: JOB_MESSAGE_BASIC: Executing operation InputData/_ReadFromPandas/Map(<lambda at io.py:301>)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:23.885Z: JOB_MESSAGE_BASIC: Finished operation InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:23.941Z: JOB_MESSAGE_BASIC: Finished operation InputData/_ReadFromPandas/Map(<lambda at io.py:301>)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:24.047Z: JOB_MESSAGE_BASIC: Executing operation InputData/_ReadFromPandas/DoOnce/Impulse+InputData/_ReadFromPandas/DoOnce/FlatMap(<lambda at core.py:4094>)+InputData/_ReadFromPandas/DoOnce/Map(decode)+InputData/_ReadFromPandas/Map(<lambda at io.py:301>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:26.041Z: JOB_MESSAGE_BASIC: Finished operation InputData/_ReadFromPandas/DoOnce/Impulse+InputData/_ReadFromPandas/DoOnce/FlatMap(<lambda at core.py:4094>)+InputData/_ReadFromPandas/DoOnce/Map(decode)+InputData/_ReadFromPandas/Map(<lambda at io.py:301>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:26.141Z: JOB_MESSAGE_BASIC: Executing operation InputData/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:26.198Z: JOB_MESSAGE_BASIC: Finished operation InputData/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/View-python_side_input0\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:26.307Z: JOB_MESSAGE_BASIC: Executing operation InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Read+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/GroupByWindow+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/FlatMap(restore_metadata_default_window)+InputData/_ReadFromPandas/Reshuffle/RemoveRandomKeys+InputData/_ReadFromPandas/ReadMatches/ParDo(_ReadMatchesFn)+InputData/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/PairWithRestriction+InputData/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/SplitWithSizing\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:28.077Z: JOB_MESSAGE_BASIC: Finished operation InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/Read+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/GroupByKey/GroupByWindow+InputData/_ReadFromPandas/Reshuffle/ReshufflePerKey/FlatMap(restore_metadata_default_window)+InputData/_ReadFromPandas/Reshuffle/RemoveRandomKeys+InputData/_ReadFromPandas/ReadMatches/ParDo(_ReadMatchesFn)+InputData/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/PairWithRestriction+InputData/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/SplitWithSizing\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:28.172Z: JOB_MESSAGE_BASIC: Executing operation InputData/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/ProcessElementAndRestrictionWithSizing+InputData/ToPCollection(df)/[ComputedExpression[get_column_Series_136694749903424], ComputedExpression[astype_Series_136694749895312], ComputedExpression[set_column_DataFrame_136694749904144], ComputedExpression[get_column_Series_136694749894496], ComputedExpression[astype_Series_136694749890320], ComputedExpression[set_column_DataFrame_136694749892384], ComputedExpression[get_column_Series_136694749895936], ComputedExpression[astype_Series_136694749891184], ComputedExpression[set_column_DataFrame_136694749896944]]:136694749484320/Map(<lambda at transforms.py:239>)+InputData/ToPCollection(df)/[ComputedExpression[get_column_Series_136694749903424], ComputedExpression[astype_Series_136694749895312], ComputedExpression[set_column_DataFrame_136694749904144], ComputedExpression[get_column_Series_136694749894496], ComputedExpression[astype_Series_136694749890320], ComputedExpression[set_column_DataFrame_136694749892384], ComputedExpression[get_column_Series_136694749895936], ComputedExpression[astype_Series_136694749891184], ComputedExpression[set_column_DataFrame_136694749896944]]:136694749484320/FlatMap(evaluate)/FlatMap(evaluate)+InputData/Unbatch 'set_column_DataFrame_136694749896944'+Doctors/Filter(fn)+OtherProfessions/Filter(fn)+SaveDoctors/BatchElements(pcoll)+SaveOtherProfessions/BatchElements(pcoll)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/ParDo(_WriteUnshardedRecordsFn)/ParDo(_WriteUnshardedRecordsFn)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/Map(<lambda at fileio.py:633>)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Write+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/ParDo(_AppendShardedDestination)+SaveDoctors/WriteToPandas(df) - 
results/doctors/WriteToFiles/GroupRecordsByDestinationAndShard/Write+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/ParDo(_WriteUnshardedRecordsFn)/ParDo(_WriteUnshardedRecordsFn)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/Map(<lambda at fileio.py:633>)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Write+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/ParDo(_AppendShardedDestination)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupRecordsByDestinationAndShard/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:28.837Z: JOB_MESSAGE_BASIC: Finished operation InputData/_ReadFromPandas/ParDo(_ReadFromPandasDoFn)/ProcessElementAndRestrictionWithSizing+InputData/ToPCollection(df)/[ComputedExpression[get_column_Series_136694749903424], ComputedExpression[astype_Series_136694749895312], ComputedExpression[set_column_DataFrame_136694749904144], ComputedExpression[get_column_Series_136694749894496], ComputedExpression[astype_Series_136694749890320], ComputedExpression[set_column_DataFrame_136694749892384], ComputedExpression[get_column_Series_136694749895936], ComputedExpression[astype_Series_136694749891184], ComputedExpression[set_column_DataFrame_136694749896944]]:136694749484320/Map(<lambda at transforms.py:239>)+InputData/ToPCollection(df)/[ComputedExpression[get_column_Series_136694749903424], ComputedExpression[astype_Series_136694749895312], ComputedExpression[set_column_DataFrame_136694749904144], ComputedExpression[get_column_Series_136694749894496], ComputedExpression[astype_Series_136694749890320], ComputedExpression[set_column_DataFrame_136694749892384], ComputedExpression[get_column_Series_136694749895936], ComputedExpression[astype_Series_136694749891184], ComputedExpression[set_column_DataFrame_136694749896944]]:136694749484320/FlatMap(evaluate)/FlatMap(evaluate)+InputData/Unbatch 'set_column_DataFrame_136694749896944'+Doctors/Filter(fn)+OtherProfessions/Filter(fn)+SaveDoctors/BatchElements(pcoll)+SaveOtherProfessions/BatchElements(pcoll)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/ParDo(_WriteUnshardedRecordsFn)/ParDo(_WriteUnshardedRecordsFn)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/Map(<lambda at fileio.py:633>)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Write+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/ParDo(_AppendShardedDestination)+SaveDoctors/WriteToPandas(df) - 
results/doctors/WriteToFiles/GroupRecordsByDestinationAndShard/Write+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/ParDo(_WriteUnshardedRecordsFn)/ParDo(_WriteUnshardedRecordsFn)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/Map(<lambda at fileio.py:633>)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Write+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/ParDo(_AppendShardedDestination)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupRecordsByDestinationAndShard/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:28.891Z: JOB_MESSAGE_BASIC: Executing operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupRecordsByDestinationAndShard/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:28.916Z: JOB_MESSAGE_BASIC: Executing operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupRecordsByDestinationAndShard/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:28.936Z: JOB_MESSAGE_BASIC: Finished operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupRecordsByDestinationAndShard/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:28.960Z: JOB_MESSAGE_BASIC: Finished operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupRecordsByDestinationAndShard/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:28.990Z: JOB_MESSAGE_BASIC: Executing operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupRecordsByDestinationAndShard/Read+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/ParDo(_WriteShardedRecordsFn)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/Map(<lambda at fileio.py:633>)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.018Z: JOB_MESSAGE_BASIC: Executing operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupRecordsByDestinationAndShard/Read+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/ParDo(_WriteShardedRecordsFn)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/Map(<lambda at fileio.py:633>)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.196Z: JOB_MESSAGE_BASIC: Finished operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupRecordsByDestinationAndShard/Read+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/ParDo(_WriteShardedRecordsFn)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/Map(<lambda at fileio.py:633>)+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.219Z: JOB_MESSAGE_BASIC: Finished operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupRecordsByDestinationAndShard/Read+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/ParDo(_WriteShardedRecordsFn)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/Map(<lambda at fileio.py:633>)+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Write\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.245Z: JOB_MESSAGE_BASIC: Executing operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.271Z: JOB_MESSAGE_BASIC: Executing operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.290Z: JOB_MESSAGE_BASIC: Finished operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.318Z: JOB_MESSAGE_BASIC: Finished operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Close\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.348Z: JOB_MESSAGE_BASIC: Executing operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Read+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/ParDo(_MoveTempFilesIntoFinalDestinationFn)+SaveDoctors/Map(<lambda at io.py:822>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.377Z: JOB_MESSAGE_BASIC: Executing operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Read+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/ParDo(_MoveTempFilesIntoFinalDestinationFn)+SaveOtherProfessions/Map(<lambda at io.py:822>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.483Z: JOB_MESSAGE_BASIC: Finished operation SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/GroupTempFilesByDestination/Read+SaveOtherProfessions/WriteToPandas(df) - results/other-professions/WriteToFiles/ParDo(_MoveTempFilesIntoFinalDestinationFn)+SaveOtherProfessions/Map(<lambda at io.py:822>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.552Z: JOB_MESSAGE_BASIC: Finished operation SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/GroupTempFilesByDestination/Read+SaveDoctors/WriteToPandas(df) - results/doctors/WriteToFiles/ParDo(_MoveTempFilesIntoFinalDestinationFn)+SaveDoctors/Map(<lambda at io.py:822>)\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:30:29.729Z: JOB_MESSAGE_BASIC: Stopping worker pool...\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:2026-01-20T20:32:52.036Z: JOB_MESSAGE_BASIC: Worker pool stopped.\n", |
| "INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2026-01-20_12_27_04-4951132020218181608 is in state JOB_STATE_DONE\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-nonlinear-01.yaml" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "L0oLn-ZRdHO1" |
| }, |
| "source": [ |
| "The pipeline writes two files: `results/doctors-00000-of-00001` and `results/other-professions-00000-of-00001`. Let's look at their contents:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "4LSQhwhKdHO1", |
| "outputId": "0f236b07-d800-4b87-d420-81092b3366cd" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "id,firstname,age,country,profession\n", |
| "4,Rani,53,Spain,doctor\n", |
| "5,Natka,26,France,doctor\n", |
| "7,Elvira,39,Italy,doctor\n", |
| "8,Asia,10,Belgium,doctor\n", |
| "16,Elie,10,Italy,doctor\n", |
| "26,Vonny,35,Germany,doctor\n", |
| "37,Tarra,29,Spain,doctor\n", |
| "40,Patricia,45,Italy,doctor\n", |
| "50,Jaime,55,France,doctor\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! head results/doctors-00000-of-00001" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "ai9_rcMRdHO1", |
| "outputId": "ca9178dc-185c-4177-ce21-93be99478999" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "id,firstname,age,country,profession\n", |
| "1,Reeba,58,Belgium,unemployed\n", |
| "2,Maud,45,Spain,firefighter\n", |
| "3,Meg,11,France,unemployed\n", |
| "6,Aurore,32,Italy,police officer\n", |
| "9,Lesly,35,Spain,firefighter\n", |
| "10,Orelia,31,Germany,police officer\n", |
| "11,Theodora,16,Italy,unemployed\n", |
| "12,Viviene,44,Germany,police officer\n", |
| "13,Teriann,50,Belgium,police officer\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! head results/other-professions-00000-of-00001" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "JJkuKyN8dHO1" |
| }, |
| "source": [ |
| "# Summary\n", |
| "\n", |
| "Congratulations! You've just run Apache Beam pipelines using YAML.\n", |
| "\n", |
| "To learn more about Beam YAML, visit the [Beam YAML API documentation page](https://beam.apache.org/documentation/sdks/yaml/).\n", |
| "\n", |
| "For extra practice with different runners, see the sections below.\n" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "U1F-xwV1WyhN" |
| }, |
| "source": [ |
| "# Prism Runner\n", |
| "\n", |
| "To run your pipelines with the newer Prism Runner, add the `--runner=PrismRunner` flag to any of the execution commands above, as shown in the example below." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "2Y0t0SuTXrRi" |
| }, |
| "source": [ |
| "Save the YAML pipeline again." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "jF_ldDajXVZD", |
| "outputId": "e2d40937-46f1-414a-8dca-80b622e52fa4" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Overwriting pipelines/pipeline-01.yaml\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-01.yaml'\n", |
| "pipeline:\n", |
| " type: chain\n", |
| " transforms:\n", |
| " - type: ReadFromCsv\n", |
| " config:\n", |
| " path: data/people.csv\n", |
| " - type: LogForTesting" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "F4v946QJXyYS" |
| }, |
| "source": [ |
| "Execute with the newer Prism Runner." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "VJ7bsPVaXXuz", |
| "outputId": "93602d35-62de-4cf2-bb34-409de5f62fbf" |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]\n", |
| "Building pipeline...\n", |
| "2026-01-20 21:44:58.884468: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 21:44:58.910055: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.\n", |
| "2026-01-20 21:44:58.961795: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n", |
| "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", |
| "E0000 00:00:1768945499.030458 68341 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n", |
| "E0000 00:00:1768945499.059249 68341 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n", |
| "W0000 00:00:1768945499.170677 68341 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768945499.170759 68341 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768945499.170769 68341 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "W0000 00:00:1768945499.170774 68341 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.\n", |
| "2026-01-20 21:44:59.197532: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n", |
| "To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", |
| "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n", |
| "INFO:datasets:TensorFlow version 2.19.0 available.\n", |
| "INFO:datasets:JAX version 0.7.2 available.\n", |
| "/usr/local/lib/python3.12/dist-packages/google/cloud/aiplatform/models.py:52: FutureWarning: Support for google-cloud-storage < 3.0.0 will be removed in a future version of google-cloud-aiplatform. Please upgrade to google-cloud-storage >= 3.0.0.\n", |
| " from google.cloud.aiplatform.utils import gcs_utils\n", |
| "WARNING:root:Could not load ML transform module apache_beam.ml.transforms.tft: No module named 'tensorflow_transform'. Please install the necessary module dependencies\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"ReadFromCsv\" at line 4 \n", |
| "INFO:root:Computing dataframe stage <ComputeStage(PTransform) label=[[ComputedExpression[get_column_Series_135663552168592], ComputedExpression[astype_Series_135663552168400], ComputedExpression[set_column_DataFrame_135663552172624], ComputedExpression[get_column_Series_135663552171136], ComputedExpression[astype_Series_135663552174688], ComputedExpression[set_column_DataFrame_135663552174928], ComputedExpression[get_column_Series_135663552173584], ComputedExpression[astype_Series_135663552170752], ComputedExpression[set_column_DataFrame_135663552572800]]:135663548886304]> for Stage[inputs={PlaceholderExpression[placeholder_DataFrame_135663552183808]}, partitioning=Arbitrary, ops=[ComputedExpression[get_column_Series_135663552168592], ComputedExpression[astype_Series_135663552168400], ComputedExpression[set_column_DataFrame_135663552172624], ComputedExpression[get_column_Series_135663552171136], ComputedExpression[astype_Series_135663552174688], ComputedExpression[set_column_DataFrame_135663552174928], ComputedExpression[get_column_Series_135663552173584], ComputedExpression[astype_Series_135663552170752], ComputedExpression[set_column_DataFrame_135663552572800]], outputs={PlaceholderExpression[placeholder_DataFrame_135663552183808], ComputedExpression[set_column_DataFrame_135663552572800]}]\n", |
| "INFO:apache_beam.yaml.yaml_transform:Expanding \"LogForTesting\" at line 7 \n", |
| "Running pipeline...\n", |
| "INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:37043\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary/zip from /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64.zip for https://github.com/apache/beam/releases/download/v2.71.0-RC3/apache_beam-v2.71.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Using cached prism binary from /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64 for /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64.zip\n", |
| "INFO:apache_beam.runners.portability.prism_runner:Prism binary path resolved to: /root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64\n", |
| "INFO:apache_beam.utils.subprocess_server:Starting service with ('/root/.apache_beam/cache/prism/bin/apache_beam-v2.71.0-prism-linux-amd64' '--job_port' '60405' '--log_level' 'info' '--log_kind' 'json' '--serve_http=false')\n", |
| "INFO:PrismRunner:Serving JobManagement (endpoint='localhost:60405')\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Environment \"LOOPBACK\" has started a component necessary for the execution. Be sure to run the pipeline using\n", |
| " with Pipeline() as p:\n", |
| " p.apply(..)\n", |
| "This ensures that the pipeline finishes before this program exits.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED\n", |
| "INFO:root:starting job-001[job]\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING\n", |
| "INFO:root:running job-001[job]\n", |
| "INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:60405.\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Control channel established.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:60405.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:State channel established.\n", |
| "INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:60405\n", |
| "INFO:apache_beam.runners.worker.data_plane:Data channel established.\n", |
| "INFO:root:{\"id\": 1, \"firstname\": \"Reeba\", \"age\": 58, \"country\": \"Belgium\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 2, \"firstname\": \"Maud\", \"age\": 45, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 3, \"firstname\": \"Meg\", \"age\": 11, \"country\": \"France\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 4, \"firstname\": \"Rani\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 5, \"firstname\": \"Natka\", \"age\": 26, \"country\": \"France\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 6, \"firstname\": \"Aurore\", \"age\": 32, \"country\": \"Italy\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 7, \"firstname\": \"Elvira\", \"age\": 39, \"country\": \"Italy\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 8, \"firstname\": \"Asia\", \"age\": 10, \"country\": \"Belgium\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 9, \"firstname\": \"Lesly\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 10, \"firstname\": \"Orelia\", \"age\": 31, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 11, \"firstname\": \"Theodora\", \"age\": 16, \"country\": \"Italy\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 12, \"firstname\": \"Viviene\", \"age\": 44, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 13, \"firstname\": \"Teriann\", \"age\": 50, \"country\": \"Belgium\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 14, \"firstname\": \"Carol-Jean\", \"age\": 23, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 15, \"firstname\": \"Drucie\", \"age\": 15, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 16, \"firstname\": \"Elie\", \"age\": 10, \"country\": \"Italy\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 17, \"firstname\": \"Shaylyn\", \"age\": 34, \"country\": \"Belgium\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 18, \"firstname\": \"Fayre\", \"age\": 33, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 19, \"firstname\": \"Sabina\", \"age\": 52, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 20, \"firstname\": \"Aryn\", \"age\": 20, \"country\": \"Belgium\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 21, \"firstname\": \"Darlleen\", \"age\": 49, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 22, \"firstname\": \"Jere\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 23, \"firstname\": \"Candi\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 24, \"firstname\": \"Sindee\", \"age\": 40, \"country\": \"Germany\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 25, \"firstname\": \"Selma\", \"age\": 20, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 26, \"firstname\": \"Vonny\", \"age\": 35, \"country\": \"Germany\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 27, \"firstname\": \"Kate\", \"age\": 53, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 28, \"firstname\": \"Annabela\", \"age\": 48, \"country\": \"Belgium\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 29, \"firstname\": \"Jenilee\", \"age\": 55, \"country\": \"Germany\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 30, \"firstname\": \"Gusella\", \"age\": 44, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 31, \"firstname\": \"Fawne\", \"age\": 35, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 32, \"firstname\": \"Karolina\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 33, \"firstname\": \"Sadie\", \"age\": 58, \"country\": \"Germany\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 34, \"firstname\": \"Clo\", \"age\": 10, \"country\": \"Italy\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 35, \"firstname\": \"Beth\", \"age\": 46, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 36, \"firstname\": \"Adore\", \"age\": 18, \"country\": \"Italy\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 37, \"firstname\": \"Tarra\", \"age\": 29, \"country\": \"Spain\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 38, \"firstname\": \"Jessamyn\", \"age\": 36, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 39, \"firstname\": \"Deedee\", \"age\": 24, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 40, \"firstname\": \"Patricia\", \"age\": 45, \"country\": \"Italy\", \"profession\": \"doctor\"}\n", |
| "INFO:root:{\"id\": 41, \"firstname\": \"Wileen\", \"age\": 39, \"country\": \"Spain\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 42, \"firstname\": \"Paola\", \"age\": 55, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 43, \"firstname\": \"Gwyneth\", \"age\": 37, \"country\": \"Italy\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 44, \"firstname\": \"Stacey\", \"age\": 36, \"country\": \"Spain\", \"profession\": \"worker\"}\n", |
| "INFO:root:{\"id\": 45, \"firstname\": \"Camile\", \"age\": 60, \"country\": \"Germany\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 46, \"firstname\": \"Sheree\", \"age\": 10, \"country\": \"Spain\", \"profession\": \"unemployed\"}\n", |
| "INFO:root:{\"id\": 47, \"firstname\": \"Albertina\", \"age\": 53, \"country\": \"France\", \"profession\": \"police officer\"}\n", |
| "INFO:root:{\"id\": 48, \"firstname\": \"Jinny\", \"age\": 30, \"country\": \"Spain\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 49, \"firstname\": \"Kayla\", \"age\": 57, \"country\": \"Italy\", \"profession\": \"firefighter\"}\n", |
| "INFO:root:{\"id\": 50, \"firstname\": \"Jaime\", \"age\": 55, \"country\": \"France\", \"profession\": \"doctor\"}\n", |
| "INFO:PrismRunner:pipeline done! (job='job-001[job]')\n", |
| "INFO:root:pipeline completed job-001[job]\n", |
| "INFO:root:terminating job-001[job]\n", |
| "INFO:apache_beam.runners.portability.portable_runner:Job state changed to DONE\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete\n", |
| "INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.\n", |
| "INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.\n" |
| ] |
| } |
| ], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-01.yaml --runner=PrismRunner" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "J4pHxZhXYAkT" |
| }, |
| "source": [ |
| "# Dataflow Runner\n", |
| "\n", |
| "To run your pipeline on Dataflow, you'll need to set up your Google Cloud project and run the pipeline with the `--runner=DataflowRunner` flag. Fill in the placeholders below to complete the general setup.\n", |
| "\n", |
| "For additional context, see https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#run-on-dataflow." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "xd5TeiAs6ULR" |
| }, |
| "outputs": [], |
| "source": [ |
| "%pip install --quiet apache-beam[gcp]\n", |
| "!gcloud auth login <YOUR_LOGIN>\n", |
| "!gcloud config set project <YOUR_GCP_PROJECT>\n", |
| "!gcloud auth application-default login\n", |
| "!gcloud auth application-default set-quota-project <YOUR_GCP_PROJECT>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "PdLkavCB6j27" |
| }, |
| "source": [ |
| "Then save your pipeline." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "OCNPo10b64qZ" |
| }, |
| "outputs": [], |
| "source": [ |
| "%%writefile 'pipelines/pipeline-01.yaml'\n", |
| "pipeline:\n", |
| " type: chain\n", |
| " transforms:\n", |
| " - type: ReadFromCsv\n", |
| " config:\n", |
| " path: data/people.csv\n", |
| " - type: LogForTesting" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "nm3_mvnv7kfo" |
| }, |
| "source": [ |
| "Then run your pipeline with the `--runner=DataflowRunner` flag, making sure to fill in the other required flags." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "iksVknz37ACW" |
| }, |
| "outputs": [], |
| "source": [ |
| "! python -m apache_beam.yaml.main --pipeline_spec_file=pipelines/pipeline-01.yaml --runner=DataflowRunner --project=<YOUR_GCP_PROJECT> --region=<YOUR_REGION> --temp_location=gs://<YOUR_TEMP_BUCKET> --staging_location=gs://<YOUR_STAGING_BUCKET>" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "provenance": [], |
| "toc_visible": true, |
| "include_colab_link": true |
| }, |
| "kernelspec": { |
| "display_name": "Python 3 (ipykernel)", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.8.9" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 0 |
| } |