| { |
| "nbformat": 4, |
| "nbformat_minor": 0, |
| "metadata": { |
| "colab": { |
| "provenance": [], |
| "toc_visible": true |
| }, |
| "kernelspec": { |
| "name": "python3", |
| "display_name": "Python 3" |
| }, |
| "language_info": { |
| "name": "python" |
| } |
| }, |
| "cells": [ |
| { |
| "cell_type": "code", |
| "source": [ |
| "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", |
| "\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License" |
| ], |
| "metadata": { |
| "id": "SGjEjVxwudf2" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# Apache Beam RunInference with Hugging Face\n", |
| "\n", |
| "<table align=\"left\">\n", |
| " <td>\n", |
| " <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_huggingface.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n", |
| " </td>\n", |
| " <td>\n", |
| " <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_huggingface.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n", |
| " </td>\n", |
| "</table>" |
| ], |
| "metadata": { |
| "id": "BJ0mTOhFucGg" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "This notebook shows how to use models from [Hugging Face](https://huggingface.co/) and [Hugging Face pipeline](https://huggingface.co/docs/transformers/main_classes/pipelines) in Apache Beam pipelines that uses the [RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference) transform.\n", |
| "\n", |
| "Apache Beam has built-in support for Hugging Face model handlers. Hugging Face has three model handlers:\n", |
| "\n", |
| "\n", |
| "\n", |
| "* Use the [`HuggingFacePipelineModelHandler`](https://github.com/apache/beam/blob/926774dd02be5eacbe899ee5eceab23afb30abca/sdks/python/apache_beam/ml/inference/huggingface_inference.py#L567) model handler to run inference with [Hugging Face pipelines](https://huggingface.co/docs/transformers/main_classes/pipelines#pipelines).\n", |
| "* Use the [`HuggingFaceModelHandlerKeyedTensor`](https://github.com/apache/beam/blob/926774dd02be5eacbe899ee5eceab23afb30abca/sdks/python/apache_beam/ml/inference/huggingface_inference.py#L208) model handler to run inference with models that uses keyed tensors as inputs. For example, you might use this model handler with language modeling tasks.\n", |
| "* Use the [`HuggingFaceModelHandlerTensor`](https://github.com/apache/beam/blob/926774dd02be5eacbe899ee5eceab23afb30abca/sdks/python/apache_beam/ml/inference/huggingface_inference.py#L392) model handler to run inference with models that uses tensor inputs, such as `tf.Tensor` or `torch.Tensor`. \n", |
| "\n", |
| "\n", |
| "For more information about using RunInference, see [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the Apache Beam documentation." |
| ], |
| "metadata": { |
| "id": "GBloorZevCXC" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "## Install dependencies" |
| ], |
| "metadata": { |
| "id": "xpylrB7_2Jzk" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "Install both Apache Beam and the required dependencies for Hugging Face." |
| ], |
| "metadata": { |
| "id": "IBQLg8on2S40" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "yrqNSBB3qsI1" |
| }, |
| "outputs": [], |
| "source": [ |
| "!pip install torch --quiet\n", |
| "!pip install tensorflow --quiet\n", |
| "!pip install transformers==4.44.2 --quiet\n", |
| "!pip install apache-beam[gcp]>=2.50 --quiet" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "from typing import Dict\n", |
| "from typing import Iterable\n", |
| "from typing import Tuple\n", |
| "\n", |
| "import tensorflow as tf\n", |
| "import torch\n", |
| "from transformers import AutoTokenizer\n", |
| "from transformers import TFAutoModelForMaskedLM\n", |
| "\n", |
| "import apache_beam as beam\n", |
| "from apache_beam.ml.inference.base import KeyedModelHandler\n", |
| "from apache_beam.ml.inference.base import PredictionResult\n", |
| "from apache_beam.ml.inference.base import RunInference\n", |
| "from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler\n", |
| "from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerKeyedTensor\n", |
| "from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerTensor\n", |
| "from apache_beam.ml.inference.huggingface_inference import PipelineTask\n" |
| ], |
| "metadata": { |
| "id": "BIDZLGFRrAmF" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "## Use RunInference with Hugging Face pipelines" |
| ], |
| "metadata": { |
| "id": "OQ1wv6xk3UeV" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "You can use [Hugging Face pipelines](https://huggingface.co/docs/transformers/main_classes/pipelines#pipelines) with `RunInference` by using the `HuggingFacePipelineModelHandler` model handler. Similar to the Hugging Face pipelines, to instantiate the model handler, the model handler needs either the pipeline `task` or the `model` that defines the task. To pass any optional arguments to load the pipeline, use `load_pipeline_args`. To pass the optional arguments for inference, use `inference_args`.\n", |
| "\n", |
| "\n", |
| "\n", |
| "You can define the pipeline task in one of the following two ways:\n", |
| "\n", |
| "\n", |
| "\n", |
| "* In the form of string, for example `\"translation\"`. This option is similar to how the pipeline task is defined when using Hugging Face.\n", |
| "* In the form of a [`PipelineTask`](https://github.com/apache/beam/blob/ac936b0b89a92d836af59f3fc04f5733ad6819b3/sdks/python/apache_beam/ml/inference/huggingface_inference.py#L75) enum object defined in Apache Beam, such as `PipelineTask.Translation`.\n" |
| ], |
| "metadata": { |
| "id": "hWZQ49Pt3ojg" |
| } |
| }, |
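| { |
| "cell_type": "markdown", |
| "source": [ |
| "For reference, here is a minimal sketch of the string form, equivalent in intent to the enum-based model handler created in the next section (shown for illustration, not executed here):\n", |
| "\n", |
| "```\n", |
| "model_handler = HuggingFacePipelineModelHandler(\n", |
| " task=\"translation\",\n", |
| " model=\"google/flan-t5-small\",\n", |
| " load_pipeline_args={'framework': 'pt'},\n", |
| " inference_args={'max_length': 200}\n", |
| ")\n", |
| "```" |
| ], |
| "metadata": {} |
| }, |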
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Create a model handler\n", |
| "\n", |
| "This example demonstrates a task that translates text from English to Spanish." |
| ], |
| "metadata": { |
| "id": "pVVg9RfET86L" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "model_handler = HuggingFacePipelineModelHandler(\n", |
| " task=PipelineTask.Translation_XX_to_YY,\n", |
| " model = \"google/flan-t5-small\",\n", |
| " load_pipeline_args={'framework': 'pt'},\n", |
| " inference_args={'max_length': 200}\n", |
| ")" |
| ], |
| "metadata": { |
| "id": "aF_BDPXk3UG4" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Define the input examples\n", |
| "\n", |
| "Use this code to define the input examples." |
| ], |
| "metadata": { |
| "id": "lxTImFGJUBIw" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "text = [\"translate English to Spanish: How are you doing?\",\n", |
| " \"translate English to Spanish: This is the Apache Beam project.\"]" |
| ], |
| "metadata": { |
| "id": "POAuIFS_UDgE" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Postprocess the results\n", |
| "\n", |
| "The output from the `RunInference` transform is a `PredictionResult` object. Use that output to extract inferences, and then format and print the results." |
| ], |
| "metadata": { |
| "id": "Ay6-7DD5TZLn" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "class FormatOutput(beam.DoFn):\n", |
| " \"\"\"\n", |
| " Extract the results from PredictionResult and print the results.\n", |
| " \"\"\"\n", |
| " def process(self, element):\n", |
| " example = element.example\n", |
| " translated_text = element.inference['translation_text']\n", |
| " print(f'Example: {example}')\n", |
| " print(f'Translated text: {translated_text}')\n", |
| " print('-' * 80)\n" |
| ], |
| "metadata": { |
| "id": "74I3U1JsrG0R" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Run the pipeline\n", |
| "\n", |
| "Use the following code to run the pipeline." |
| ], |
| "metadata": { |
| "id": "Ve2cpHZ_UH0o" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "with beam.Pipeline() as beam_pipeline:\n", |
| " examples = (\n", |
| " beam_pipeline\n", |
| " | \"CreateExamples\" >> beam.Create(text)\n", |
| " )\n", |
| " inferences = (\n", |
| " examples\n", |
| " | \"RunInference\" >> RunInference(model_handler)\n", |
| " | \"Print\" >> beam.ParDo(FormatOutput())\n", |
| " )" |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "_xStwO3qubqr", |
| "outputId": "5aeef601-c3e5-4b0f-e982-183ff36dc56e" |
| }, |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "Example: translate English to Spanish: How are you doing?\n", |
| "Translated text: Cómo está acerca?\n", |
| "--------------------------------------------------------------------------------\n", |
| "Example: translate English to Spanish: This is the Apache Beam project.\n", |
| "Translated text: Esto es el proyecto Apache Beam.\n", |
| "--------------------------------------------------------------------------------\n" |
| ] |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Try with a different model\n", |
| "One of the best parts of using RunInference is how easy it is to swap in different models. For example, if we wanted to use a larger model like DeepSeek-R1-Distill-Llama-8B outside of Colab (which has very tight memory constraints and limited GPU access), all we need to change is our ModelHandler:\n", |
| "\n", |
| "```\n", |
| "model_handler = HuggingFacePipelineModelHandler(\n", |
| " task=PipelineTask.Translation_XX_to_YY,\n", |
| " model = \"deepseek-ai/DeepSeek-R1-Distill-Llama-8B\",\n", |
| " load_pipeline_args={'framework': 'pt'},\n", |
| " inference_args={'max_length': 400}\n", |
| ")\n", |
| "```\n", |
| "\n", |
| "We can then run the exact same pipeline code and Beam will take care of the rest." |
| ], |
| "metadata": { |
| "id": "LWNM81ivoZcF" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "## RunInference with a pretrained model from Hugging Face Hub\n" |
| ], |
| "metadata": { |
| "id": "KJEsPkXnUS5y" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "To use pretrained models directly from [Hugging Face Hub](https://huggingface.co/docs/hub/models), use either the `HuggingFaceModelHandlerTensor` model handler or the `HuggingFaceModelHandlerKeyedTensor` model handler. Which model handler you use depends on your input type:\n", |
| "\n", |
| "\n", |
| "* Use `HuggingFaceModelHandlerKeyedTensor` to run inference with models that uses keyed tensors as inputs.\n", |
| "* Use `HuggingFaceModelHandlerTensor` to run inference with models that uses tensor inputs, such as `tf.Tensor` or `torch.Tensor`.\n", |
| "\n", |
| "When you construct your pipeline, you might also need to use the following items:\n", |
| "\n", |
| "\n", |
| "* Use the `load_model_args` to provide optional arguments to load the model.\n", |
| "* Use the `inference_args` argument to do the inference.\n", |
| "* For TensorFlow models, specify the `framework='tf'`.\n", |
| "* For PyTorch models, specify the `framework='pt'`.\n", |
| "\n", |
| "\n", |
| "\n", |
| "The following language modeling task predicts the masked word in a sentence." |
| ], |
| "metadata": { |
| "id": "mDcpG78tWcBN" |
| } |
| }, |
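| { |
| "cell_type": "markdown", |
| "source": [ |
| "The example below uses keyed tensors, so it creates a `HuggingFaceModelHandlerKeyedTensor`. For models that take plain tensors, `HuggingFaceModelHandlerTensor` is constructed with the same core arguments. A minimal sketch, where the `model_uri` is a placeholder for a Hub model of your choice (shown for illustration, not executed here):\n", |
| "\n", |
| "```\n", |
| "model_handler = HuggingFaceModelHandlerTensor(\n", |
| " model_uri=\"<your-model-on-the-hub>\", # placeholder model name\n", |
| " model_class=TFAutoModelForMaskedLM,\n", |
| " framework='tf'\n", |
| ")\n", |
| "```" |
| ], |
| "metadata": {} |
| }, |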
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Create a model handler\n", |
| "\n", |
| "This example shows a masked language modeling task. These models take keyed tensors as inputs." |
| ], |
| "metadata": { |
| "id": "dU6NDE4DaRuF" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "model_handler = HuggingFaceModelHandlerKeyedTensor(\n", |
| " model_uri=\"stevhliu/my_awesome_eli5_mlm_model\",\n", |
| " model_class=TFAutoModelForMaskedLM,\n", |
| " framework='tf',\n", |
| " load_model_args={'from_pt': True},\n", |
| " max_batch_size=1\n", |
| ")" |
| ], |
| "metadata": { |
| "id": "Zx1ep1UXWYrq" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Define the input examples\n", |
| "\n", |
| "Use this code to define the input examples." |
| ], |
| "metadata": { |
| "id": "D18eQZfgcIM6" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "text = ['The capital of France is Paris .',\n", |
| " 'It is raining cats and dogs .',\n", |
| " 'He looked up and saw the sun and stars .',\n", |
| " 'Today is Monday and tomorrow is Tuesday .',\n", |
| " 'There are 5 coconuts on this palm tree .']" |
| ], |
| "metadata": { |
| "id": "vWI_A6VrcH-m" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Preprocess the input\n", |
| "\n", |
| "Edit the given input to replace the last word with a `<mask>`. Then, tokenize the input for doing inference." |
| ], |
| "metadata": { |
| "id": "-62nvMSbeNBy" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "def add_mask_to_last_word(text: str) -> Tuple[str, str]:\n", |
| " \"\"\"Replace the last word of sentence with <mask> and return\n", |
| " the original sentence and the masked sentence.\"\"\"\n", |
| " text_list = text.split()\n", |
| " masked = ' '.join(text_list[:-2] + ['<mask>' + text_list[-1]])\n", |
| " return text, masked\n", |
| "\n", |
| "tokenizer = AutoTokenizer.from_pretrained(\"stevhliu/my_awesome_eli5_mlm_model\")\n", |
| "\n", |
| "def tokenize_sentence(\n", |
| " text_and_mask: Tuple[str, str],\n", |
| " tokenizer) -> Tuple[str, Dict[str, tf.Tensor]]:\n", |
| " \"\"\"Convert string examples to tensors.\"\"\"\n", |
| " text, masked_text = text_and_mask\n", |
| " tokenized_sentence = tokenizer.encode_plus(\n", |
| " masked_text, return_tensors=\"tf\")\n", |
| "\n", |
| " # Workaround to manually remove batch dim until we have the feature to\n", |
| " # add optional batching flag.\n", |
| " # TODO(https://github.com/apache/beam/issues/21863): Remove when optional\n", |
| " # batching flag added\n", |
| " return text, {\n", |
| " k: tf.squeeze(v)\n", |
| " for k, v in dict(tokenized_sentence).items()\n", |
| " }" |
| ], |
| "metadata": { |
| "id": "d6TXVfWhWRzz" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Postprocess the results\n", |
| "\n", |
| "Extract the result from the `PredictionResult` object. Then, format the output to print the actual sentence and the word predicted for the last word in the sentence." |
| ], |
| "metadata": { |
| "id": "KnLtuYyTfC-g" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "class PostProcessor(beam.DoFn):\n", |
| " \"\"\"Processes the PredictionResult to get the predicted word.\n", |
| "\n", |
| " The logits are the output of the BERT Model. To get the word with the highest\n", |
| " probability of being the masked word, take the argmax.\n", |
| " \"\"\"\n", |
| " def __init__(self, tokenizer):\n", |
| " super().__init__()\n", |
| " self.tokenizer = tokenizer\n", |
| "\n", |
| " def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:\n", |
| " text, prediction_result = element\n", |
| " inputs = prediction_result.example\n", |
| " logits = prediction_result.inference['logits']\n", |
| " mask_token_index = tf.where(inputs[\"input_ids\"] == self.tokenizer.mask_token_id)[0]\n", |
| " predicted_token_id = tf.math.argmax(logits[mask_token_index[0]], axis=-1)\n", |
| " decoded_word = self.tokenizer.decode(predicted_token_id)\n", |
| " print(f\"Actual Sentence: {text}\\nPredicted last word: {decoded_word}\")\n", |
| " print('-' * 80)" |
| ], |
| "metadata": { |
| "id": "DnWlNV1kelnq" |
| }, |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "### Run the pipeline\n", |
| "\n", |
| "Use the following code to run the pipeline." |
| ], |
| "metadata": { |
| "id": "scepcVUpgD63" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "source": [ |
| "with beam.Pipeline() as beam_pipeline:\n", |
| " tokenized_examples = (\n", |
| " beam_pipeline\n", |
| " | \"CreateExamples\" >> beam.Create(text)\n", |
| " | 'AddMask' >> beam.Map(add_mask_to_last_word)\n", |
| " | 'TokenizeSentence' >>\n", |
| " beam.Map(lambda x: tokenize_sentence(x, tokenizer)))\n", |
| "\n", |
| " result = (\n", |
| " tokenized_examples\n", |
| " | \"RunInference\" >> RunInference(KeyedModelHandler(model_handler))\n", |
| " | \"PostProcess\" >> beam.ParDo(PostProcessor(tokenizer))\n", |
| " )" |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "IEPrQGEWgBCo", |
| "outputId": "218cc1f4-2613-4bf1-9666-782df020536b" |
| }, |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "Actual Sentence: The capital of France is Paris .\n", |
| "Predicted last word: Paris\n", |
| "--------------------------------------------------------------------------------\n", |
| "Actual Sentence: It is raining cats and dogs .\n", |
| "Predicted last word: dogs\n", |
| "--------------------------------------------------------------------------------\n", |
| "Actual Sentence: He looked up and saw the sun and stars .\n", |
| "Predicted last word: stars\n", |
| "--------------------------------------------------------------------------------\n", |
| "Actual Sentence: Today is Monday and tomorrow is Tuesday .\n", |
| "Predicted last word: Tuesday\n", |
| "--------------------------------------------------------------------------------\n", |
| "Actual Sentence: There are 5 coconuts on this palm tree .\n", |
| "Predicted last word: tree\n", |
| "--------------------------------------------------------------------------------\n" |
| ] |
| } |
| ] |
| } |
| ] |
| } |