{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "C1rAsD2L-hSO"
},
"outputs": [],
"source": [
"# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
"\n",
"# Licensed to the Apache Software Foundation (ASF) under one\n",
"# or more contributor license agreements. See the NOTICE file\n",
"# distributed with this work for additional information\n",
"# regarding copyright ownership. The ASF licenses this file\n",
"# to you under the Apache License, Version 2.0 (the\n",
"# \"License\"); you may not use this file except in compliance\n",
"# with the License. You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing,\n",
"# software distributed under the License is distributed on an\n",
"# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
"# KIND, either express or implied. See the License for the\n",
"# specific language governing permissions and limitations\n",
"# under the License"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "b6f8f3af-744e-4eaa-8a30-6d03e8e4d21e"
},
"source": [
"# Apache Beam RunInference for scikit-learn\n",
"\n",
"<table align=\"left\">\n",
" <td>\n",
" <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_sklearn.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
" </td>\n",
" <td>\n",
" <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_sklearn.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
" </td>\n",
"</table>\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "A8xNRyZMW1yK"
},
"source": [
"This notebook demonstrates the use of the RunInference transform for [scikit-learn](https://scikit-learn.org/), also called sklearn.\n",
"Apache Beam [RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference) has implementations of the [ModelHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler) class prebuilt for scikit-learn. For more information about using RunInference, see [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the Apache Beam documentation.\n",
"\n",
"You can choose the appropriate model handler based on your input data type:\n",
"* [NumPy model handler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.sklearn_inference.html#apache_beam.ml.inference.sklearn_inference.SklearnModelHandlerNumpy)\n",
"* [Pandas DataFrame model handler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.sklearn_inference.html#apache_beam.ml.inference.sklearn_inference.SklearnModelHandlerNumpy)\n",
"\n",
"With RunInference, these model handlers manage batching, vectorization, and prediction optimization for your scikit-learn pipeline or model.\n",
"\n",
"This notebook demonstrates the following common RunInference patterns:\n",
"* Generate predictions.\n",
"* Postprocess results after RunInference.\n",
"* Run inference with multiple models in the same pipeline.\n",
"\n",
"The linear regression models used in these samples are trained on data that correspondes to the 5 and 10 times tables; that is,`y = 5x` and `y = 10x` respectively."
]
},
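{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick check of this setup, the following minimal sketch fits a straight line to the 5 and 10 times tables with NumPy's `np.polyfit` instead of the scikit-learn `LinearRegression` estimator that the notebook uses. Both are ordinary least-squares fits, so the slopes should come out close to 5 and 10 and the intercepts close to 0, which is the relationship that the scikit-learn models are expected to learn."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"# The same input values that the training cell uses later: 0 through 99.\n",
"xs = np.arange(0, 100, dtype=np.float64)\n",
"\n",
"fitted_lines = {}\n",
"for factor in (5, 10):\n",
"    ys = xs * factor\n",
"    # A degree-1 least-squares fit returns the coefficients [slope, intercept].\n",
"    slope, intercept = np.polyfit(xs, ys, deg=1)\n",
"    fitted_lines[factor] = (slope, intercept)\n",
"    print(f'y = {factor}x: slope {slope:.3f}, intercept {intercept:.3f}')"
]
},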
{
"cell_type": "markdown",
"metadata": {
"id": "zzwnMzzgdyPB"
},
"source": [
"## Before you begin\n",
"Complete the following setup steps:\n",
"1. Install dependencies for Apache Beam.\n",
"1. Authenticate with Google Cloud.\n",
"1. Specify your project and bucket. You use the project and bucket to save and load models."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "6vlKcT-Wev20",
"outputId": "336e8afc-6716-41dd-a438-500353189c62"
},
"outputs": [],
"source": [
"!pip install google-api-core --quiet\n",
"!pip install google-cloud-pubsub google-cloud-bigquery-storage --quiet\n",
"!pip install apache-beam[gcp,dataframe] --quiet"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "32c9ba40-9396-48f4-9e7f-a2acced98bb2"
},
"source": [
"## About scikit-learn versions\n",
"\n",
"`scikit-learn` is a build-dependency of Apache Beam. If you need to install a different version of sklearn , use `%pip install scikit-learn==<version>`"
]
},
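{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because this notebook saves its models with `pickle`, the scikit-learn version that loads a model should match the version that trained it; the scikit-learn documentation on model persistence advises against loading pickled models across versions. This can matter when a pipeline runs on remote workers whose environment may differ from the notebook's. The following minimal sketch reports the installed versions using only the standard-library `importlib.metadata` module; the helper name `installed_versions` is illustrative."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from importlib.metadata import PackageNotFoundError, version\n",
"\n",
"def installed_versions(distributions):\n",
"    # Map each distribution name to its installed version, or None if it is missing.\n",
"    found = {}\n",
"    for name in distributions:\n",
"        try:\n",
"            found[name] = version(name)\n",
"        except PackageNotFoundError:\n",
"            found[name] = None\n",
"    return found\n",
"\n",
"package_versions = installed_versions(['scikit-learn', 'apache-beam', 'numpy'])\n",
"for name, installed in package_versions.items():\n",
"    shown = installed if installed is not None else 'not installed'\n",
"    print(f'{name}: {shown}')"
]
},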
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"id": "V0E35R5Ka2cE"
},
"outputs": [],
"source": [
"from google.colab import auth\n",
"auth.authenticate_user()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"id": "d6142b75-eef1-4e52-9fa4-fe02fe916b26"
},
"outputs": [],
"source": [
"import pickle\n",
"from sklearn import linear_model\n",
"from typing import Tuple\n",
"\n",
"import numpy as np\n",
"import apache_beam as beam\n",
"\n",
"from apache_beam.ml.inference.sklearn_inference import ModelFileType\n",
"from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy\n",
"from apache_beam.ml.inference.base import KeyedModelHandler\n",
"from apache_beam.ml.inference.base import PredictionResult\n",
"from apache_beam.ml.inference.base import RunInference\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"# NOTE: If an error occurs, restart your runtime.\n"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"id": "248458a6-cfd8-474d-ad0e-f37f7ae981ae"
},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Constants\n",
"project = \"<PROJECT_ID>\" # @param {type:'string'}\n",
"bucket = \"<BUCKET_NAME>\" # @param {type:'string'}\n",
"\n",
"# To avoid warnings, set the project.\n",
"os.environ['GOOGLE_CLOUD_PROJECT'] = project\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "6695cd22-e0bf-438f-8223-4a93392e6616"
},
"source": [
"## Create the data and the scikit-learn model\n",
"This section demonstrates the following steps:\n",
"1. Create the data to train the scikit-learn linear regression model.\n",
"2. Train the linear regression model.\n",
"3. Save the scikit-learn model using `pickle`.\n",
"\n",
"In this example, you create two models, one with the 5 times model and a second with the 10 times model."
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"id": "c57843e8-f696-4196-ad39-827e34849976"
},
"outputs": [],
"source": [
"# Input data to train the sklearn model for the 5 times table.\n",
"x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)\n",
"y = (x * 5).reshape(-1, 1)\n",
"\n",
"def train_and_save_model(x, y, model_file_name):\n",
" regression = linear_model.LinearRegression()\n",
" regression.fit(x,y)\n",
"\n",
" with open(model_file_name, 'wb') as f:\n",
" pickle.dump(regression, f)\n",
"\n",
"five_times_model_filename = 'sklearn_5x_model.pkl'\n",
"train_and_save_model(x, y, five_times_model_filename)\n",
"\n",
"# Change y to be 10 times, and output a 10 times table.\n",
"ten_times_model_filename = 'sklearn_10x_model.pkl'\n",
"train_and_save_model(x, y, ten_times_model_filename)\n",
"y = (x * 10).reshape(-1, 1)\n",
"train_and_save_model(x, y, 'sklearn_10x_model.pkl')"
]
},
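{
"cell_type": "markdown",
"metadata": {},
"source": [
"`SklearnModelHandlerNumpy` loads models saved as `ModelFileType.PICKLE` by default, so one way to sanity-check a saved model before using it in a pipeline is a plain `pickle.load` round trip. The following sketch is self-contained: it retrains the 5 times model into a temporary directory (the helper name `save_times_table_model` is illustrative) instead of reading the files written by the previous cell, and then predicts on the values that the BigQuery table holds later in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pickle\n",
"import tempfile\n",
"\n",
"import numpy as np\n",
"from sklearn import linear_model\n",
"\n",
"def save_times_table_model(factor, path):\n",
"    # Illustrative helper: fit y = factor * x on 0..99 and pickle the model to path.\n",
"    x_train = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)\n",
"    model = linear_model.LinearRegression().fit(x_train, x_train * factor)\n",
"    with open(path, 'wb') as f:\n",
"        pickle.dump(model, f)\n",
"\n",
"with tempfile.TemporaryDirectory() as tmp_dir:\n",
"    model_path = os.path.join(tmp_dir, 'sklearn_5x_model.pkl')\n",
"    save_times_table_model(5, model_path)\n",
"    with open(model_path, 'rb') as f:\n",
"        loaded_model = pickle.load(f)\n",
"\n",
"# The loaded model stays in memory after the temporary directory is removed.\n",
"sample_inputs = np.array([[105.0], [108.0], [1000.0], [1013.0]])\n",
"sample_predictions = loaded_model.predict(sample_inputs).ravel()\n",
"print(sample_predictions)"
]
},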
{
"cell_type": "markdown",
"metadata": {
"id": "69008a3d-3d15-4643-828c-b0419b347d01"
},
"source": [
"### Create a scikit-learn RunInference pipeline\n",
"This section demonstrates how to do the following:\n",
"1. Define a scikit-learn model handler that accepts an `array_like` object as input.\n",
"2. Read the data from BigQuery.\n",
"3. Use the scikit-learn trained model and the scikit-learn RunInference transform on unkeyed data."
]
},
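{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this context, an `array_like` element is one row of features, such as `[105.0]`. RunInference groups elements into batches, and the NumPy model handler's default inference function stacks each batch into one 2-D array, calls the model's `predict` method once, and pairs each prediction with its input element. The following sketch imitates that flow with plain NumPy and scikit-learn to show the array shapes involved. It is an approximation for illustration, not Beam's internal code, and it trains a small 5 times model in memory instead of loading the pickled files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from sklearn import linear_model\n",
"\n",
"# A 5 times model trained in memory for this illustration only.\n",
"x_train = np.arange(0, 100, dtype=np.float64).reshape(-1, 1)\n",
"batch_model = linear_model.LinearRegression().fit(x_train, x_train * 5)\n",
"\n",
"# One batch of elements, shaped like the output of the ExtractInputs step.\n",
"batch = [[105.0], [108.0], [1000.0], [1013.0]]\n",
"\n",
"# Vectorize: stack the per-element rows into one (batch_size, n_features) array.\n",
"vectorized_batch = np.stack(batch, axis=0)\n",
"print(vectorized_batch.shape)  # (4, 1)\n",
"\n",
"# Call predict once for the whole batch, then pair each input with its output.\n",
"batch_predictions = batch_model.predict(vectorized_batch)\n",
"paired = list(zip(batch, batch_predictions))\n",
"for example, inference in paired:\n",
"    print(example, inference)"
]
},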
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"id": "AEGaqpMVqgRP"
},
"outputs": [],
"source": [
"%pip install --upgrade google-cloud-bigquery --quiet"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "xq5AKtRrqlUx",
"outputId": "fba8fb42-4958-451a-8aaa-9a838052a2f8"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Updated property [core/project].\n"
]
}
],
"source": [
"!gcloud config set project $project"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "QCIjN__rpoVF",
"outputId": "0ded224f-2272-482e-80f5-bb2d21b6f5d8"
},
"outputs": [
{
"data": {
"text/plain": [
"<google.cloud.bigquery.table._EmptyRowIterator at 0x7f97abb4e850>"
]
},
"execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Populated BigQuery table\n",
"\n",
"from google.cloud import bigquery\n",
"\n",
"client = bigquery.Client(project=project)\n",
"\n",
"# Make sure the dataset_id is unique in your project.\n",
"dataset_id = '{project}.maths'.format(project=project)\n",
"dataset = bigquery.Dataset(dataset_id)\n",
"\n",
"# Modify the location based on your project configuration.\n",
"dataset.location = 'US'\n",
"dataset = client.create_dataset(dataset, exists_ok=True)\n",
"\n",
"# Table name in the BigQuery dataset.\n",
"table_name = 'maths_problems_1'\n",
"\n",
"query = \"\"\"\n",
" CREATE OR REPLACE TABLE\n",
" {project}.maths.{table} ( key STRING OPTIONS(description=\"A unique key for the maths problem\"),\n",
" value FLOAT64 OPTIONS(description=\"Our maths problem\" ) );\n",
" INSERT INTO maths.{table}\n",
" VALUES\n",
" (\"first_example\", 105.00),\n",
" (\"second_example\", 108.00),\n",
" (\"third_example\", 1000.00),\n",
" (\"fourth_example\", 1013.00)\n",
"\"\"\".format(project=project, table=table_name)\n",
"\n",
"create_job = client.query(query)\n",
"create_job.result()"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "50a648a3-794a-4286-ab2b-fc0458db04ca",
"outputId": "8eab34b4-dcc7-4df1-ec0e-8c86a34d31c6"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"PredictionResult(example=[1000.0], inference=array([5000.]))\n",
"PredictionResult(example=[1013.0], inference=array([5065.]))\n",
"PredictionResult(example=[108.0], inference=array([540.]))\n",
"PredictionResult(example=[105.0], inference=array([525.]))\n"
]
}
],
"source": [
"sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=five_times_model_filename) \n",
"\n",
"\n",
"pipeline_options = PipelineOptions().from_dictionary(\n",
" {'temp_location':f'gs://{bucket}/tmp'})\n",
"\n",
"# Define the BigQuery table specification.\n",
"table_name = 'maths_problems_1'\n",
"table_spec = f'{project}:maths.{table_name}'\n",
"\n",
"with beam.Pipeline(options=pipeline_options) as p:\n",
" (\n",
" p \n",
" | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n",
" | \"ExtractInputs\" >> beam.Map(lambda x: [x['value']]) \n",
" | \"RunInferenceSklearn\" >> RunInference(model_handler=sklearn_model_handler)\n",
" | beam.Map(print)\n",
" )"
]
},
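{
"cell_type": "markdown",
"metadata": {},
"source": [
"To try this pattern without BigQuery or a Cloud Storage bucket, one option is to replace `ReadFromBigQuery` with `beam.Create` over dictionaries shaped like the table rows and to run the pipeline on the local DirectRunner. The following sketch assumes that substitution and trains its own 5 times model in a temporary directory. It also replaces `beam.Map(print)` with the `assert_that` and `equal_to` utilities from `apache_beam.testing.util`, so the pipeline fails if the predictions differ from the expected values."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pickle\n",
"import tempfile\n",
"\n",
"import apache_beam as beam\n",
"import numpy as np\n",
"from apache_beam.ml.inference.base import RunInference\n",
"from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy\n",
"from apache_beam.testing.util import assert_that, equal_to\n",
"from sklearn import linear_model\n",
"\n",
"# In-memory rows with the same schema as the maths_problems_1 table.\n",
"LOCAL_ROWS = [\n",
"    {'key': 'first_example', 'value': 105.0},\n",
"    {'key': 'second_example', 'value': 108.0},\n",
"    {'key': 'third_example', 'value': 1000.0},\n",
"    {'key': 'fourth_example', 'value': 1013.0},\n",
"]\n",
"\n",
"with tempfile.TemporaryDirectory() as tmp_dir:\n",
"    # Train and pickle a 5 times model into the temporary directory.\n",
"    local_model_path = os.path.join(tmp_dir, 'sklearn_5x_model.pkl')\n",
"    x_train = np.arange(0, 100, dtype=np.float64).reshape(-1, 1)\n",
"    with open(local_model_path, 'wb') as f:\n",
"        pickle.dump(linear_model.LinearRegression().fit(x_train, x_train * 5), f)\n",
"\n",
"    local_handler = SklearnModelHandlerNumpy(model_uri=local_model_path)\n",
"\n",
"    # No pipeline options, so this runs on the local DirectRunner.\n",
"    with beam.Pipeline() as p:\n",
"        local_predictions = (\n",
"            p\n",
"            | 'CreateRows' >> beam.Create(LOCAL_ROWS)\n",
"            | 'ExtractInputs' >> beam.Map(lambda row: [row['value']])\n",
"            | 'RunInferenceSklearn' >> RunInference(model_handler=local_handler)\n",
"            # Round to hide tiny floating-point differences in the fitted model.\n",
"            | 'ToFloat' >> beam.Map(lambda result: round(float(result.inference[0]), 3)))\n",
"        # Fails the pipeline if the predictions differ from the expected values.\n",
"        assert_that(local_predictions, equal_to([525.0, 540.0, 5000.0, 5065.0]))"
]
},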
{
"cell_type": "markdown",
"metadata": {
"id": "33e901d6-ed06-4268-8a5f-685d31b5558f"
},
"source": [
"### Use sklearn RunInference on keyed inputs\n",
"This section demonstrates how to do the following:\n",
"1. Wrap the `SklearnModelHandlerNumpy` object around `KeyedModelHandler` to handle keyed data.\n",
"2. Read the data from BigQuery.\n",
"3. Use the sklearn trained model and the sklearn RunInference transform on a keyed data."
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "c212916d-b517-4589-ad15-a3a1df926fb3",
"outputId": "61db2d76-4dfa-4b38-cf9a-645790b4c5aa"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"('third_example', PredictionResult(example=[1000.0], inference=array([5000.])))\n",
"('fourth_example', PredictionResult(example=[1013.0], inference=array([5065.])))\n",
"('second_example', PredictionResult(example=[108.0], inference=array([540.])))\n",
"('first_example', PredictionResult(example=[105.0], inference=array([525.])))\n"
]
}
],
"source": [
"sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=five_times_model_filename) \n",
"keyed_sklearn_model_handler = KeyedModelHandler(sklearn_model_handler)\n",
"\n",
"pipeline_options = PipelineOptions().from_dictionary(\n",
" {'temp_location':f'gs://{bucket}/tmp'})\n",
"with beam.Pipeline(options=pipeline_options) as p:\n",
" (\n",
" p \n",
" | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n",
" | \"ExtractInputs\" >> beam.Map(lambda x: (x['key'], [x['value']])) \n",
" | \"RunInferenceSklearn\" >> RunInference(model_handler=keyed_sklearn_model_handler)\n",
" | beam.Map(print)\n",
" )"
]
},
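{
"cell_type": "markdown",
"metadata": {},
"source": [
"With keyed input, each output element is a `(key, PredictionResult)` tuple, so a postprocessing step after RunInference can unpack both parts. The following sketch runs locally with in-memory rows in place of BigQuery and adds a postprocessing `DoFn`; the `FormatPrediction` class and its output format are illustrative choices, not part of the Beam API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pickle\n",
"import tempfile\n",
"\n",
"import apache_beam as beam\n",
"import numpy as np\n",
"from apache_beam.ml.inference.base import KeyedModelHandler, RunInference\n",
"from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy\n",
"from apache_beam.testing.util import assert_that, equal_to\n",
"from sklearn import linear_model\n",
"\n",
"KEYED_ROWS = [('first_example', 105.0), ('second_example', 108.0),\n",
"              ('third_example', 1000.0), ('fourth_example', 1013.0)]\n",
"\n",
"\n",
"class FormatPrediction(beam.DoFn):\n",
"    # Illustrative postprocessor: (key, PredictionResult) -> (key, input, prediction).\n",
"    def process(self, element):\n",
"        key, result = element\n",
"        yield key, float(result.example[0]), round(float(result.inference[0]), 3)\n",
"\n",
"\n",
"with tempfile.TemporaryDirectory() as tmp_dir:\n",
"    keyed_model_path = os.path.join(tmp_dir, 'sklearn_5x_model.pkl')\n",
"    x_train = np.arange(0, 100, dtype=np.float64).reshape(-1, 1)\n",
"    with open(keyed_model_path, 'wb') as f:\n",
"        pickle.dump(linear_model.LinearRegression().fit(x_train, x_train * 5), f)\n",
"\n",
"    # KeyedModelHandler passes each key through and runs the wrapped handler on the value.\n",
"    keyed_handler = KeyedModelHandler(\n",
"        SklearnModelHandlerNumpy(model_uri=keyed_model_path))\n",
"\n",
"    with beam.Pipeline() as p:\n",
"        formatted = (\n",
"            p\n",
"            | 'CreateRows' >> beam.Create(KEYED_ROWS)\n",
"            | 'ToKeyedInputs' >> beam.MapTuple(lambda key, value: (key, [value]))\n",
"            | 'RunInferenceSklearn' >> RunInference(model_handler=keyed_handler)\n",
"            | 'Postprocess' >> beam.ParDo(FormatPrediction()))\n",
"        assert_that(formatted, equal_to([\n",
"            ('first_example', 105.0, 525.0),\n",
"            ('second_example', 108.0, 540.0),\n",
"            ('third_example', 1000.0, 5000.0),\n",
"            ('fourth_example', 1013.0, 5065.0),\n",
"        ]))"
]
},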
{
"cell_type": "markdown",
"metadata": {
"id": "JQ4zvlwsRK1W"
},
"source": [
"## Run multiple models\n",
"\n",
"This code creates a pipeline that takes two RunInference transforms with different models and then combines the output."
]
},
{
"cell_type": "code",
"execution_count": 86,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "0qMlX6SeR68D",
"outputId": "5e4a0852-3761-47da-aa08-0386fd524a78"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"key = third_example * 10, example = 1000.0 -> predictions 10000.0\n",
"key = fourth_example * 10, example = 1013.0 -> predictions 10130.0\n",
"key = second_example * 10, example = 108.0 -> predictions 1080.0\n",
"key = first_example * 10, example = 105.0 -> predictions 1050.0\n",
"key = third_example * 5, example = 1000.0 -> predictions 5000.0\n",
"key = fourth_example * 5, example = 1013.0 -> predictions 5065.0\n",
"key = second_example * 5, example = 108.0 -> predictions 540.0\n",
"key = first_example * 5, example = 105.0 -> predictions 525.0\n"
]
}
],
"source": [
"from typing import Tuple\n",
"\n",
"def format_output(run_inference_output) -> str:\n",
" \"\"\"Takes input from RunInference for scikit-learn and extracts the output.\"\"\"\n",
" key, prediction_result = run_inference_output\n",
" example = prediction_result.example[0]\n",
" prediction = prediction_result.inference[0]\n",
" return f\"key = {key}, example = {example} -> predictions {prediction}\"\n",
"\n",
"five_times_model_handler = KeyedModelHandler(\n",
" SklearnModelHandlerNumpy(model_uri=five_times_model_filename))\n",
"ten_times_model_handler = KeyedModelHandler(\n",
" SklearnModelHandlerNumpy(model_uri=ten_times_model_filename))\n",
"\n",
"pipeline_options = PipelineOptions().from_dictionary(\n",
" {'temp_location':f'gs://{bucket}/tmp'})\n",
"with beam.Pipeline(options=pipeline_options) as p:\n",
" inputs = (p \n",
" | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec))\n",
" five_times = (inputs\n",
" | \"Extract For 5\" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 5'), [x['value']]))\n",
" | \"5 times\" >> RunInference(model_handler = five_times_model_handler))\n",
" ten_times = (inputs\n",
" | \"Extract For 10\" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 10'), [x['value']]))\n",
" | \"10 times\" >> RunInference(model_handler = ten_times_model_handler))\n",
" _ = ((five_times, ten_times) | \"Flattened\" >> beam.Flatten()\n",
" | \"format output\" >> beam.Map(format_output)\n",
" | \"Print\" >> beam.Map(print))\n"
]
}
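,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`beam.Flatten` merges the two output collections into one, so the 5 times and 10 times predictions for the same row arrive as separate elements in no particular order, which is why the keys above carry a `* 5` or `* 10` suffix. If you would rather see both predictions for a row together, one option is to keep the original keys and join the two outputs with `beam.CoGroupByKey`. The following sketch assumes that variation; it uses in-memory rows and freshly trained models in place of BigQuery and the pickled files, and the tag names `'5x'` and `'10x'` and the helper functions `keyed_handler_for` and `side_by_side` are illustrative."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pickle\n",
"import tempfile\n",
"\n",
"import apache_beam as beam\n",
"import numpy as np\n",
"from apache_beam.ml.inference.base import KeyedModelHandler, RunInference\n",
"from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy\n",
"from apache_beam.testing.util import assert_that, equal_to\n",
"from sklearn import linear_model\n",
"\n",
"JOIN_ROWS = [('first_example', 105.0), ('second_example', 108.0),\n",
"             ('third_example', 1000.0), ('fourth_example', 1013.0)]\n",
"\n",
"def keyed_handler_for(factor, directory):\n",
"    # Train y = factor * x, pickle it into directory, and return a keyed handler for it.\n",
"    path = os.path.join(directory, f'sklearn_{factor}x_model.pkl')\n",
"    x_train = np.arange(0, 100, dtype=np.float64).reshape(-1, 1)\n",
"    with open(path, 'wb') as f:\n",
"        pickle.dump(linear_model.LinearRegression().fit(x_train, x_train * factor), f)\n",
"    return KeyedModelHandler(SklearnModelHandlerNumpy(model_uri=path))\n",
"\n",
"def side_by_side(key, grouped):\n",
"    # Unpack one CoGroupByKey result into (key, 5x prediction, 10x prediction).\n",
"    five = next(iter(grouped['5x']))\n",
"    ten = next(iter(grouped['10x']))\n",
"    return key, round(float(five.inference[0]), 3), round(float(ten.inference[0]), 3)\n",
"\n",
"with tempfile.TemporaryDirectory() as tmp_dir:\n",
"    five_handler = keyed_handler_for(5, tmp_dir)\n",
"    ten_handler = keyed_handler_for(10, tmp_dir)\n",
"    with beam.Pipeline() as p:\n",
"        inputs = (p\n",
"                  | 'CreateRows' >> beam.Create(JOIN_ROWS)\n",
"                  | 'ToKeyedInputs' >> beam.MapTuple(lambda key, value: (key, [value])))\n",
"        five_results = inputs | '5 times' >> RunInference(model_handler=five_handler)\n",
"        ten_results = inputs | '10 times' >> RunInference(model_handler=ten_handler)\n",
"        # Group both outputs by the shared original key.\n",
"        joined = ({'5x': five_results, '10x': ten_results}\n",
"                  | 'Join' >> beam.CoGroupByKey()\n",
"                  | 'SideBySide' >> beam.MapTuple(side_by_side))\n",
"        assert_that(joined, equal_to([\n",
"            ('first_example', 525.0, 1050.0),\n",
"            ('second_example', 540.0, 1080.0),\n",
"            ('third_example', 5000.0, 10000.0),\n",
"            ('fourth_example', 5065.0, 10130.0),\n",
"        ]))"
]
}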
],
"metadata": {
"colab": {
"collapsed_sections": [],
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}