blob: 91f89992f524f1aef80ea11fdba4aa695cdb7c47 [file] [log] [blame]
{
"nbformat": 4,
"nbformat_minor": 2,
"metadata": {
"colab": {
"name": "Beam RunInference",
"provenance": [],
"collapsed_sections": [],
"toc_visible": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "code",
"source": [
"#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
"\n",
"# Licensed to the Apache Software Foundation (ASF) under one\n",
"# or more contributor license agreements. See the NOTICE file\n",
"# distributed with this work for additional information\n",
"# regarding copyright ownership. The ASF licenses this file\n",
"# to you under the Apache License, Version 2.0 (the\n",
"# \"License\"); you may not use this file except in compliance\n",
"# with the License. You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing,\n",
"# software distributed under the License is distributed on an\n",
"# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
"# KIND, either express or implied. See the License for the\n",
"# specific language governing permissions and limitations\n",
"# under the License."
],
"metadata": {
"id": "C1rAsD2L-hSO",
"cellView": "form"
},
"id": "C1rAsD2L-hSO",
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"id": "b6f8f3af-744e-4eaa-8a30-6d03e8e4d21e",
"metadata": {
"id": "b6f8f3af-744e-4eaa-8a30-6d03e8e4d21e"
},
"source": [
"# RunInference\n",
"\n",
"<button>\n",
" <a href=\"https://beam.apache.org/documentation/sdks/python-machine-learning/\">\n",
" <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n",
" Beam RunInference\n",
" </a>\n",
"</button>\n",
"\n",
"In this notebook, we walk through the use of the RunInference transform.\n",
"The transform and its accompanying [ModelHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler) classes handle the following tasks:\n",
"\n",
"\n",
"* Optimizing loading models from popular frameworks.\n",
"* Batching examples in a scalable fashion.\n",
"\n",
"\n",
"This notebook illustrates common RunInference patterns such as the following:\n",
"* Generating predictions using both Pytorch and Scikit-learn.\n",
"* Post processing results after RunInference.\n",
"* Inference with multiple models in the same pipeline.\n",
"\n",
"The linear regression models used in these samples are trained on data that correspondes to the 5 and 10 times table; that is,`y = 5x` and `y = 10x` respectively."
]
},
{
"cell_type": "markdown",
"id": "299af9bb-b2fc-405c-96e7-ee0a6ae24bdd",
"metadata": {
"id": "299af9bb-b2fc-405c-96e7-ee0a6ae24bdd"
},
"source": [
"### Dependencies\n",
"\n",
"The RunInference library is available in Apache Beam version <b>2.40</b> or later.\n",
"\n",
"Pytorch module is needed to use Pytorch RunInference API. use `pip` to install Pytorch."
]
},
{
"cell_type": "code",
"source": [
"# issue: https://github.com/apache/beam/issues/22218. Becuase of the updates to the Google cloud APIs, Beam SDK from 2.34.0 till 2.40.0 has some dependency conflicts. See the issue for more details.\n",
"# Workaround to install the apache beam without getting stuck for long time. Runtime might need to restart after this step.\n",
"!pip install google-api-core==1.31.6 --quiet\n",
"!pip install google-cloud-pubsub==2.13.1 google-cloud-bigquery-storage==2.13.2 --quiet\n",
"!pip install apache-beam[gcp,dataframe] --quiet"
],
"metadata": {
"id": "loxD-rOVchRn",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "661baa2d-6e0f-4478-b7c1-db911593d5ff"
},
"id": "loxD-rOVchRn",
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"\u001b[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.\n",
"pandas-gbq 0.13.3 requires google-cloud-bigquery[bqstorage,pandas]<2.0.0dev,>=1.11.1, but you have google-cloud-bigquery 2.34.4 which is incompatible.\u001b[0m\n"
]
}
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7f841596-f217-46d2-b64e-1952db4de4cb",
"metadata": {
"id": "7f841596-f217-46d2-b64e-1952db4de4cb",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "da04ccb9-0801-47f6-ec9e-e87f0ca4569f"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/\n",
"Requirement already satisfied: torch in /usr/local/lib/python3.7/dist-packages (1.12.0+cu113)\n",
"Requirement already satisfied: typing-extensions in /usr/local/lib/python3.7/dist-packages (from torch) (4.1.1)\n"
]
}
],
"source": [
"%pip install torch"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9a92e3a7-beb6-46ae-a5b0-53c15487de38",
"metadata": {
"id": "9a92e3a7-beb6-46ae-a5b0-53c15487de38"
},
"outputs": [],
"source": [
"import argparse\n",
"import csv\n",
"import json\n",
"import os\n",
"import torch\n",
"from typing import Tuple\n",
"\n",
"import apache_beam as beam\n",
"import numpy\n",
"from apache_beam.io.gcp.bigquery import ReadFromBigQuery\n",
"from apache_beam.ml.inference.base import KeyedModelHandler\n",
"from apache_beam.ml.inference.base import PredictionResult\n",
"from apache_beam.ml.inference.base import RunInference\n",
"from apache_beam.dataframe.convert import to_pcollection\n",
"from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor\n",
"from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"import warnings\n",
"warnings.filterwarnings('ignore')"
]
},
{
"cell_type": "code",
"source": [
"from google.colab import auth\n",
"auth.authenticate_user()"
],
"metadata": {
"id": "V0E35R5Ka2cE"
},
"id": "V0E35R5Ka2cE",
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "248458a6-cfd8-474d-ad0e-f37f7ae981ae",
"metadata": {
"id": "248458a6-cfd8-474d-ad0e-f37f7ae981ae"
},
"outputs": [],
"source": [
"# Constants\n",
"project = \"<your-project>\"\n",
"bucket = \"<your-bucket>\"\n",
"\n",
"# set the project to avoid warnings.\n",
"os.environ['GOOGLE_CLOUD_PROJECT'] = project\n",
"\n",
"save_model_dir_multiply_five = 'five_times_table_torch.pt'\n",
"save_model_dir_multiply_ten = 'ten_times_table_torch.pt'"
]
},
{
"cell_type": "markdown",
"id": "b2b7cedc-79f5-4599-8178-e5da35dba032",
"metadata": {
"tags": [],
"id": "b2b7cedc-79f5-4599-8178-e5da35dba032"
},
"source": [
"## Create data and Pytorch models for RunInference transform"
]
},
{
"cell_type": "markdown",
"id": "202e5a3e-4ccd-4ae3-9852-e47de0721839",
"metadata": {
"id": "202e5a3e-4ccd-4ae3-9852-e47de0721839"
},
"source": [
"### Linear regression model in Pytorch."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "68bf8bf0-f735-45ee-8ebb-a2d8bb8a6bc7",
"metadata": {
"id": "68bf8bf0-f735-45ee-8ebb-a2d8bb8a6bc7"
},
"outputs": [],
"source": [
"class LinearRegression(torch.nn.Module):\n",
" def __init__(self, input_dim=1, output_dim=1):\n",
" super().__init__()\n",
" self.linear = torch.nn.Linear(input_dim, output_dim) \n",
" def forward(self, x):\n",
" out = self.linear(x)\n",
" return out"
]
},
{
"cell_type": "markdown",
"id": "1918435c-0029-4eb6-8eee-bda5470eb2ff",
"metadata": {
"id": "1918435c-0029-4eb6-8eee-bda5470eb2ff"
},
"source": [
"### Prepare train and test data to train a 5 times model.\n",
"* `x` contains values in the range from 0 to 99.\n",
"* `y` is a list of 5 * `x`. \n",
"* `value_to_predict` includes values outside of the training data."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9302917f-6200-4af4-a410-4bd6f2a070b8",
"metadata": {
"id": "9302917f-6200-4af4-a410-4bd6f2a070b8"
},
"outputs": [],
"source": [
"x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)\n",
"y = (x * 5).reshape(-1, 1)\n",
"value_to_predict = numpy.array([20, 40, 60, 90], dtype=numpy.float32).reshape(-1, 1)"
]
},
{
"cell_type": "markdown",
"id": "9dc22aec-08c3-43ab-a5ce-451cb63c485a",
"metadata": {
"id": "9dc22aec-08c3-43ab-a5ce-451cb63c485a"
},
"source": [
"### Train the linear regression mode on 5 times data."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0a8b7924-ff06-4584-8f41-079268387a67",
"metadata": {
"id": "0a8b7924-ff06-4584-8f41-079268387a67"
},
"outputs": [],
"source": [
"five_times_model = LinearRegression()\n",
"optimizer = torch.optim.Adam(five_times_model.parameters())\n",
"loss_fn = torch.nn.L1Loss()\n",
"\n",
"\"\"\"\n",
"Train the five_times_model\n",
"\"\"\"\n",
"epochs = 10000\n",
"tensor_x = torch.from_numpy(x)\n",
"tensor_y = torch.from_numpy(y)\n",
"for epoch in range(epochs):\n",
" y_pred = five_times_model(tensor_x)\n",
" loss = loss_fn(y_pred, tensor_y)\n",
" five_times_model.zero_grad()\n",
" loss.backward()\n",
" optimizer.step()"
]
},
{
"cell_type": "markdown",
"id": "bd106b29-6187-42c1-9743-1666c147b5e3",
"metadata": {
"id": "bd106b29-6187-42c1-9743-1666c147b5e3"
},
"source": [
"Save the model using `torch.save()` and verify if the saved model file exists."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "882bbada-4f6d-4370-a047-c5961e564ee8",
"metadata": {
"id": "882bbada-4f6d-4370-a047-c5961e564ee8",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "3002ed41-dbd5-4a87-d2d1-d1c7908be2f2"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"True\n"
]
}
],
"source": [
"torch.save(five_times_model.state_dict(), save_model_dir_multiply_five)\n",
"print(os.path.exists(save_model_dir_multiply_five)) # verify if the model is saved"
]
},
{
"cell_type": "markdown",
"id": "fa84cfca-83c6-4a91-aea1-3dd034c42ae0",
"metadata": {
"id": "fa84cfca-83c6-4a91-aea1-3dd034c42ae0"
},
"source": [
"### Prepare train and test data to train a 10 times model.\n",
"* `x` contains values in the range from 0 to 99.\n",
"* `y` is a list of 10 * `x`. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "27e0d1f6-2c3e-4418-8fb0-b5b89ffa66d5",
"metadata": {
"id": "27e0d1f6-2c3e-4418-8fb0-b5b89ffa66d5"
},
"outputs": [],
"source": [
"x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)\n",
"y = (x * 10).reshape(-1, 1)"
]
},
{
"cell_type": "markdown",
"id": "24d946dc-4fe0-4030-8f6a-aa8d27fd353d",
"metadata": {
"id": "24d946dc-4fe0-4030-8f6a-aa8d27fd353d"
},
"source": [
"### Train the linear regression model on 10 times data."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2b352313-b791-48fd-9b9d-b54233176416",
"metadata": {
"id": "2b352313-b791-48fd-9b9d-b54233176416"
},
"outputs": [],
"source": [
"ten_times_model = LinearRegression()\n",
"optimizer = torch.optim.Adam(ten_times_model.parameters())\n",
"loss_fn = torch.nn.L1Loss()\n",
"\n",
"epochs = 10000\n",
"tensor_x = torch.from_numpy(x)\n",
"tensor_y = torch.from_numpy(y)\n",
"for epoch in range(epochs):\n",
" y_pred = ten_times_model(tensor_x)\n",
" loss = loss_fn(y_pred, tensor_y)\n",
" ten_times_model.zero_grad()\n",
" loss.backward()\n",
" optimizer.step()"
]
},
{
"cell_type": "markdown",
"id": "6f959e3b-230b-45e2-9df3-dd1f11acacd7",
"metadata": {
"id": "6f959e3b-230b-45e2-9df3-dd1f11acacd7"
},
"source": [
"Save the model using `torch.save()`"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "42b2ca0f-5d44-4d15-a313-f3d56ae7f675",
"metadata": {
"id": "42b2ca0f-5d44-4d15-a313-f3d56ae7f675",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "ed9f51c1-8dfe-44bc-c861-28d660ad3799"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"True\n"
]
}
],
"source": [
"torch.save(ten_times_model.state_dict(), save_model_dir_multiply_ten)\n",
"print(os.path.exists(save_model_dir_multiply_ten)) # verify if the model is saved"
]
},
{
"cell_type": "markdown",
"id": "2e20efc4-13e8-46e2-9848-c0347deaa5af",
"metadata": {
"id": "2e20efc4-13e8-46e2-9848-c0347deaa5af"
},
"source": [
"# Pattern 1: RunInference for predictions."
]
},
{
"cell_type": "markdown",
"id": "1099fe94-d4cf-422e-a0d3-0cfba8af64d5",
"metadata": {
"id": "1099fe94-d4cf-422e-a0d3-0cfba8af64d5"
},
"source": [
"### Step 1 - Use RunInference within the pipeline.\n",
"\n",
"1. Create pytorch model handler object by passing required arguments such as `state_dict_path`, `model_class`, `model_params` to the `PytorchModelHandlerTensor` class.\n",
"2. Pass the `PytorchModelHandlerTensor` object to the RunInference transform to peform prediction on unkeyed data."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e488a821-3b70-4284-96f3-ddee4dcb9d71",
"metadata": {
"id": "e488a821-3b70-4284-96f3-ddee4dcb9d71",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "6f4e4136-aa6c-4fd4-8be6-2ed8d7ca4545"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"PredictionResult(example=tensor([20.]), inference=tensor([99.9943], grad_fn=<UnbindBackward0>))\n",
"PredictionResult(example=tensor([40.]), inference=tensor([199.9889], grad_fn=<UnbindBackward0>))\n",
"PredictionResult(example=tensor([60.]), inference=tensor([299.9835], grad_fn=<UnbindBackward0>))\n",
"PredictionResult(example=tensor([90.]), inference=tensor([449.9753], grad_fn=<UnbindBackward0>))\n"
]
}
],
"source": [
"torch_five_times_model_handler = PytorchModelHandlerTensor(\n",
" state_dict_path=save_model_dir_multiply_five,\n",
" model_class=LinearRegression,\n",
" model_params={'input_dim': 1,\n",
" 'output_dim': 1}\n",
" )\n",
"pipeline = beam.Pipeline()\n",
"\n",
"with pipeline as p:\n",
" (\n",
" p \n",
" | \"ReadInputData\" >> beam.Create(value_to_predict)\n",
" | \"ConvertNumpyToTensor\" >> beam.Map(torch.Tensor)\n",
" | \"RunInferenceTorch\" >> RunInference(torch_five_times_model_handler)\n",
" | beam.Map(print)\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "9d95e69b-203f-4abb-9abb-360bdf4d769a",
"metadata": {
"id": "9d95e69b-203f-4abb-9abb-360bdf4d769a"
},
"source": [
"# Pattern 2: Post-process RunInference results.\n",
"Add a `PredictionProcessor` to the pipeline after `RunInference`. `PredictionProcessor` processes the output of the `RunInference` transform."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "96f38a5a-4db0-4c39-8ce7-80d9f9911b48",
"metadata": {
"id": "96f38a5a-4db0-4c39-8ce7-80d9f9911b48",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "1bfa4cc6-ef01-4020-c739-df1efdc632c4"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"input is 20.0 output is 99.99430084228516\n",
"input is 40.0 output is 199.98887634277344\n",
"input is 60.0 output is 299.98345947265625\n",
"input is 90.0 output is 449.9753112792969\n"
]
}
],
"source": [
"class PredictionProcessor(beam.DoFn):\n",
" \"\"\"\n",
" A processor to format the output of the RunInference transform.\n",
" \"\"\"\n",
" def process(\n",
" self,\n",
" element: PredictionResult):\n",
" input_value = element.example\n",
" output_value = element.inference\n",
" yield (f\"input is {input_value.item()} output is {output_value.item()}\")\n",
"\n",
"pipeline = beam.Pipeline()\n",
"\n",
"with pipeline as p:\n",
" (\n",
" p\n",
" | \"ReadInputData\" >> beam.Create(value_to_predict)\n",
" | \"ConvertNumpyToTensor\" >> beam.Map(torch.Tensor)\n",
" | \"RunInferenceTorch\" >> RunInference(torch_five_times_model_handler)\n",
" | \"PostProcessPredictions\" >> beam.ParDo(PredictionProcessor())\n",
" | beam.Map(print)\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "2be80463-cf79-481c-9d6a-81e500f1707b",
"metadata": {
"id": "2be80463-cf79-481c-9d6a-81e500f1707b"
},
"source": [
"# Pattern 3: Attach a key"
]
},
{
"cell_type": "markdown",
"id": "f22da313-5bf8-4334-865b-bbfafc374e63",
"metadata": {
"id": "f22da313-5bf8-4334-865b-bbfafc374e63"
},
"source": [
"## Step 1 - Create a source with attached key.\n"
]
},
{
"cell_type": "markdown",
"id": "746b67a7-3562-467f-bea3-d8cd18c14927",
"metadata": {
"id": "746b67a7-3562-467f-bea3-d8cd18c14927"
},
"source": [
"## Step 2 - Modify model handler and post processor.\n",
"* Modify the pipeline to read from sources like CSV files and BigQuery.\n",
"\n",
"In this step we:\n",
"\n",
"* Wrap the `PytorchModelHandlerTensor` object around `KeyedModelHandler` to handle keyed data.\n",
"* Add a map transform, which converts a table row into `Tuple[str, float]`.\n",
"* Add a map transform which converts `Tuple[str, float]` from to `Tuple[str, torch.Tensor]`.\n",
"* Modify the post inference processor to output results along with the key."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "90b263fc-97a5-43dc-8874-083d7e65e96d",
"metadata": {
"id": "90b263fc-97a5-43dc-8874-083d7e65e96d"
},
"outputs": [],
"source": [
"class PredictionWithKeyProcessor(beam.DoFn):\n",
" def __init__(self):\n",
" beam.DoFn.__init__(self)\n",
"\n",
" def process(\n",
" self,\n",
" element: Tuple[str, PredictionResult]):\n",
" key = element[0]\n",
" input_value = element[1].example\n",
" output_value = element[1].inference\n",
" yield (f\"key: {key}, input: {input_value.item()} output: {output_value.item()}\" )"
]
},
{
"cell_type": "markdown",
"id": "c9b0fb49-d605-4f26-931a-57f42b0ad253",
"metadata": {
"id": "c9b0fb49-d605-4f26-931a-57f42b0ad253"
},
"source": [
"#### Use BigQuery as the source."
]
},
{
"cell_type": "markdown",
"id": "45ce4330-7d33-4c53-8033-f4fa02383894",
"metadata": {
"id": "45ce4330-7d33-4c53-8033-f4fa02383894"
},
"source": [
"Install Google Cloud BigQuery API using `pip`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4eb859dd-ba54-45a1-916b-5bbe4dc3f16e",
"metadata": {
"id": "4eb859dd-ba54-45a1-916b-5bbe4dc3f16e",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "c01594f4-443e-434a-b61a-a38beb00f1a9"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"\u001b[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.\n",
"pandas-gbq 0.13.3 requires google-cloud-bigquery[bqstorage,pandas]<2.0.0dev,>=1.11.1, but you have google-cloud-bigquery 3.3.0 which is incompatible.\u001b[0m\n"
]
}
],
"source": [
"%pip install --upgrade google-cloud-bigquery --quiet"
]
},
{
"cell_type": "markdown",
"id": "6e869347-dd49-40be-b1e5-749699dc0d83",
"metadata": {
"id": "6e869347-dd49-40be-b1e5-749699dc0d83"
},
"source": [
"Create a table in the BigQuery using the snippet below, which has two columns: One holds the key and the second holds the test value. To use BiqQuery, a Google Cloud account with the BigQuery API enabled is required."
]
},
{
"cell_type": "code",
"source": [
"!gcloud config set project $project"
],
"metadata": {
"id": "7mgnryX-Zlfs",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "ebd3e9e3-9c30-4027-f571-5cd3c1951e18"
},
"id": "7mgnryX-Zlfs",
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Updated property [core/project].\n"
]
}
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a6a984cd-2e92-4c44-821b-9bf1dd52fb7d",
"metadata": {
"id": "a6a984cd-2e92-4c44-821b-9bf1dd52fb7d",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "8e60b448-1384-4290-c164-cb43d876c350"
},
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/plain": [
"<google.cloud.bigquery.table._EmptyRowIterator at 0x7f47d0556a50>"
]
},
"metadata": {},
"execution_count": 86
}
],
"source": [
"from google.cloud import bigquery\n",
"\n",
"client = bigquery.Client(project=project)\n",
"\n",
"# Make sure the dataset_id is unique in your project.\n",
"dataset_id = '{project}.maths'.format(project=project)\n",
"dataset = bigquery.Dataset(dataset_id)\n",
"\n",
"# Modify the location based on your project configuration.\n",
"dataset.location = 'US'\n",
"dataset = client.create_dataset(dataset, exists_ok=True)\n",
"\n",
"# Table name in the BigQuery dataset.\n",
"table_name = 'maths_problems_1'\n",
"\n",
"query = \"\"\"\n",
" CREATE OR REPLACE TABLE\n",
" {project}.maths.{table} ( key STRING OPTIONS(description=\"A unique key for the maths problem\"),\n",
" value FLOAT64 OPTIONS(description=\"Our maths problem\" ) );\n",
" INSERT INTO maths.{table}\n",
" VALUES\n",
" (\"first_question\", 105.00),\n",
" (\"second_question\", 108.00),\n",
" (\"third_question\", 1000.00),\n",
" (\"fourth_question\", 1013.00)\n",
"\"\"\".format(project=project, table=table_name)\n",
"\n",
"create_job = client.query(query)\n",
"create_job.result()"
]
},
{
"cell_type": "markdown",
"id": "479c9319-3295-4288-971c-dd0f0adfdd1e",
"metadata": {
"id": "479c9319-3295-4288-971c-dd0f0adfdd1e"
},
"source": [
"Use `BigQuery` as the source in the pipeline to read keyed data."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "34331897-23f5-4850-8974-67e522e956dc",
"metadata": {
"id": "34331897-23f5-4850-8974-67e522e956dc",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "23092c12-3370-414c-ba67-37be569cd21c"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"key: first_question, input: 105.0 output: 524.9712524414062\n",
"key: second_question, input: 108.0 output: 539.970458984375\n",
"key: third_question, input: 1000.0 output: 4999.72802734375\n",
"key: fourth_question, input: 1013.0 output: 5064.724609375\n"
]
}
],
"source": [
"pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'{bucket}/tmp',\n",
" })\n",
"pipeline = beam.Pipeline(options=pipeline_options)\n",
"\n",
"keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)\n",
"\n",
"table_name = 'maths_problems_1'\n",
"table_spec = f'{project}:maths.{table_name}'\n",
"\n",
"with pipeline as p:\n",
" (\n",
" p\n",
" | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec) \n",
" | \"PreprocessData\" >> beam.Map(lambda x: (x['key'], x['value']))\n",
" | \"ConvertNumpyToTensor\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
" | \"RunInferenceTorch\" >> RunInference(keyed_torch_five_times_model_handler)\n",
" | \"PostProcessPredictions\" >> beam.ParDo(PredictionWithKeyProcessor())\n",
" | beam.Map(print)\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "53ee7f24-5625-475a-b8cc-9c031591f304",
"metadata": {
"id": "53ee7f24-5625-475a-b8cc-9c031591f304"
},
"source": [
"### Using CSV file as the source."
]
},
{
"cell_type": "markdown",
"id": "06bc4396-ee2e-4228-8548-f953b5020c4e",
"metadata": {
"id": "06bc4396-ee2e-4228-8548-f953b5020c4e"
},
"source": [
"Create a CSV file with two columns: one named `key` that holds the keys, and a second named `value` that holds the test values."
]
},
{
"cell_type": "code",
"source": [
"# creates a csv file with the below values.\n",
"csv_values = [(\"first_question\", 105.00),\n",
" (\"second_question\", 108.00),\n",
" (\"third_question\", 1000.00),\n",
" (\"fourth_question\", 1013.00)]\n",
"input_csv_file = \"./maths_problem.csv\"\n",
"\n",
"with open(input_csv_file, 'w') as f:\n",
" writer = csv.writer(f)\n",
" writer.writerow(['key', 'value'])\n",
" for row in csv_values:\n",
" writer.writerow(row)\n",
"\n",
"assert os.path.exists(input_csv_file) == True"
],
"metadata": {
"id": "exAZjP7cYAFv"
},
"id": "exAZjP7cYAFv",
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "9a054c2d-4d84-4b37-b067-1dda5347e776",
"metadata": {
"id": "9a054c2d-4d84-4b37-b067-1dda5347e776",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "000c72cf-6a7f-45d8-9dec-dc9db6ce0662"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"key: first_question, input: 105.0 output: 524.9712524414062\n",
"key: second_question, input: 108.0 output: 539.970458984375\n",
"key: third_question, input: 1000.0 output: 4999.72802734375\n",
"key: fourth_question, input: 1013.0 output: 5064.724609375\n"
]
}
],
"source": [
"pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'{bucket}/tmp',\n",
" })\n",
"pipeline = beam.Pipeline(options=pipeline_options)\n",
"\n",
"keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)\n",
"\n",
"with pipeline as p:\n",
" df = p | beam.dataframe.io.read_csv(input_csv_file)\n",
" pc = to_pcollection(df)\n",
" (pc\n",
" | \"ConvertNumpyToTensor\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
" | \"RunInferenceTorch\" >> RunInference(keyed_torch_five_times_model_handler)\n",
" | \"PostProcessPredictions\" >> beam.ParDo(PredictionWithKeyProcessor())\n",
" | beam.Map(print)\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "742abfbb-545e-435b-8833-2410ce29d22c",
"metadata": {
"id": "742abfbb-545e-435b-8833-2410ce29d22c"
},
"source": [
"# Pattern 4: Inference with multiple models in the same pipeline.\n",
"\n",
"## Inference with multiple models in parallel. "
]
},
{
"cell_type": "markdown",
"id": "570b2f27-3beb-4295-b926-595592289c06",
"metadata": {
"id": "570b2f27-3beb-4295-b926-595592289c06"
},
"source": [
"Create a torch model handler for the 10 times model using `PytorchModelHandlerTensor`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "73607c45-afe1-4990-9a55-e687ed40302e",
"metadata": {
"id": "73607c45-afe1-4990-9a55-e687ed40302e"
},
"outputs": [],
"source": [
"torch_ten_times_model_handler = PytorchModelHandlerTensor(state_dict_path=save_model_dir_multiply_ten,\n",
" model_class=LinearRegression,\n",
" model_params={'input_dim': 1,\n",
" 'output_dim': 1}\n",
" )\n",
"keyed_torch_ten_times_model_handler = KeyedModelHandler(torch_ten_times_model_handler)"
]
},
{
"cell_type": "markdown",
"id": "70ebed52-4ead-4cae-ac96-8cf206012ce1",
"metadata": {
"id": "70ebed52-4ead-4cae-ac96-8cf206012ce1"
},
"source": [
"In this, the same data is run through two different models: the one that we’ve been using to multiply by 5 \n",
"and a new model, which will learn to multiply by 10."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "629d070e-9902-42c9-a1e7-56c3d1864f13",
"metadata": {
"id": "629d070e-9902-42c9-a1e7-56c3d1864f13",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "f798bbc7-3f45-496f-b029-3cff5599bfaa"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"key: first_question * 5, input: 105.0 output: 1046.1859130859375\n",
"key: second_question * 5, input: 108.0 output: 1075.8590087890625\n",
"key: third_question * 5, input: 1000.0 output: 9898.654296875\n",
"key: fourth_question * 5, input: 1013.0 output: 10027.2373046875\n",
"key: first_question * 10, input: 105.0 output: 1046.1859130859375\n",
"key: second_question * 10, input: 108.0 output: 1075.8590087890625\n",
"key: third_question * 10, input: 1000.0 output: 9898.654296875\n",
"key: fourth_question * 10, input: 1013.0 output: 10027.2373046875\n"
]
}
],
"source": [
"pipeline_options = PipelineOptions().from_dictionary(\n",
" {'temp_location':f'{bucket}/tmp'})\n",
"\n",
"pipeline = beam.Pipeline(options=pipeline_options)\n",
"\n",
"read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)\n",
"\n",
"with pipeline as p:\n",
" multiply_five = (\n",
" p \n",
" | read_from_bq\n",
" | \"CreateMultiplyFiveTuple\" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 5'), x['value']))\n",
" | \"ConvertNumpyToTensorFiveTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
" | \"RunInferenceTorchFiveTuple\" >> RunInference(keyed_torch_ten_times_model_handler)\n",
" )\n",
" multiply_ten = (\n",
" p \n",
" | read_from_bq \n",
" | \"CreateMultiplyTenTuple\" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 10'), x['value']))\n",
" | \"ConvertNumpyToTensorTenTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
" | \"RunInferenceTorchTenTuple\" >> RunInference(keyed_torch_ten_times_model_handler)\n",
" )\n",
"\n",
" inference_result = ((multiply_five, multiply_ten) | beam.Flatten() \n",
" | beam.ParDo(PredictionWithKeyProcessor()))\n",
" inference_result | beam.Map(print)"
]
},
{
"cell_type": "markdown",
"id": "e71e6706-5d8d-4322-9def-ac7fb20d4a50",
"metadata": {
"id": "e71e6706-5d8d-4322-9def-ac7fb20d4a50"
},
"source": [
"## Inference with multiple models in sequence \n",
"\n",
"In a sequential pattern, data is sent to one or more models in sequence, \n",
"with the output from each model chaining to the next model.\n",
"\n",
"1. Read the data from BigQuery.\n",
"2. Map the data.\n",
"3. RunInference with multiply by 5 model.\n",
"4. Process the results.\n",
"5. RunInference with multiply by 10 model.\n",
"6. Process the results.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8db9d649-5549-4b58-a9ad-7b8592c2bcbf",
"metadata": {
"id": "8db9d649-5549-4b58-a9ad-7b8592c2bcbf",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "4f600937-4cb4-42dd-aa50-fa15538cc964"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"key: original input is `first_question tensor([105.])`, input: 524.9712524414062 output: 5200.13232421875\n",
"key: original input is `second_question tensor([108.])`, input: 539.970458984375 output: 5348.490234375\n",
"key: original input is `third_question tensor([1000.])`, input: 4999.72802734375 output: 49460.0703125\n",
"key: original input is `fourth_question tensor([1013.])`, input: 5064.724609375 output: 50102.953125\n"
]
}
],
"source": [
"def process_interim_inference(element):\n",
" key, prediction_result = element\n",
" input_value = prediction_result.example\n",
" inference = prediction_result.inference\n",
" formatted_input_value = 'original input is `{} {}`'.format(key, input_value)\n",
" return formatted_input_value, inference\n",
"\n",
"\n",
"pipeline_options = PipelineOptions().from_dictionary(\n",
" {'temp_location':f'{bucket}/tmp'})\n",
"pipeline = beam.Pipeline(options=pipeline_options)\n",
"\n",
"with pipeline as p:\n",
" multiply_five = (\n",
" p \n",
" | beam.io.ReadFromBigQuery(table=table_spec) \n",
" | \"CreateMultiplyFiveTuple\" >> beam.Map(lambda x: (x['key'], x['value']))\n",
" | \"ConvertNumpyToTensorFiveTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
" | \"RunInferenceTorchFiveTuple\" >> RunInference(keyed_torch_five_times_model_handler)\n",
" )\n",
"\n",
" inference_result = (\n",
" multiply_five \n",
" | \"ExtractResult\" >> beam.Map(process_interim_inference) \n",
" | \"RunInferenceTorchTenTuple\" >> RunInference(keyed_torch_ten_times_model_handler)\n",
" | beam.ParDo(PredictionWithKeyProcessor())\n",
" )\n",
" inference_result | beam.Map(print)"
]
},
{
"cell_type": "markdown",
"id": "32c9ba40-9396-48f4-9e7f-a2acced98bb2",
"metadata": {
"id": "32c9ba40-9396-48f4-9e7f-a2acced98bb2"
},
"source": [
"# Sklearn implementation of RunInference API.\n",
"\n",
"Here, we showcase the Sklearn implementation of the RunInference API with the unkeyed data and the keyed data.\n",
"\n",
"Sklearn is a build-dependency of Apache Beam. If a different version of sklearn needs to be installed, use `%pip install scikit-learn==<version>`"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d6142b75-eef1-4e52-9fa4-fe02fe916b26",
"metadata": {
"id": "d6142b75-eef1-4e52-9fa4-fe02fe916b26"
},
"outputs": [],
"source": [
"import pickle\n",
"from sklearn import linear_model\n",
"\n",
"import numpy as np\n",
"from apache_beam.ml.inference.sklearn_inference import ModelFileType\n",
"from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy"
]
},
{
"cell_type": "markdown",
"id": "6695cd22-e0bf-438f-8223-4a93392e6616",
"metadata": {
"id": "6695cd22-e0bf-438f-8223-4a93392e6616"
},
"source": [
"## Create the data and the Sklearn model.\n",
"In this cell, we perform:\n",
"1. Create the data to train the Sklearn linear regression model.\n",
"2. Train the linear regression model.\n",
"3. Save the Sklearn model using `pickle`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c57843e8-f696-4196-ad39-827e34849976",
"metadata": {
"id": "c57843e8-f696-4196-ad39-827e34849976"
},
"outputs": [],
"source": [
"# Input data to train the sklearn model: y = 5 * x for x in [0, 100).\n",
"# `np` is the alias imported in the cell above (`import numpy as np`).\n",
"x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)\n",
"y = (x * 5).reshape(-1, 1)\n",
"\n",
"regression = linear_model.LinearRegression()\n",
"regression.fit(x, y)\n",
"\n",
"# Serialize the trained model with pickle; the model handlers below load it\n",
"# back with model_file_type=ModelFileType.PICKLE.\n",
"sklearn_model_filename = 'sklearn_5x_model.pkl'\n",
"with open(sklearn_model_filename, 'wb') as f:\n",
"    pickle.dump(regression, f)"
]
},
{
"cell_type": "markdown",
"id": "69008a3d-3d15-4643-828c-b0419b347d01",
"metadata": {
"id": "69008a3d-3d15-4643-828c-b0419b347d01"
},
"source": [
"### Scikit-learn RunInference pipeline.\n",
"\n",
"1. Define the Sklearn model handler that accepts array-like objects as input.\n",
"2. Read the data from BigQuery.\n",
"3. Run the trained Sklearn model on the unkeyed data with the Sklearn RunInference transform."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "50a648a3-794a-4286-ab2b-fc0458db04ca",
"metadata": {
"id": "50a648a3-794a-4286-ab2b-fc0458db04ca",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "b305d977-6549-4c01-a402-c4e14f3f2b04"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"PredictionResult(example=[105.0], inference=array([525.]))\n",
"PredictionResult(example=[108.0], inference=array([540.]))\n",
"PredictionResult(example=[1000.0], inference=array([5000.]))\n",
"PredictionResult(example=[1013.0], inference=array([5065.]))\n"
]
}
],
"source": [
"# SklearnModelHandlerNumpy accepts only unkeyed examples.\n",
"sklearn_model_handler = SklearnModelHandlerNumpy(\n",
"    model_uri=sklearn_model_filename,\n",
"    # Use ModelFileType.JOBLIB if the model is serialized using joblib.\n",
"    model_file_type=ModelFileType.PICKLE)\n",
"\n",
"# from_dictionary is a classmethod: call it on the class rather than on a\n",
"# throwaway PipelineOptions() instance.\n",
"pipeline_options = PipelineOptions.from_dictionary(\n",
"    {'temp_location': f'{bucket}/tmp'})\n",
"pipeline = beam.Pipeline(options=pipeline_options)\n",
"\n",
"# Each BigQuery row is reduced to a one-element list holding its 'value'\n",
"# field; that list is the example passed to the model handler.\n",
"with pipeline as p:\n",
"    (\n",
"        p\n",
"        | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n",
"        | \"ExtractInputs\" >> beam.Map(lambda x: [x['value']])\n",
"        | \"RunInferenceSklearn\" >> RunInference(model_handler=sklearn_model_handler)\n",
"        | beam.Map(print)\n",
"    )"
]
},
{
"cell_type": "markdown",
"id": "33e901d6-ed06-4268-8a5f-685d31b5558f",
"metadata": {
"id": "33e901d6-ed06-4268-8a5f-685d31b5558f"
},
"source": [
"### Sklearn RunInference on keyed inputs.\n",
"1. Wrap the `SklearnModelHandlerNumpy` object in a `KeyedModelHandler` to handle keyed data.\n",
"2. Read the data from BigQuery.\n",
"3. Run the trained Sklearn model on the keyed data with the Sklearn RunInference transform."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c212916d-b517-4589-ad15-a3a1df926fb3",
"metadata": {
"id": "c212916d-b517-4589-ad15-a3a1df926fb3",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "1c3ccf35-3cd7-401e-de23-c0e22b5f6ebd"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"('first_question', PredictionResult(example=[105.0], inference=array([525.])))\n",
"('second_question', PredictionResult(example=[108.0], inference=array([540.])))\n",
"('third_question', PredictionResult(example=[1000.0], inference=array([5000.])))\n",
"('fourth_question', PredictionResult(example=[1013.0], inference=array([5065.])))\n"
]
}
],
"source": [
"sklearn_model_handler = SklearnModelHandlerNumpy(\n",
"    model_uri=sklearn_model_filename,\n",
"    # Use ModelFileType.JOBLIB if the model is serialized using joblib.\n",
"    model_file_type=ModelFileType.PICKLE)\n",
"\n",
"# KeyedModelHandler wraps the unkeyed handler so RunInference accepts\n",
"# (key, example) tuples and emits (key, PredictionResult) tuples.\n",
"keyed_sklearn_model_handler = KeyedModelHandler(sklearn_model_handler)\n",
"\n",
"# from_dictionary is a classmethod: call it on the class rather than on a\n",
"# throwaway PipelineOptions() instance.\n",
"pipeline_options = PipelineOptions.from_dictionary(\n",
"    {'temp_location': f'{bucket}/tmp'})\n",
"pipeline = beam.Pipeline(options=pipeline_options)\n",
"\n",
"with pipeline as p:\n",
"    (\n",
"        p\n",
"        | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n",
"        | \"ExtractInputs\" >> beam.Map(lambda x: (x['key'], [x['value']]))\n",
"        | \"RunInferenceSklearn\" >> RunInference(model_handler=keyed_sklearn_model_handler)\n",
"        | beam.Map(print)\n",
"    )"
]
},
{
"cell_type": "markdown",
"id": "f1481883-423b-4da0-8ae0-1a602b1807c6",
"metadata": {
"id": "f1481883-423b-4da0-8ae0-1a602b1807c6"
},
"source": [
"# Cross framework transforms in a single pipeline\n",
"\n",
"In this pipeline, RunInference transforms from different frameworks are applied sequentially.\n",
"\n",
"In the cells below, we perform the following actions:\n",
"1. Create a `KeyedModelHandler` for Sklearn and one for PyTorch.\n",
"2. Run inference with the Sklearn model and perform intermediate processing using `process_interim_inference`.\n",
"3. Pass the intermediate result from the Sklearn RunInference transform through the PyTorch RunInference transform."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a45d496c-4d7b-4173-b27b-253c5767bb8d",
"metadata": {
"id": "a45d496c-4d7b-4173-b27b-253c5767bb8d"
},
"outputs": [],
"source": [
"# NOTE(review): an earlier cell already applies process_interim_inference;\n",
"# if it is also defined there, this definition shadows that one.\n",
"def process_interim_inference(element: Tuple[str, PredictionResult]):\n",
"    \"\"\"Strips a keyed PredictionResult down to (key, inference).\n",
"\n",
"    The returned pair is fed to the next RunInference transform.\n",
"    \"\"\"\n",
"    key, prediction_result = element\n",
"    return key, prediction_result.inference\n",
"\n",
"\n",
"class PredictionProcessor(beam.DoFn):\n",
"    \"\"\"Emits (key, inference) with the inference unwrapped via `.item()`.\"\"\"\n",
"\n",
"    def process(self, element: Tuple[str, PredictionResult]):\n",
"        key, prediction_result = element\n",
"        # .item() converts a single-element tensor/array to a Python scalar.\n",
"        yield key, prediction_result.inference.item()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ada71e7d-cf29-4441-a921-310c05fa8576",
"metadata": {
"id": "ada71e7d-cf29-4441-a921-310c05fa8576",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "78eb9a0d-ace2-4c02-8970-13488dc2767c"
},
"outputs": [],
"source": [
"# from_dictionary is a classmethod: call it on the class rather than on a\n",
"# throwaway PipelineOptions() instance.\n",
"pipeline_options = PipelineOptions.from_dictionary(\n",
"    {'temp_location': f'{bucket}/tmp'})\n",
"pipeline = beam.Pipeline(options=pipeline_options)\n",
"\n",
"read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)\n",
"keyed_inputs = \"ExtractKeyedInputs\" >> beam.Map(lambda x: (x['key'], [x['value']]))\n",
"\n",
"# Stage 1: the Sklearn model trained above to multiply its input by 5.\n",
"keyed_sklearn_model_handler = KeyedModelHandler(SklearnModelHandlerNumpy(\n",
"    model_uri=sklearn_model_filename,\n",
"    model_file_type=ModelFileType.PICKLE))\n",
"\n",
"# Stage 2: the PyTorch model loaded from save_model_dir_multiply_ten.\n",
"keyed_torch_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(\n",
"    state_dict_path=save_model_dir_multiply_ten,\n",
"    model_class=LinearRegression,\n",
"    model_params={'input_dim': 1,\n",
"                  'output_dim': 1}))\n",
"\n",
"with pipeline as p:\n",
"    sklearn_inference_result = (\n",
"        p\n",
"        | read_from_bq\n",
"        | keyed_inputs\n",
"        | \"RunInferenceSklearn\" >> RunInference(model_handler=keyed_sklearn_model_handler)\n",
"        | \"ExtractOutputs\" >> beam.Map(process_interim_inference)\n",
"    )\n",
"\n",
"    # Feed the Sklearn predictions into the PyTorch handler defined in this cell.\n",
"    torch_inference_result = (\n",
"        sklearn_inference_result\n",
"        | \"ConvertNumpyToTensor\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
"        | \"RunInferenceTorch\" >> RunInference(keyed_torch_model_handler)\n",
"        | \"ProcessPredictions\" >> beam.ParDo(PredictionProcessor())\n",
"        | beam.Map(print)\n",
"    )"
]
}
]
}