| { |
| "nbformat": 4, |
| "nbformat_minor": 2, |
| "metadata": { |
| "colab": { |
| "name": "Beam RunInference", |
| "provenance": [], |
| "collapsed_sections": [], |
| "toc_visible": true |
| }, |
| "kernelspec": { |
| "name": "python3", |
| "display_name": "Python 3" |
| }, |
| "language_info": { |
| "name": "python" |
| } |
| }, |
| "cells": [ |
| { |
| "cell_type": "code", |
| "source": [ |
| "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", |
| "\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License." |
| ], |
| "metadata": { |
| "id": "C1rAsD2L-hSO", |
| "cellView": "form" |
| }, |
| "id": "C1rAsD2L-hSO", |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "b6f8f3af-744e-4eaa-8a30-6d03e8e4d21e", |
| "metadata": { |
| "id": "b6f8f3af-744e-4eaa-8a30-6d03e8e4d21e" |
| }, |
| "source": [ |
| "# RunInference\n", |
| "\n", |
| "<button>\n", |
| " <a href=\"https://beam.apache.org/documentation/sdks/python-machine-learning/\">\n", |
| " <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n", |
| " Beam RunInference\n", |
| " </a>\n", |
| "</button>\n", |
| "\n", |
| "In this notebook, we walk through the use of the RunInference transform.\n", |
| "The transform and its accompanying [ModelHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler) classes handle the following tasks:\n", |
| "\n", |
| "\n", |
| "* Optimizing loading models from popular frameworks.\n", |
| "* Batching examples in a scalable fashion.\n", |
| "\n", |
| "\n", |
| "This notebook illustrates common RunInference patterns such as the following:\n", |
| "* Generating predictions using both PyTorch and scikit-learn.\n", |
| "* Postprocessing results after RunInference.\n", |
| "* Inference with multiple models in the same pipeline.\n", |
| "\n", |
| "The linear regression models used in these samples are trained on data that corresponds to the 5 and 10 times tables; that is, `y = 5x` and `y = 10x` respectively." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "299af9bb-b2fc-405c-96e7-ee0a6ae24bdd", |
| "metadata": { |
| "id": "299af9bb-b2fc-405c-96e7-ee0a6ae24bdd" |
| }, |
| "source": [ |
| "### Dependencies\n", |
| "\n", |
| "The RunInference library is available in Apache Beam version <b>2.40</b> or later.\n", |
| "\n", |
| "The PyTorch package is required to use the PyTorch RunInference API. Use `pip` to install PyTorch." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "# Issue: https://github.com/apache/beam/issues/22218. Because of updates to the Google Cloud APIs, Beam SDK versions 2.34.0 through 2.40.0 have some dependency conflicts. See the issue for more details.\n", |
| "# Workaround to install Apache Beam without pip getting stuck for a long time. The runtime might need to be restarted after this step.\n", |
| "!pip install google-api-core==1.31.6 --quiet\n", |
| "!pip install google-cloud-pubsub==2.13.1 google-cloud-bigquery-storage==2.13.2 --quiet\n", |
| "!pip install apache-beam[gcp,dataframe] --quiet" |
| ], |
| "metadata": { |
| "id": "loxD-rOVchRn", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "661baa2d-6e0f-4478-b7c1-db911593d5ff" |
| }, |
| "id": "loxD-rOVchRn", |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "\u001b[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.\n", |
| "pandas-gbq 0.13.3 requires google-cloud-bigquery[bqstorage,pandas]<2.0.0dev,>=1.11.1, but you have google-cloud-bigquery 2.34.4 which is incompatible.\u001b[0m\n" |
| ] |
| } |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "7f841596-f217-46d2-b64e-1952db4de4cb", |
| "metadata": { |
| "id": "7f841596-f217-46d2-b64e-1952db4de4cb", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "da04ccb9-0801-47f6-ec9e-e87f0ca4569f" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/\n", |
| "Requirement already satisfied: torch in /usr/local/lib/python3.7/dist-packages (1.12.0+cu113)\n", |
| "Requirement already satisfied: typing-extensions in /usr/local/lib/python3.7/dist-packages (from torch) (4.1.1)\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%pip install torch" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "9a92e3a7-beb6-46ae-a5b0-53c15487de38", |
| "metadata": { |
| "id": "9a92e3a7-beb6-46ae-a5b0-53c15487de38" |
| }, |
| "outputs": [], |
| "source": [ |
| "import argparse\n", |
| "import csv\n", |
| "import json\n", |
| "import os\n", |
| "import torch\n", |
| "from typing import Tuple\n", |
| "\n", |
| "import apache_beam as beam\n", |
| "import numpy\n", |
| "from apache_beam.io.gcp.bigquery import ReadFromBigQuery\n", |
| "from apache_beam.ml.inference.base import KeyedModelHandler\n", |
| "from apache_beam.ml.inference.base import PredictionResult\n", |
| "from apache_beam.ml.inference.base import RunInference\n", |
| "from apache_beam.dataframe.convert import to_pcollection\n", |
| "from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor\n", |
| "from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor\n", |
| "from apache_beam.options.pipeline_options import PipelineOptions\n", |
| "\n", |
| "import warnings\n", |
| "warnings.filterwarnings('ignore')" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "from google.colab import auth\n", |
| "auth.authenticate_user()" |
| ], |
| "metadata": { |
| "id": "V0E35R5Ka2cE" |
| }, |
| "id": "V0E35R5Ka2cE", |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "248458a6-cfd8-474d-ad0e-f37f7ae981ae", |
| "metadata": { |
| "id": "248458a6-cfd8-474d-ad0e-f37f7ae981ae" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Constants\n", |
| "project = \"<your-project>\"\n", |
| "bucket = \"<your-bucket>\"\n", |
| "\n", |
| "# set the project to avoid warnings.\n", |
| "os.environ['GOOGLE_CLOUD_PROJECT'] = project\n", |
| "\n", |
| "save_model_dir_multiply_five = 'five_times_table_torch.pt'\n", |
| "save_model_dir_multiply_ten = 'ten_times_table_torch.pt'" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "b2b7cedc-79f5-4599-8178-e5da35dba032", |
| "metadata": { |
| "tags": [], |
| "id": "b2b7cedc-79f5-4599-8178-e5da35dba032" |
| }, |
| "source": [ |
| "## Create data and Pytorch models for RunInference transform" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "202e5a3e-4ccd-4ae3-9852-e47de0721839", |
| "metadata": { |
| "id": "202e5a3e-4ccd-4ae3-9852-e47de0721839" |
| }, |
| "source": [ |
| "### Linear regression model in Pytorch." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "68bf8bf0-f735-45ee-8ebb-a2d8bb8a6bc7", |
| "metadata": { |
| "id": "68bf8bf0-f735-45ee-8ebb-a2d8bb8a6bc7" |
| }, |
| "outputs": [], |
| "source": [ |
| "class LinearRegression(torch.nn.Module):\n", |
| "    def __init__(self, input_dim=1, output_dim=1):\n", |
| "        super().__init__()\n", |
| "        self.linear = torch.nn.Linear(input_dim, output_dim)\n", |
| "\n", |
| "    def forward(self, x):\n", |
| "        out = self.linear(x)\n", |
| "        return out" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "1918435c-0029-4eb6-8eee-bda5470eb2ff", |
| "metadata": { |
| "id": "1918435c-0029-4eb6-8eee-bda5470eb2ff" |
| }, |
| "source": [ |
| "### Prepare train and test data to train a 5 times model.\n", |
| "* `x` contains values in the range from 0 to 99.\n", |
| "* `y` is a list of 5 * `x`. \n", |
| "* `value_to_predict` includes values outside of the training data." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "9302917f-6200-4af4-a410-4bd6f2a070b8", |
| "metadata": { |
| "id": "9302917f-6200-4af4-a410-4bd6f2a070b8" |
| }, |
| "outputs": [], |
| "source": [ |
| "x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)\n", |
| "y = (x * 5).reshape(-1, 1)\n", |
| "value_to_predict = numpy.array([20, 40, 60, 90], dtype=numpy.float32).reshape(-1, 1)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "9dc22aec-08c3-43ab-a5ce-451cb63c485a", |
| "metadata": { |
| "id": "9dc22aec-08c3-43ab-a5ce-451cb63c485a" |
| }, |
| "source": [ |
| "### Train the linear regression model on 5 times data." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "0a8b7924-ff06-4584-8f41-079268387a67", |
| "metadata": { |
| "id": "0a8b7924-ff06-4584-8f41-079268387a67" |
| }, |
| "outputs": [], |
| "source": [ |
| "five_times_model = LinearRegression()\n", |
| "optimizer = torch.optim.Adam(five_times_model.parameters())\n", |
| "loss_fn = torch.nn.L1Loss()\n", |
| "\n", |
| "\"\"\"\n", |
| "Train the five_times_model\n", |
| "\"\"\"\n", |
| "epochs = 10000\n", |
| "tensor_x = torch.from_numpy(x)\n", |
| "tensor_y = torch.from_numpy(y)\n", |
| "for epoch in range(epochs):\n", |
| "    y_pred = five_times_model(tensor_x)\n", |
| "    loss = loss_fn(y_pred, tensor_y)\n", |
| "    five_times_model.zero_grad()\n", |
| "    loss.backward()\n", |
| "    optimizer.step()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "bd106b29-6187-42c1-9743-1666c147b5e3", |
| "metadata": { |
| "id": "bd106b29-6187-42c1-9743-1666c147b5e3" |
| }, |
| "source": [ |
| "Save the model using `torch.save()` and verify if the saved model file exists." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "882bbada-4f6d-4370-a047-c5961e564ee8", |
| "metadata": { |
| "id": "882bbada-4f6d-4370-a047-c5961e564ee8", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "3002ed41-dbd5-4a87-d2d1-d1c7908be2f2" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "True\n" |
| ] |
| } |
| ], |
| "source": [ |
| "torch.save(five_times_model.state_dict(), save_model_dir_multiply_five)\n", |
| "print(os.path.exists(save_model_dir_multiply_five)) # verify if the model is saved" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "fa84cfca-83c6-4a91-aea1-3dd034c42ae0", |
| "metadata": { |
| "id": "fa84cfca-83c6-4a91-aea1-3dd034c42ae0" |
| }, |
| "source": [ |
| "### Prepare train and test data to train a 10 times model.\n", |
| "* `x` contains values in the range from 0 to 99.\n", |
| "* `y` is a list of 10 * `x`. " |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "27e0d1f6-2c3e-4418-8fb0-b5b89ffa66d5", |
| "metadata": { |
| "id": "27e0d1f6-2c3e-4418-8fb0-b5b89ffa66d5" |
| }, |
| "outputs": [], |
| "source": [ |
| "x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)\n", |
| "y = (x * 10).reshape(-1, 1)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "24d946dc-4fe0-4030-8f6a-aa8d27fd353d", |
| "metadata": { |
| "id": "24d946dc-4fe0-4030-8f6a-aa8d27fd353d" |
| }, |
| "source": [ |
| "### Train the linear regression model on 10 times data." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "2b352313-b791-48fd-9b9d-b54233176416", |
| "metadata": { |
| "id": "2b352313-b791-48fd-9b9d-b54233176416" |
| }, |
| "outputs": [], |
| "source": [ |
| "ten_times_model = LinearRegression()\n", |
| "optimizer = torch.optim.Adam(ten_times_model.parameters())\n", |
| "loss_fn = torch.nn.L1Loss()\n", |
| "\n", |
| "epochs = 10000\n", |
| "tensor_x = torch.from_numpy(x)\n", |
| "tensor_y = torch.from_numpy(y)\n", |
| "for epoch in range(epochs):\n", |
| "    y_pred = ten_times_model(tensor_x)\n", |
| "    loss = loss_fn(y_pred, tensor_y)\n", |
| "    ten_times_model.zero_grad()\n", |
| "    loss.backward()\n", |
| "    optimizer.step()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "6f959e3b-230b-45e2-9df3-dd1f11acacd7", |
| "metadata": { |
| "id": "6f959e3b-230b-45e2-9df3-dd1f11acacd7" |
| }, |
| "source": [ |
| "Save the model using `torch.save()` and verify that the saved model file exists." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "42b2ca0f-5d44-4d15-a313-f3d56ae7f675", |
| "metadata": { |
| "id": "42b2ca0f-5d44-4d15-a313-f3d56ae7f675", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "ed9f51c1-8dfe-44bc-c861-28d660ad3799" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "True\n" |
| ] |
| } |
| ], |
| "source": [ |
| "torch.save(ten_times_model.state_dict(), save_model_dir_multiply_ten)\n", |
| "print(os.path.exists(save_model_dir_multiply_ten)) # verify if the model is saved" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "2e20efc4-13e8-46e2-9848-c0347deaa5af", |
| "metadata": { |
| "id": "2e20efc4-13e8-46e2-9848-c0347deaa5af" |
| }, |
| "source": [ |
| "# Pattern 1: RunInference for predictions." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "1099fe94-d4cf-422e-a0d3-0cfba8af64d5", |
| "metadata": { |
| "id": "1099fe94-d4cf-422e-a0d3-0cfba8af64d5" |
| }, |
| "source": [ |
| "### Step 1 - Use RunInference within the pipeline.\n", |
| "\n", |
| "1. Create a PyTorch model handler object by passing the required arguments, such as `state_dict_path`, `model_class`, and `model_params`, to the `PytorchModelHandlerTensor` class.\n", |
| "2. Pass the `PytorchModelHandlerTensor` object to the RunInference transform to perform prediction on unkeyed data." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "e488a821-3b70-4284-96f3-ddee4dcb9d71", |
| "metadata": { |
| "id": "e488a821-3b70-4284-96f3-ddee4dcb9d71", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "6f4e4136-aa6c-4fd4-8be6-2ed8d7ca4545" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "PredictionResult(example=tensor([20.]), inference=tensor([99.9943], grad_fn=<UnbindBackward0>))\n", |
| "PredictionResult(example=tensor([40.]), inference=tensor([199.9889], grad_fn=<UnbindBackward0>))\n", |
| "PredictionResult(example=tensor([60.]), inference=tensor([299.9835], grad_fn=<UnbindBackward0>))\n", |
| "PredictionResult(example=tensor([90.]), inference=tensor([449.9753], grad_fn=<UnbindBackward0>))\n" |
| ] |
| } |
| ], |
| "source": [ |
| "torch_five_times_model_handler = PytorchModelHandlerTensor(\n", |
| " state_dict_path=save_model_dir_multiply_five,\n", |
| " model_class=LinearRegression,\n", |
| " model_params={'input_dim': 1,\n", |
| " 'output_dim': 1}\n", |
| " )\n", |
| "pipeline = beam.Pipeline()\n", |
| "\n", |
| "with pipeline as p:\n", |
| "    (\n", |
| "        p\n", |
| "        | \"ReadInputData\" >> beam.Create(value_to_predict)\n", |
| "        | \"ConvertNumpyToTensor\" >> beam.Map(torch.Tensor)\n", |
| "        | \"RunInferenceTorch\" >> RunInference(torch_five_times_model_handler)\n", |
| "        | beam.Map(print)\n", |
| "    )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "9d95e69b-203f-4abb-9abb-360bdf4d769a", |
| "metadata": { |
| "id": "9d95e69b-203f-4abb-9abb-360bdf4d769a" |
| }, |
| "source": [ |
| "# Pattern 2: Post-process RunInference results.\n", |
| "Add a `PredictionProcessor` to the pipeline after `RunInference`. `PredictionProcessor` processes the output of the `RunInference` transform." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "96f38a5a-4db0-4c39-8ce7-80d9f9911b48", |
| "metadata": { |
| "id": "96f38a5a-4db0-4c39-8ce7-80d9f9911b48", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "1bfa4cc6-ef01-4020-c739-df1efdc632c4" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "input is 20.0 output is 99.99430084228516\n", |
| "input is 40.0 output is 199.98887634277344\n", |
| "input is 60.0 output is 299.98345947265625\n", |
| "input is 90.0 output is 449.9753112792969\n" |
| ] |
| } |
| ], |
| "source": [ |
| "class PredictionProcessor(beam.DoFn):\n", |
| "    \"\"\"\n", |
| "    A processor to format the output of the RunInference transform.\n", |
| "    \"\"\"\n", |
| "    def process(\n", |
| "            self,\n", |
| "            element: PredictionResult):\n", |
| "        input_value = element.example\n", |
| "        output_value = element.inference\n", |
| "        yield (f\"input is {input_value.item()} output is {output_value.item()}\")\n", |
| "\n", |
| "pipeline = beam.Pipeline()\n", |
| "\n", |
| "with pipeline as p:\n", |
| "    (\n", |
| "        p\n", |
| "        | \"ReadInputData\" >> beam.Create(value_to_predict)\n", |
| "        | \"ConvertNumpyToTensor\" >> beam.Map(torch.Tensor)\n", |
| "        | \"RunInferenceTorch\" >> RunInference(torch_five_times_model_handler)\n", |
| "        | \"PostProcessPredictions\" >> beam.ParDo(PredictionProcessor())\n", |
| "        | beam.Map(print)\n", |
| "    )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "2be80463-cf79-481c-9d6a-81e500f1707b", |
| "metadata": { |
| "id": "2be80463-cf79-481c-9d6a-81e500f1707b" |
| }, |
| "source": [ |
| "# Pattern 3: Attach a key" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "f22da313-5bf8-4334-865b-bbfafc374e63", |
| "metadata": { |
| "id": "f22da313-5bf8-4334-865b-bbfafc374e63" |
| }, |
| "source": [ |
| "## Step 1 - Create a source with attached key.\n" |
| ] |
| }, |
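| { |
| "cell_type": "markdown", |
| "id": "sketch-keyed-source-md", |
| "metadata": { |
| "id": "sketch-keyed-source-md" |
| }, |
| "source": [ |
| "A minimal sketch of what a keyed source looks like, written in plain Python with no Beam dependency. It assumes each row is a dictionary with `key` and `value` fields, the same field names as the BigQuery table and CSV file used in this notebook. The names `example_rows` and `to_keyed_tuple` are hypothetical and exist only for this illustration." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "sketch-keyed-source-code", |
| "metadata": { |
| "id": "sketch-keyed-source-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Hypothetical rows, shaped like the dictionaries a table source might emit.\n", |
| "example_rows = [\n", |
| "    {'key': 'first_question', 'value': 105.0},\n", |
| "    {'key': 'second_question', 'value': 108.0},\n", |
| "]\n", |
| "\n", |
| "\n", |
| "def to_keyed_tuple(row):\n", |
| "    # Attach the key to the value so a result can be traced back to its input.\n", |
| "    return (row['key'], float(row['value']))\n", |
| "\n", |
| "\n", |
| "keyed_examples = [to_keyed_tuple(row) for row in example_rows]\n", |
| "print(keyed_examples)" |
| ] |
| }, |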
| { |
| "cell_type": "markdown", |
| "id": "746b67a7-3562-467f-bea3-d8cd18c14927", |
| "metadata": { |
| "id": "746b67a7-3562-467f-bea3-d8cd18c14927" |
| }, |
| "source": [ |
| "## Step 2 - Modify model handler and post processor.\n", |
| "* Modify the pipeline to read from sources like CSV files and BigQuery.\n", |
| "\n", |
| "In this step we:\n", |
| "\n", |
| "* Wrap the `PytorchModelHandlerTensor` object in a `KeyedModelHandler` to handle keyed data.\n", |
| "* Add a map transform that converts a table row into `Tuple[str, float]`.\n", |
| "* Add a map transform that converts `Tuple[str, float]` to `Tuple[str, torch.Tensor]`.\n", |
| "* Modify the post-inference processor to output results along with the key." |
| ] |
| }, |
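| { |
| "cell_type": "markdown", |
| "id": "sketch-keyed-inference-md", |
| "metadata": { |
| "id": "sketch-keyed-inference-md" |
| }, |
| "source": [ |
| "Conceptually, handling keyed data means setting the keys aside, running the model on the examples alone, and then pairing each result with its key again. The following plain-Python sketch illustrates that idea under stated assumptions. It is not how `KeyedModelHandler` is implemented, and `fake_model` and `run_keyed_inference` are hypothetical stand-ins." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "sketch-keyed-inference-code", |
| "metadata": { |
| "id": "sketch-keyed-inference-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "def fake_model(batch):\n", |
| "    # Hypothetical stand-in for a trained model: multiplies each input by 5.\n", |
| "    return [5 * x for x in batch]\n", |
| "\n", |
| "\n", |
| "def run_keyed_inference(keyed_batch, model):\n", |
| "    # Split the (key, example) pairs into two parallel lists.\n", |
| "    keys = [key for key, _ in keyed_batch]\n", |
| "    examples = [example for _, example in keyed_batch]\n", |
| "    # Run the model on the examples only, then re-attach the keys in order.\n", |
| "    predictions = model(examples)\n", |
| "    return list(zip(keys, predictions))\n", |
| "\n", |
| "\n", |
| "keyed_predictions = run_keyed_inference(\n", |
| "    [('first_question', 105.0), ('second_question', 108.0)], fake_model)\n", |
| "print(keyed_predictions)" |
| ] |
| }, |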
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "90b263fc-97a5-43dc-8874-083d7e65e96d", |
| "metadata": { |
| "id": "90b263fc-97a5-43dc-8874-083d7e65e96d" |
| }, |
| "outputs": [], |
| "source": [ |
| "class PredictionWithKeyProcessor(beam.DoFn):\n", |
| "    def __init__(self):\n", |
| "        beam.DoFn.__init__(self)\n", |
| "\n", |
| "    def process(\n", |
| "            self,\n", |
| "            element: Tuple[str, PredictionResult]):\n", |
| "        key = element[0]\n", |
| "        input_value = element[1].example\n", |
| "        output_value = element[1].inference\n", |
| "        yield (f\"key: {key}, input: {input_value.item()} output: {output_value.item()}\")" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "c9b0fb49-d605-4f26-931a-57f42b0ad253", |
| "metadata": { |
| "id": "c9b0fb49-d605-4f26-931a-57f42b0ad253" |
| }, |
| "source": [ |
| "#### Use BigQuery as the source." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "45ce4330-7d33-4c53-8033-f4fa02383894", |
| "metadata": { |
| "id": "45ce4330-7d33-4c53-8033-f4fa02383894" |
| }, |
| "source": [ |
| "Install the Google Cloud BigQuery client library using `pip`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "4eb859dd-ba54-45a1-916b-5bbe4dc3f16e", |
| "metadata": { |
| "id": "4eb859dd-ba54-45a1-916b-5bbe4dc3f16e", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "c01594f4-443e-434a-b61a-a38beb00f1a9" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "\u001b[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.\n", |
| "pandas-gbq 0.13.3 requires google-cloud-bigquery[bqstorage,pandas]<2.0.0dev,>=1.11.1, but you have google-cloud-bigquery 3.3.0 which is incompatible.\u001b[0m\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%pip install --upgrade google-cloud-bigquery --quiet" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "6e869347-dd49-40be-b1e5-749699dc0d83", |
| "metadata": { |
| "id": "6e869347-dd49-40be-b1e5-749699dc0d83" |
| }, |
| "source": [ |
| "Use the following snippet to create a BigQuery table with two columns: one holds the key and the other holds the test value. To use BigQuery, you need a Google Cloud account with the BigQuery API enabled." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "!gcloud config set project $project" |
| ], |
| "metadata": { |
| "id": "7mgnryX-Zlfs", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "ebd3e9e3-9c30-4027-f571-5cd3c1951e18" |
| }, |
| "id": "7mgnryX-Zlfs", |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "Updated property [core/project].\n" |
| ] |
| } |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "a6a984cd-2e92-4c44-821b-9bf1dd52fb7d", |
| "metadata": { |
| "id": "a6a984cd-2e92-4c44-821b-9bf1dd52fb7d", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "8e60b448-1384-4290-c164-cb43d876c350" |
| }, |
| "outputs": [ |
| { |
| "output_type": "execute_result", |
| "data": { |
| "text/plain": [ |
| "<google.cloud.bigquery.table._EmptyRowIterator at 0x7f47d0556a50>" |
| ] |
| }, |
| "metadata": {}, |
| "execution_count": 86 |
| } |
| ], |
| "source": [ |
| "from google.cloud import bigquery\n", |
| "\n", |
| "client = bigquery.Client(project=project)\n", |
| "\n", |
| "# Make sure the dataset_id is unique in your project.\n", |
| "dataset_id = '{project}.maths'.format(project=project)\n", |
| "dataset = bigquery.Dataset(dataset_id)\n", |
| "\n", |
| "# Modify the location based on your project configuration.\n", |
| "dataset.location = 'US'\n", |
| "dataset = client.create_dataset(dataset, exists_ok=True)\n", |
| "\n", |
| "# Table name in the BigQuery dataset.\n", |
| "table_name = 'maths_problems_1'\n", |
| "\n", |
| "query = \"\"\"\n", |
| " CREATE OR REPLACE TABLE\n", |
| " {project}.maths.{table} ( key STRING OPTIONS(description=\"A unique key for the maths problem\"),\n", |
| " value FLOAT64 OPTIONS(description=\"Our maths problem\" ) );\n", |
| " INSERT INTO maths.{table}\n", |
| " VALUES\n", |
| " (\"first_question\", 105.00),\n", |
| " (\"second_question\", 108.00),\n", |
| " (\"third_question\", 1000.00),\n", |
| " (\"fourth_question\", 1013.00)\n", |
| "\"\"\".format(project=project, table=table_name)\n", |
| "\n", |
| "create_job = client.query(query)\n", |
| "create_job.result()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "479c9319-3295-4288-971c-dd0f0adfdd1e", |
| "metadata": { |
| "id": "479c9319-3295-4288-971c-dd0f0adfdd1e" |
| }, |
| "source": [ |
| "Use `BigQuery` as the source in the pipeline to read keyed data." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "34331897-23f5-4850-8974-67e522e956dc", |
| "metadata": { |
| "id": "34331897-23f5-4850-8974-67e522e956dc", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "23092c12-3370-414c-ba67-37be569cd21c" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "key: first_question, input: 105.0 output: 524.9712524414062\n", |
| "key: second_question, input: 108.0 output: 539.970458984375\n", |
| "key: third_question, input: 1000.0 output: 4999.72802734375\n", |
| "key: fourth_question, input: 1013.0 output: 5064.724609375\n" |
| ] |
| } |
| ], |
| "source": [ |
| "pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'{bucket}/tmp',\n", |
| " })\n", |
| "pipeline = beam.Pipeline(options=pipeline_options)\n", |
| "\n", |
| "keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)\n", |
| "\n", |
| "table_name = 'maths_problems_1'\n", |
| "table_spec = f'{project}:maths.{table_name}'\n", |
| "\n", |
| "with pipeline as p:\n", |
| "    (\n", |
| "        p\n", |
| "        | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n", |
| "        | \"PreprocessData\" >> beam.Map(lambda x: (x['key'], x['value']))\n", |
| "        | \"ConvertNumpyToTensor\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n", |
| "        | \"RunInferenceTorch\" >> RunInference(keyed_torch_five_times_model_handler)\n", |
| "        | \"PostProcessPredictions\" >> beam.ParDo(PredictionWithKeyProcessor())\n", |
| "        | beam.Map(print)\n", |
| "    )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "53ee7f24-5625-475a-b8cc-9c031591f304", |
| "metadata": { |
| "id": "53ee7f24-5625-475a-b8cc-9c031591f304" |
| }, |
| "source": [ |
| "#### Use a CSV file as the source." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "06bc4396-ee2e-4228-8548-f953b5020c4e", |
| "metadata": { |
| "id": "06bc4396-ee2e-4228-8548-f953b5020c4e" |
| }, |
| "source": [ |
| "Create a CSV file with two columns: one named `key` that holds the keys, and a second named `value` that holds the test values." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "# creates a csv file with the below values.\n", |
| "csv_values = [(\"first_question\", 105.00),\n", |
| " (\"second_question\", 108.00),\n", |
| " (\"third_question\", 1000.00),\n", |
| " (\"fourth_question\", 1013.00)]\n", |
| "input_csv_file = \"./maths_problem.csv\"\n", |
| "\n", |
| "with open(input_csv_file, 'w', newline='') as f:\n", |
| "    writer = csv.writer(f)\n", |
| "    writer.writerow(['key', 'value'])\n", |
| "    for row in csv_values:\n", |
| "        writer.writerow(row)\n", |
| "\n", |
| "assert os.path.exists(input_csv_file)" |
| ], |
| "metadata": { |
| "id": "exAZjP7cYAFv" |
| }, |
| "id": "exAZjP7cYAFv", |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "9a054c2d-4d84-4b37-b067-1dda5347e776", |
| "metadata": { |
| "id": "9a054c2d-4d84-4b37-b067-1dda5347e776", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "000c72cf-6a7f-45d8-9dec-dc9db6ce0662" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "key: first_question, input: 105.0 output: 524.9712524414062\n", |
| "key: second_question, input: 108.0 output: 539.970458984375\n", |
| "key: third_question, input: 1000.0 output: 4999.72802734375\n", |
| "key: fourth_question, input: 1013.0 output: 5064.724609375\n" |
| ] |
| } |
| ], |
| "source": [ |
| "pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'{bucket}/tmp',\n", |
| " })\n", |
| "pipeline = beam.Pipeline(options=pipeline_options)\n", |
| "\n", |
| "keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)\n", |
| "\n", |
| "with pipeline as p:\n", |
| "    df = p | beam.dataframe.io.read_csv(input_csv_file)\n", |
| "    pc = to_pcollection(df)\n", |
| "    (\n", |
| "        pc\n", |
| "        | \"ConvertNumpyToTensor\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n", |
| "        | \"RunInferenceTorch\" >> RunInference(keyed_torch_five_times_model_handler)\n", |
| "        | \"PostProcessPredictions\" >> beam.ParDo(PredictionWithKeyProcessor())\n", |
| "        | beam.Map(print)\n", |
| "    )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "742abfbb-545e-435b-8833-2410ce29d22c", |
| "metadata": { |
| "id": "742abfbb-545e-435b-8833-2410ce29d22c" |
| }, |
| "source": [ |
| "# Pattern 4: Inference with multiple models in the same pipeline.\n", |
| "\n", |
| "## Inference with multiple models in parallel. " |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "570b2f27-3beb-4295-b926-595592289c06", |
| "metadata": { |
| "id": "570b2f27-3beb-4295-b926-595592289c06" |
| }, |
| "source": [ |
| "Create a torch model handler for the 10 times model using `PytorchModelHandlerTensor`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "73607c45-afe1-4990-9a55-e687ed40302e", |
| "metadata": { |
| "id": "73607c45-afe1-4990-9a55-e687ed40302e" |
| }, |
| "outputs": [], |
| "source": [ |
| "torch_ten_times_model_handler = PytorchModelHandlerTensor(state_dict_path=save_model_dir_multiply_ten,\n", |
| " model_class=LinearRegression,\n", |
| " model_params={'input_dim': 1,\n", |
| " 'output_dim': 1}\n", |
| " )\n", |
| "keyed_torch_ten_times_model_handler = KeyedModelHandler(torch_ten_times_model_handler)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "70ebed52-4ead-4cae-ac96-8cf206012ce1", |
| "metadata": { |
| "id": "70ebed52-4ead-4cae-ac96-8cf206012ce1" |
| }, |
| "source": [ |
| "In this pattern, the same data is run through two different models: the one that we’ve been using to multiply by 5\n", |
| "and the second model, which was trained to multiply by 10." |
| ] |
| }, |
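| { |
| "cell_type": "markdown", |
| "id": "sketch-parallel-models-md", |
| "metadata": { |
| "id": "sketch-parallel-models-md" |
| }, |
| "source": [ |
| "The branch-and-merge shape of this pattern can be sketched in plain Python. The two functions below are hypothetical stand-ins for the trained models (they multiply exactly instead of using learned weights), and `sketch_inputs`, `branch_five`, `branch_ten`, and `merged_results` are names introduced only for this illustration." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "sketch-parallel-models-code", |
| "metadata": { |
| "id": "sketch-parallel-models-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "def times_five(x):\n", |
| "    # Hypothetical stand-in for the 5 times model.\n", |
| "    return 5 * x\n", |
| "\n", |
| "\n", |
| "def times_ten(x):\n", |
| "    # Hypothetical stand-in for the 10 times model.\n", |
| "    return 10 * x\n", |
| "\n", |
| "\n", |
| "sketch_inputs = [('first_question', 105.0), ('second_question', 108.0)]\n", |
| "\n", |
| "# Both branches read the same inputs but apply a different model.\n", |
| "branch_five = [(f'{key} * 5', times_five(value)) for key, value in sketch_inputs]\n", |
| "branch_ten = [(f'{key} * 10', times_ten(value)) for key, value in sketch_inputs]\n", |
| "\n", |
| "# Concatenating the branches plays the role of a flatten step.\n", |
| "merged_results = branch_five + branch_ten\n", |
| "for key, prediction in merged_results:\n", |
| "    print(key, prediction)" |
| ] |
| }, |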
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "629d070e-9902-42c9-a1e7-56c3d1864f13", |
| "metadata": { |
| "id": "629d070e-9902-42c9-a1e7-56c3d1864f13", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "f798bbc7-3f45-496f-b029-3cff5599bfaa" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "key: first_question * 5, input: 105.0 output: 1046.1859130859375\n", |
| "key: second_question * 5, input: 108.0 output: 1075.8590087890625\n", |
| "key: third_question * 5, input: 1000.0 output: 9898.654296875\n", |
| "key: fourth_question * 5, input: 1013.0 output: 10027.2373046875\n", |
| "key: first_question * 10, input: 105.0 output: 1046.1859130859375\n", |
| "key: second_question * 10, input: 108.0 output: 1075.8590087890625\n", |
| "key: third_question * 10, input: 1000.0 output: 9898.654296875\n", |
| "key: fourth_question * 10, input: 1013.0 output: 10027.2373046875\n" |
| ] |
| } |
| ], |
| "source": [ |
| "pipeline_options = PipelineOptions().from_dictionary(\n", |
| " {'temp_location':f'{bucket}/tmp'})\n", |
| "\n", |
| "pipeline = beam.Pipeline(options=pipeline_options)\n", |
| "\n", |
| "read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)\n", |
| "\n", |
| "with pipeline as p:\n", |
| "    multiply_five = (\n", |
| "        p\n", |
| "        | read_from_bq\n", |
| "        | \"CreateMultiplyFiveTuple\" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 5'), x['value']))\n", |
| "        | \"ConvertNumpyToTensorFiveTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n", |
| "        | \"RunInferenceTorchFiveTuple\" >> RunInference(keyed_torch_five_times_model_handler)\n", |
| "    )\n", |
| "    multiply_ten = (\n", |
| "        p\n", |
| "        | read_from_bq\n", |
| "        | \"CreateMultiplyTenTuple\" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 10'), x['value']))\n", |
| "        | \"ConvertNumpyToTensorTenTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n", |
| "        | \"RunInferenceTorchTenTuple\" >> RunInference(keyed_torch_ten_times_model_handler)\n", |
| "    )\n", |
| "\n", |
| "    inference_result = ((multiply_five, multiply_ten) | beam.Flatten()\n", |
| "                        | beam.ParDo(PredictionWithKeyProcessor()))\n", |
| "    inference_result | beam.Map(print)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "e71e6706-5d8d-4322-9def-ac7fb20d4a50", |
| "metadata": { |
| "id": "e71e6706-5d8d-4322-9def-ac7fb20d4a50" |
| }, |
| "source": [ |
| "## Inference with multiple models in sequence \n", |
| "\n", |
| "In a sequential pattern, data is sent to one or more models in sequence,\n", |
| "with the output of each model used as the input to the next model.\n", |
| "\n", |
| "1. Read the data from BigQuery.\n", |
| "2. Map the data.\n", |
| "3. Run inference with the multiply-by-5 model.\n", |
| "4. Process the results.\n", |
| "5. Run inference with the multiply-by-10 model.\n", |
| "6. Process the results.\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "8db9d649-5549-4b58-a9ad-7b8592c2bcbf", |
| "metadata": { |
| "id": "8db9d649-5549-4b58-a9ad-7b8592c2bcbf", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "4f600937-4cb4-42dd-aa50-fa15538cc964" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "key: original input is `first_question tensor([105.])`, input: 524.9712524414062 output: 5200.13232421875\n", |
| "key: original input is `second_question tensor([108.])`, input: 539.970458984375 output: 5348.490234375\n", |
| "key: original input is `third_question tensor([1000.])`, input: 4999.72802734375 output: 49460.0703125\n", |
| "key: original input is `fourth_question tensor([1013.])`, input: 5064.724609375 output: 50102.953125\n" |
| ] |
| } |
| ], |
| "source": [ |
| "def process_interim_inference(element):\n", |
| " key, prediction_result = element\n", |
| " input_value = prediction_result.example\n", |
| " inference = prediction_result.inference\n", |
| " formatted_input_value = 'original input is `{} {}`'.format(key, input_value)\n", |
| " return formatted_input_value, inference\n", |
| "\n", |
| "\n", |
| "pipeline_options = PipelineOptions().from_dictionary(\n", |
| " {'temp_location':f'{bucket}/tmp'})\n", |
| "pipeline = beam.Pipeline(options=pipeline_options)\n", |
| "\n", |
| "with pipeline as p:\n", |
| " multiply_five = (\n", |
| " p \n", |
| " | beam.io.ReadFromBigQuery(table=table_spec) \n", |
| " | \"CreateMultiplyFiveTuple\" >> beam.Map(lambda x: (x['key'], x['value']))\n", |
| " | \"ConvertNumpyToTensorFiveTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n", |
| " | \"RunInferenceTorchFiveTuple\" >> RunInference(keyed_torch_five_times_model_handler)\n", |
| " )\n", |
| "\n", |
| " inference_result = (\n", |
| " multiply_five \n", |
| " | \"ExtractResult\" >> beam.Map(process_interim_inference) \n", |
| " | \"RunInferenceTorchTenTuple\" >> RunInference(keyed_torch_ten_times_model_handler)\n", |
| " | beam.ParDo(PredictionWithKeyProcessor())\n", |
| " )\n", |
| " inference_result | beam.Map(print)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "32c9ba40-9396-48f4-9e7f-a2acced98bb2", |
| "metadata": { |
| "id": "32c9ba40-9396-48f4-9e7f-a2acced98bb2" |
| }, |
| "source": [ |
| "# Sklearn implementation of RunInference API.\n", |
| "\n", |
| "This section shows the Sklearn implementation of the RunInference API with unkeyed data and keyed data.\n", |
| "\n", |
| "Sklearn is a build dependency of Apache Beam. To install a different version of Sklearn, use `%pip install scikit-learn==<version>`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "d6142b75-eef1-4e52-9fa4-fe02fe916b26", |
| "metadata": { |
| "id": "d6142b75-eef1-4e52-9fa4-fe02fe916b26" |
| }, |
| "outputs": [], |
| "source": [ |
| "import pickle\n", |
| "from sklearn import linear_model\n", |
| "\n", |
| "import numpy as np\n", |
| "from apache_beam.ml.inference.sklearn_inference import ModelFileType\n", |
| "from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "6695cd22-e0bf-438f-8223-4a93392e6616", |
| "metadata": { |
| "id": "6695cd22-e0bf-438f-8223-4a93392e6616" |
| }, |
| "source": [ |
| "## Create the data and the Sklearn model.\n", |
| "The following cell performs these steps:\n", |
| "1. Create the data to train the Sklearn linear regression model.\n", |
| "2. Train the linear regression model.\n", |
| "3. Save the Sklearn model using `pickle`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "c57843e8-f696-4196-ad39-827e34849976", |
| "metadata": { |
| "id": "c57843e8-f696-4196-ad39-827e34849976" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Input data to train the sklearn model.\n", |
| "x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)\n", |
| "y = (x * 5).reshape(-1, 1)\n", |
| "\n", |
| "regression = linear_model.LinearRegression()\n", |
| "regression.fit(x,y)\n", |
| "\n", |
| "sklearn_model_filename = 'sklearn_5x_model.pkl'\n", |
| "with open(sklearn_model_filename, 'wb') as f:\n", |
| " pickle.dump(regression, f)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "69008a3d-3d15-4643-828c-b0419b347d01", |
| "metadata": { |
| "id": "69008a3d-3d15-4643-828c-b0419b347d01" |
| }, |
| "source": [ |
| "### Scikit-learn RunInference pipeline.\n", |
| "\n", |
| "1. Define the Sklearn model handler that accepts an array-like object as input.\n", |
| "2. Read the data from BigQuery.\n", |
| "3. Use the Sklearn trained model and the Sklearn RunInference transform on unkeyed data." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "50a648a3-794a-4286-ab2b-fc0458db04ca", |
| "metadata": { |
| "id": "50a648a3-794a-4286-ab2b-fc0458db04ca", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "b305d977-6549-4c01-a402-c4e14f3f2b04" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "PredictionResult(example=[105.0], inference=array([525.]))\n", |
| "PredictionResult(example=[108.0], inference=array([540.]))\n", |
| "PredictionResult(example=[1000.0], inference=array([5000.]))\n", |
| "PredictionResult(example=[1013.0], inference=array([5065.]))\n" |
| ] |
| } |
| ], |
| "source": [ |
| "# SklearnModelHandlerNumpy accepts only unkeyed examples.\n", |
| "sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=sklearn_model_filename,\n", |
| " model_file_type=ModelFileType.PICKLE) # Use ModelFileType.JOBLIB if the model is serialized using joblib.\n", |
| "\n", |
| "\n", |
| "pipeline_options = PipelineOptions().from_dictionary(\n", |
| " {'temp_location':f'{bucket}/tmp'})\n", |
| "pipeline = beam.Pipeline(options=pipeline_options)\n", |
| "\n", |
| "with pipeline as p:\n", |
| " (\n", |
| " p \n", |
| " | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n", |
| " | \"ExtractInputs\" >> beam.Map(lambda x: [x['value']]) \n", |
| " | \"RunInferenceSklearn\" >> RunInference(model_handler=sklearn_model_handler)\n", |
| " | beam.Map(print)\n", |
| " )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "33e901d6-ed06-4268-8a5f-685d31b5558f", |
| "metadata": { |
| "id": "33e901d6-ed06-4268-8a5f-685d31b5558f" |
| }, |
| "source": [ |
| "### Sklearn RunInference on keyed inputs.\n", |
| "1. Wrap the `SklearnModelHandlerNumpy` object in a `KeyedModelHandler` to handle keyed data.\n", |
| "2. Read the data from BigQuery.\n", |
| "3. Use the trained Sklearn model and the Sklearn RunInference transform on keyed data." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "c212916d-b517-4589-ad15-a3a1df926fb3", |
| "metadata": { |
| "id": "c212916d-b517-4589-ad15-a3a1df926fb3", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "1c3ccf35-3cd7-401e-de23-c0e22b5f6ebd" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "('first_question', PredictionResult(example=[105.0], inference=array([525.])))\n", |
| "('second_question', PredictionResult(example=[108.0], inference=array([540.])))\n", |
| "('third_question', PredictionResult(example=[1000.0], inference=array([5000.])))\n", |
| "('fourth_question', PredictionResult(example=[1013.0], inference=array([5065.])))\n" |
| ] |
| } |
| ], |
| "source": [ |
| "sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=sklearn_model_filename,\n", |
| " model_file_type=ModelFileType.PICKLE) # Use ModelFileType.JOBLIB if the model is serialized using joblib.\n", |
| "\n", |
| "keyed_sklearn_model_handler = KeyedModelHandler(sklearn_model_handler)\n", |
| "\n", |
| "pipeline_options = PipelineOptions().from_dictionary(\n", |
| " {'temp_location':f'{bucket}/tmp'})\n", |
| "pipeline = beam.Pipeline(options=pipeline_options)\n", |
| "\n", |
| "with pipeline as p:\n", |
| " (\n", |
| " p \n", |
| " | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n", |
| " | \"ExtractInputs\" >> beam.Map(lambda x: (x['key'], [x['value']])) \n", |
| " | \"RunInferenceSklearn\" >> RunInference(model_handler=keyed_sklearn_model_handler)\n", |
| " | beam.Map(print)\n", |
| " )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "f1481883-423b-4da0-8ae0-1a602b1807c6", |
| "metadata": { |
| "id": "f1481883-423b-4da0-8ae0-1a602b1807c6" |
| }, |
| "source": [ |
| "# Cross framework transforms in a single pipeline\n", |
| "\n", |
| "This pipeline applies RunInference transforms from different frameworks sequentially. \n", |
| "\n", |
| "The following cells perform these steps:\n", |
| "1. Create a `KeyedModelHandler` for Sklearn and for PyTorch. \n", |
| "2. Run inference with Sklearn and perform intermediate processing using `process_interim_inference`.\n", |
| "3. Pass the intermediate result from the Sklearn RunInference transform through the PyTorch RunInference transform." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "a45d496c-4d7b-4173-b27b-253c5767bb8d", |
| "metadata": { |
| "id": "a45d496c-4d7b-4173-b27b-253c5767bb8d" |
| }, |
| "outputs": [], |
| "source": [ |
| "def process_interim_inference(element: Tuple[str, PredictionResult]):\n", |
| "  \"\"\"\n", |
| "  Returns the key and the prediction to the next RunInference transform.\n", |
| "  \"\"\"\n", |
| "  key, prediction_result = element\n", |
| "  prediction = prediction_result.inference\n", |
| "  return key, prediction\n", |
| "\n", |
| "class PredictionProcessor(beam.DoFn):\n", |
| "  def process(self, element: Tuple[str, PredictionResult]):\n", |
| "    key, prediction_result = element\n", |
| "    input_from_upstream = prediction_result.example\n", |
| "    prediction = prediction_result.inference\n", |
| "    yield (key, prediction.item())" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "ada71e7d-cf29-4441-a921-310c05fa8576", |
| "metadata": { |
| "id": "ada71e7d-cf29-4441-a921-310c05fa8576", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "outputId": "78eb9a0d-ace2-4c02-8970-13488dc2767c" |
| }, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "('first_question', 2624.857421875)\n", |
| "('second_question', 2699.853271484375)\n", |
| "('third_question', 24998.642578125)\n", |
| "('fourth_question', 25323.625)\n" |
| ] |
| } |
| ], |
| "source": [ |
| "pipeline_options = PipelineOptions().from_dictionary(\n", |
| " {'temp_location':f'{bucket}/tmp'})\n", |
| "pipeline = beam.Pipeline(options=pipeline_options)\n", |
| "\n", |
| "read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)\n", |
| "keyed_inputs = \"ExtractKeyedInputs\" >> beam.Map(lambda x: (x['key'], [x['value']]))\n", |
| "\n", |
| "keyed_sklearn_model_handler = KeyedModelHandler(SklearnModelHandlerNumpy(\n", |
| " model_uri=sklearn_model_filename,\n", |
| " model_file_type=ModelFileType.PICKLE))\n", |
| "\n", |
| "keyed_torch_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(\n", |
| " state_dict_path=save_model_dir_multiply_ten,\n", |
| " model_class=LinearRegression,\n", |
| " model_params={'input_dim': 1,\n", |
| " 'output_dim': 1}))\n", |
| "\n", |
| "with pipeline as p:\n", |
| " sklearn_inference_result = (\n", |
| " p\n", |
| " | read_from_bq\n", |
| " | keyed_inputs\n", |
| " | \"RunInferenceSklearn\" >> RunInference(model_handler=keyed_sklearn_model_handler)\n", |
| " | \"ExtractOutputs\" >> beam.Map(process_interim_inference)\n", |
| " )\n", |
| "\n", |
| " torch_inference_result = (\n", |
| " sklearn_inference_result\n", |
| " | \"ConvertNumpyToTensorFiveTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n", |
| " | \"RunInferenceTorchFiveTuple\" >> RunInference(keyed_torch_five_times_model_handler)\n", |
| " | \"ProcessPredictions\" >> beam.ParDo(PredictionProcessor())\n", |
| " | beam.Map(print)\n", |
| " )\n" |
| ] |
| } |
| ] |
| } |