{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "A7HK1Duzdnew"
},
"outputs": [],
"source": [
"# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
"\n",
"# Licensed to the Apache Software Foundation (ASF) under one\n",
"# or more contributor license agreements. See the NOTICE file\n",
"# distributed with this work for additional information\n",
"# regarding copyright ownership. The ASF licenses this file\n",
"# to you under the Apache License, Version 2.0 (the\n",
"# \"License\"); you may not use this file except in compliance\n",
"# with the License. You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing,\n",
"# software distributed under the License is distributed on an\n",
"# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
"# KIND, either express or implied. See the License for the\n",
"# specific language governing permissions and limitations\n",
"# under the License"
]
},
{
"cell_type": "markdown",
"source": [
"# Use MLTransform to scale data\n",
"\n",
"<table align=\"left\">\n",
" <td>\n",
" <a target=\"_blank\" href=\"https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/mltransform/scale_data.ipynb.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
" </td>\n",
" <td>\n",
" <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/mltransform/scale_data.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
" </td>\n",
"</table>\n"
],
"metadata": {
"id": "ZUSiAR62SgO8"
}
},
{
"cell_type": "markdown",
"source": [
"Scaling data is an important preprocessing step for training machine learning models because it helps to ensure that all features have a similar weight or influence on the model. This can be beneficial for several reasons:\n",
"\n",
"1. Improves the convergence of gradient descent algorithms: Many machine learning algorithms, such as linear regression and neural networks, use gradient descent to optimize their parameters. Gradient descent works by iteratively moving the parameters of the model in the direction that reduces the loss function. If the features are not scaled, features with larger ranges can have a much larger impact on the gradient, making it difficult for the model to converge. Scaling the features helps to ensure that all features contribute equally to the gradient, which can lead to faster and more stable convergence.\n",
"\n",
"2. Uniformity in Features: If one feature has a much larger range than the other features, it can dominate the model and make it difficult for the model to learn from the other features. This can lead to poor performance and biased predictions. Scaling the features helps to prevent this by bringing all of the features into a similar range.\n",
"\n",
"\n",
"To scale your dataset using Apache Beam, use `MLTransform` with the following scaling data\n",
"\n",
"* `ScaleTo01`: Calculates the minimum and maximum of an entire dataset, and then scales the dataset between 0 and 1 based on minimum and maximum values.\n",
"* `ScaleToZScore`: Calculates the mean and variance of an entire dataset, and then scales the dataset based on those values.\n",
"* `ScaleByMinMax`: Scales the data in a dataset, taking minimum and maximum values as input parameters.\n",
"\n",
"For each data processing transform, `MLTransform` runs in both `write` mode and `read` mode.\n",
"\n",
"## MLTransform in write mode\n",
"\n",
"When `MLTransform` is in `write` mode, it produces artifacts, such as minimum, maximum, variance etc for different data processing transforms. This allows you to ensure that you are applying the same artifacts (and any other preprocessing transforms you apply) when you are training your model and serving it in production or testing its accuracy.\n",
"\n",
"## MLTransform in read mode\n",
"\n",
"In read mode, `MLTransform` uses the artifacts generated in `write` mode to scale the entire dataset.\n",
"\n",
"For more information about using `MLTransform`, see [Preprocess data with MLTransform](https://beam.apache.org/documentation/ml/preprocess-data/) in the Apache Beam documentation."
],
"metadata": {
"id": "F8-yTcjPUHLv"
}
},
{
"cell_type": "markdown",
"source": [
"## Import the required modules\n",
"\n",
"To use `MLTransfrom`, install `tensorflow_transform` and the Apache Beam SDK version 2.53.0 or later."
],
"metadata": {
"id": "8qhhZTbIyqFb"
}
},
{
"cell_type": "code",
"source": [
"! git clone https://github.com/apache/beam.git\n",
"! cd beam/sdks/python\n",
"! pip install beam/sdks/python\n",
"! pip install tensorflow-transform --quiet"
],
"metadata": {
"id": "cFP4CQksT_Zv"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"import os\n",
"import tempfile\n",
"import apache_beam as beam\n",
"from apache_beam.ml.transforms.base import MLTransform\n",
"from apache_beam.ml.transforms.tft import ScaleTo01\n",
"from apache_beam.ml.transforms.tft import ScaleByMinMax\n",
"from apache_beam.ml.transforms.tft import ScaleToZScore"
],
"metadata": {
"id": "y-B7OZ0lyeee"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"artifact_location_scale_to_01 = tempfile.mkdtemp(prefix='scale_to_01_')\n",
"artifact_location_scale_to_zscore = tempfile.mkdtemp(prefix='scale_to_zscore_')\n",
"artifact_location_scale_by_min_max = tempfile.mkdtemp(prefix='scale_by_min_max_')"
],
"metadata": {
"id": "UyBji8pSUEJt"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# data used in MLTransform's write mode.\n",
"data = [\n",
" {'int_feature_1' : 11, 'int_feature_2': -10},\n",
" {'int_feature_1': 34, 'int_feature_2': -33},\n",
" {'int_feature_1': 5, 'int_feature_2': -63},\n",
" {'int_feature_1': 12, 'int_feature_2': -38},\n",
" {'int_feature_1': 32, 'int_feature_2': -65},\n",
" {'int_feature_1': 63, 'int_feature_2': -21},\n",
"]\n",
"\n",
"# data used in MLTransform's read mode.\n",
"test_data = [\n",
" {'int_feature_1': 29, 'int_feature_2': -20},\n",
" {'int_feature_1': -5, 'int_feature_2': -11},\n",
" {'int_feature_1': 5, 'int_feature_2': -44},\n",
" {'int_feature_1': 29, 'int_feature_2': -12},\n",
" {'int_feature_1': 20, 'int_feature_2': -53},\n",
" {'int_feature_1': 70, 'int_feature_2': -8}\n",
"]\n"
],
"metadata": {
"id": "lNG0WI7Dy1dd"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"## Scale the data between 0 and 1\n",
"\n",
"Scale the data so that it's in the range of 0 and 1. To scale the data, the transform calculates minimum and maximum values on the whole dataset, and then performs the following calculation:\n",
"\n",
"`x = (x - x_min) / (x_max - x_min)`\n",
"\n",
"To scale the data, use the [ScaleTo01](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.tft.html#apache_beam.ml.transforms.tft.ScaleTo01) data processing transform in `MLTransform`."
],
"metadata": {
"id": "MjMPgB1T0Wzu"
}
},
{
"cell_type": "code",
"source": [
"# MLTransform in write mode.\n",
"with beam.Pipeline() as pipeline:\n",
" data_pcoll = pipeline | \"CreateData\" >> beam.Create(data)\n",
"\n",
" transformed_pcoll = (\n",
" data_pcoll\n",
" | \"MLTransform\" >> MLTransform(write_artifact_location=artifact_location_scale_to_01).with_transform(\n",
" ScaleTo01(columns=['int_feature_1', 'int_feature_2'])\n",
" )\n",
" )\n",
" transformed_pcoll | \"Print\" >> beam.Map(print)"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 211
},
"id": "xuoLla2u0Cf_",
"outputId": "dab7f024-ca5e-4f2b-83b8-c09612c19450"
},
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Row(int_feature_1=array([0.10344828], dtype=float32), int_feature_2=array([1.], dtype=float32))\n",
"Row(int_feature_1=array([0.5], dtype=float32), int_feature_2=array([0.58181816], dtype=float32))\n",
"Row(int_feature_1=array([0.], dtype=float32), int_feature_2=array([0.03636364], dtype=float32))\n",
"Row(int_feature_1=array([0.12068965], dtype=float32), int_feature_2=array([0.4909091], dtype=float32))\n",
"Row(int_feature_1=array([0.46551725], dtype=float32), int_feature_2=array([0.], dtype=float32))\n",
"Row(int_feature_1=array([1.], dtype=float32), int_feature_2=array([0.8], dtype=float32))\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"In the above dataset, the minimum and maximum values for columns:\n",
"* int_feature_1 : 5 and 63.\n",
"* int_feature_2 : -65 and -10\n",
"\n",
"In the output for the column - `int_feature_1`, the data is scaled between 0 and 1 using the values `5` and `63`. `5` is scaled to `0` and `63` is scaled to `1` and rest are scaled between 0 and 1 using `x = (x - x_min) / (x_max - x_min)`"
],
"metadata": {
"id": "Hwvh4UyI23pz"
}
},
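{
"cell_type": "markdown",
"source": [
"As a quick check of the formula against the printed output, here is the same arithmetic for the second row of `data`, worked by hand. The results are rounded to four decimal places, so they can differ from the `float32` output in the last digits.\n",
"\n",
"```\n",
"int_feature_1: (34 - 5) / (63 - 5)           = 29 / 58 = 0.5\n",
"int_feature_2: (-33 - (-65)) / (-10 - (-65)) = 32 / 55 ≈ 0.5818\n",
"```"
],
"metadata": {}
},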
{
"cell_type": "code",
"source": [
"# MLTransform in read mode.\n",
"with beam.Pipeline() as pipeline:\n",
" data_pcoll = pipeline | \"CreateData\" >> beam.Create(test_data)\n",
"\n",
" transformed_pcoll = (\n",
" data_pcoll\n",
" | \"MLTransform\" >> MLTransform(read_artifact_location=artifact_location_scale_to_01)\n",
" )\n",
" transformed_pcoll | \"Print\" >> beam.Map(print)"
],
"metadata": {
"id": "M8obCCCL1TM6",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "813bf5c2-6be7-49d3-d622-33f3f66fe2c9"
},
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Row(int_feature_1=array([0.41379312], dtype=float32), int_feature_2=array([0.8181818], dtype=float32))\n",
"Row(int_feature_1=array([-0.1724138], dtype=float32), int_feature_2=array([0.9818182], dtype=float32))\n",
"Row(int_feature_1=array([0.], dtype=float32), int_feature_2=array([0.38181818], dtype=float32))\n",
"Row(int_feature_1=array([0.41379312], dtype=float32), int_feature_2=array([0.96363634], dtype=float32))\n",
"Row(int_feature_1=array([0.25862068], dtype=float32), int_feature_2=array([0.21818182], dtype=float32))\n",
"Row(int_feature_1=array([1.1206896], dtype=float32), int_feature_2=array([1.0363636], dtype=float32))\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"`MLTransform` learned in `write` mode that int_feature_1 ranges from 5 to 63.\n",
"\n",
"In read mode, when it encounters 29 in test_data for `int_feature_1`, it scales it using\n",
"```\n",
"(value - min) / (max - min)\n",
"```\n",
"\n",
"Now plugging the values\n",
"```\n",
"(29 - 5) / (63 - 5) = 0.41379312\n",
"```\n",
"\n",
"So 29 is scaled based on the min and max values generated during `write` mode.\n"
],
"metadata": {
"id": "jwSiMoAH4vq5"
}
},
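{
"cell_type": "markdown",
"source": [
"Because `read` mode reuses the minimum and maximum from `write` mode, values in `test_data` that fall outside the range 5 to 63 are scaled to values outside the range 0 to 1. The following hand calculation for `int_feature_1` illustrates this, with results rounded to four decimal places:\n",
"\n",
"```\n",
"(-5 - 5) / (63 - 5) = -10 / 58 ≈ -0.1724\n",
"(70 - 5) / (63 - 5) =  65 / 58 ≈  1.1207\n",
"```\n",
"\n",
"These results match the second and last rows of the preceding output."
],
"metadata": {}
},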
{
"cell_type": "markdown",
"source": [
"## Scale the data by using the z-score\n",
"\n",
"Similar to `ScaleTo01`, use [ScaleToZScore](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.tft.html#apache_beam.ml.transforms.tft.ScaleToZScore) to scale the values by using the [z-score]([z-score](https://www.tensorflow.org/tfx/transform/api_docs/python/tft/scale_to_z_score#:~:text=Scaling%20to%20z%2Dscore%20subtracts%20out%20the%20mean%20and%20divides%20by%20standard%20deviation.%20Note%20that%20the%20standard%20deviation%20computed%20here%20is%20based%20on%20the%20biased%20variance%20(0%20delta%20degrees%20of%20freedom)%2C%20as%20computed%20by%20analyzers.var.).\n"
],
"metadata": {
"id": "VH1CmCGm_PtS"
}
},
{
"cell_type": "code",
"source": [
"# MLTransform in write mode.\n",
"with beam.Pipeline() as pipeline:\n",
" data_pcoll = pipeline | \"CreateData\" >> beam.Create(data)\n",
"\n",
" transformed_pcoll = (\n",
" data_pcoll\n",
" | \"MLTransform\" >> MLTransform(write_artifact_location=artifact_location_scale_to_zscore).with_transform(\n",
" ScaleToZScore(columns=['int_feature_1', 'int_feature_2'])\n",
" )\n",
" )\n",
" transformed_pcoll | \"Print\" >> beam.Map(print)"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "OOrnLBbD_Qe4",
"outputId": "4754b0ac-dc40-4a55-c999-33270e785acc"
},
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Row(int_feature_1=array([-0.76950264], dtype=float32), int_feature_2=array([1.401755], dtype=float32))\n",
"Row(int_feature_1=array([0.3974355], dtype=float32), int_feature_2=array([0.2638597], dtype=float32))\n",
"Row(int_feature_1=array([-1.0739213], dtype=float32), int_feature_2=array([-1.2203515], dtype=float32))\n",
"Row(int_feature_1=array([-0.7187662], dtype=float32), int_feature_2=array([0.01649117], dtype=float32))\n",
"Row(int_feature_1=array([0.2959626], dtype=float32), int_feature_2=array([-1.3192989], dtype=float32))\n",
"Row(int_feature_1=array([1.8687923], dtype=float32), int_feature_2=array([0.8575442], dtype=float32))\n"
]
}
]
},
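{
"cell_type": "markdown",
"source": [
"The notebook doesn't print the mean and standard deviation that `ScaleToZScore` computes, so the following hand calculation for `int_feature_1` is a sketch. It assumes the biased (population) variance, which is what the linked `tft.scale_to_z_score` documentation describes, and it rounds values to four decimal places.\n",
"\n",
"```\n",
"mean     = (11 + 34 + 5 + 12 + 32 + 63) / 6 = 157 / 6 ≈ 26.1667\n",
"variance = ((11 - mean)^2 + ... + (63 - mean)^2) / 6   ≈ 388.4722\n",
"std      = sqrt(variance)                              ≈ 19.7097\n",
"\n",
"z(11) = (11 - 26.1667) / 19.7097 ≈ -0.7695\n",
"z(63) = (63 - 26.1667) / 19.7097 ≈  1.8688\n",
"```\n",
"\n",
"These results agree with the first and last rows of the preceding output."
],
"metadata": {}
},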
{
"cell_type": "code",
"source": [
"# MLTransform in read mode.\n",
"with beam.Pipeline() as pipeline:\n",
" data_pcoll = pipeline | \"CreateData\" >> beam.Create(test_data)\n",
"\n",
" transformed_pcoll = (\n",
" data_pcoll\n",
" | \"MLTransform\" >> MLTransform(read_artifact_location=artifact_location_scale_to_zscore)\n",
" )\n",
" transformed_pcoll | \"Print\" >> beam.Map(print)"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "lLXGBaOtCRv-",
"outputId": "3ef4d142-0be2-4350-b2c4-3b676be74cee"
},
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Row(int_feature_1=array([0.14375328], dtype=float32), int_feature_2=array([0.9070179], dtype=float32))\n",
"Row(int_feature_1=array([-1.5812857], dtype=float32), int_feature_2=array([1.3522812], dtype=float32))\n",
"Row(int_feature_1=array([-1.0739213], dtype=float32), int_feature_2=array([-0.28035107], dtype=float32))\n",
"Row(int_feature_1=array([0.14375328], dtype=float32), int_feature_2=array([1.3028076], dtype=float32))\n",
"Row(int_feature_1=array([-0.31287467], dtype=float32), int_feature_2=array([-0.7256144], dtype=float32))\n",
"Row(int_feature_1=array([2.2239475], dtype=float32), int_feature_2=array([1.5007024], dtype=float32))\n"
]
}
]
},
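{
"cell_type": "markdown",
"source": [
"In `read` mode, the values in `test_data` are scaled with the mean and standard deviation of the `data` list, not with statistics of `test_data` itself. As a hand check for `int_feature_1`, assuming the biased variance, the mean of `data` is about 26.1667 and the standard deviation is about 19.7097:\n",
"\n",
"```\n",
"z(29) = (29 - 26.1667) / 19.7097 ≈ 0.1438\n",
"z(70) = (70 - 26.1667) / 19.7097 ≈ 2.2239\n",
"```\n",
"\n",
"These results agree with the first and last rows of the preceding output."
],
"metadata": {}
},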
{
"cell_type": "markdown",
"source": [
"## Scale the data by using ScaleByMinMax\n",
"\n",
"Use [ScaleByMinMax](https://github.com/apache/beam/blob/9e8a310f0c0faddfba28176df5893d8ad8fd10a0/sdks/python/apache_beam/ml/transforms/tft.py#L450) to scale your data into the range of `[min_value, max_value]`."
],
"metadata": {
"id": "McISy8QkC00v"
}
},
{
"cell_type": "code",
"source": [
"min_value = 1\n",
"max_value = 10\n",
"\n",
"# MLTransform in write mode.\n",
"with beam.Pipeline() as pipeline:\n",
" data_pcoll = pipeline | \"CreateData\" >> beam.Create(data)\n",
"\n",
" transformed_pcoll = (\n",
" data_pcoll\n",
" | \"MLTransform\" >> MLTransform(write_artifact_location=artifact_location_scale_by_min_max).with_transform(\n",
" ScaleByMinMax(columns=['int_feature_1', 'int_feature_2'], min_value=min_value, max_value=max_value)\n",
" )\n",
" )\n",
" transformed_pcoll | \"Print\" >> beam.Map(print)"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "cpn_wKQ3Cxo0",
"outputId": "20be1e2b-36a2-4bae-dc53-5e5b5f1692be"
},
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Row(int_feature_1=array([1.9310346], dtype=float32), int_feature_2=array([10.], dtype=float32))\n",
"Row(int_feature_1=array([5.5], dtype=float32), int_feature_2=array([6.2363634], dtype=float32))\n",
"Row(int_feature_1=array([1.], dtype=float32), int_feature_2=array([1.3272727], dtype=float32))\n",
"Row(int_feature_1=array([2.086207], dtype=float32), int_feature_2=array([5.418182], dtype=float32))\n",
"Row(int_feature_1=array([5.1896553], dtype=float32), int_feature_2=array([1.], dtype=float32))\n",
"Row(int_feature_1=array([10.], dtype=float32), int_feature_2=array([8.200001], dtype=float32))\n"
]
}
]
},
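{
"cell_type": "markdown",
"source": [
"The printed values are consistent with first scaling each value to the range 0 to 1, as `ScaleTo01` does, and then stretching the result to `[min_value, max_value]`. The following formula is inferred from the output rather than quoted from the Apache Beam documentation, so treat it as a sketch:\n",
"\n",
"```\n",
"x_scaled = min_value + (x - x_min) / (x_max - x_min) * (max_value - min_value)\n",
"\n",
"int_feature_1 = 11: 1 + (11 - 5) / (63 - 5) * (10 - 1) = 1 + 54 / 58 ≈ 1.9310\n",
"int_feature_1 = 34: 1 + (34 - 5) / (63 - 5) * (10 - 1) = 1 + 4.5     = 5.5\n",
"```"
],
"metadata": {}
},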
{
"cell_type": "code",
"source": [
"# MLTransform in read mode.\n",
"with beam.Pipeline() as pipeline:\n",
" data_pcoll = pipeline | \"CreateData\" >> beam.Create(test_data)\n",
"\n",
" transformed_pcoll = (\n",
" data_pcoll\n",
" | \"MLTransform\" >> MLTransform(read_artifact_location=artifact_location_scale_by_min_max)\n",
" )\n",
" transformed_pcoll | \"Print\" >> beam.Map(print)"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "3aQjFPzBI7dS",
"outputId": "e9967889-27eb-4295-dfad-a285d670826c"
},
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Row(int_feature_1=array([4.7241383], dtype=float32), int_feature_2=array([8.363636], dtype=float32))\n",
"Row(int_feature_1=array([-0.5517242], dtype=float32), int_feature_2=array([9.836364], dtype=float32))\n",
"Row(int_feature_1=array([1.], dtype=float32), int_feature_2=array([4.4363637], dtype=float32))\n",
"Row(int_feature_1=array([4.7241383], dtype=float32), int_feature_2=array([9.672727], dtype=float32))\n",
"Row(int_feature_1=array([3.3275862], dtype=float32), int_feature_2=array([2.9636364], dtype=float32))\n",
"Row(int_feature_1=array([11.086206], dtype=float32), int_feature_2=array([10.327272], dtype=float32))\n"
]
}
]
}
]
}