{
"cells": [
{
"cell_type": "markdown",
"id": "dbb50720-0572-4b30-9e89-7ab7bdb2428b",
"metadata": {},
"source": [
"<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/io/built-in/webapis/\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>"
]
},
{
"cell_type": "raw",
"id": "6dbfef30-d44c-482a-a122-31c07be80d77",
"metadata": {},
"source": [
"#@title Licensed under the Apache License, Version 2.0 (the \"License\")\n",
"# Licensed to the Apache Software Foundation (ASF) under one\n",
"# or more contributor license agreements. See the NOTICE file\n",
"# distributed with this work for additional information\n",
"# regarding copyright ownership. The ASF licenses this file\n",
"# to you under the Apache License, Version 2.0 (the\n",
"# \"License\"); you may not use this file except in compliance\n",
"# with the License. You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing,\n",
"# software distributed under the License is distributed on an\n",
"# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
"# KIND, either express or implied. See the License for the\n",
"# specific language governing permissions and limitations\n",
"# under the License."
]
},
{
"cell_type": "markdown",
"id": "391810af-3391-46da-8551-0ea98a75f8f3",
"metadata": {},
"source": [
"`ImageRequest` is the custom request that we pass to `HttpImageClient`. It holds the image URL and the MIME type\n",
"inferred from the URL's file extension.\n",
"\n",
"`ImageResponse` is the custom response that `HttpImageClient` returns. It contains the MIME type and the raw image\n",
"bytes downloaded from the remote server.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "93e19710-82a2-4fdd-b1a1-f6239c08c229",
"metadata": {},
"outputs": [],
"source": [
"from collections import namedtuple\n",
"\n",
"\n",
"class ImageRequest:\n",
"    # Maps a file extension to the MIME type of the image data.\n",
"    image_url_to_mime_type = {\n",
"        \"jpg\": \"image/jpeg\",\n",
"        \"jpeg\": \"image/jpeg\",\n",
"        \"png\": \"image/png\",\n",
"    }\n",
"\n",
"    def __init__(self, image_url):\n",
"        self.image_url = image_url\n",
"        # Infer the MIME type from the extension; None if it is not supported.\n",
"        extension = image_url.rsplit(\".\", 1)[-1].lower()\n",
"        self.mime_type = self.image_url_to_mime_type.get(extension)\n",
"\n",
"\n",
"# Holds the MIME type and the raw bytes of a downloaded image.\n",
"ImageResponse = namedtuple(\"ImageResponse\", [\"mime_type\", \"data\"])"
]
},
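{
"cell_type": "markdown",
"id": "c3a7e1d0-5b2f-4c8e-9a41-7f0d2e6b1a01",
"metadata": {},
"source": [
"This quick sanity check of the MIME type lookup reuses the `ImageRequest` class defined above.\n",
"The `.gif` URL is a hypothetical example of an unsupported extension. Its `mime_type` is `None`, which a downstream\n",
"service that requires a MIME type would likely reject.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c3a7e1d0-5b2f-4c8e-9a41-7f0d2e6b1a02",
"metadata": {},
"outputs": [],
"source": [
"# Print the MIME type inferred for a few URLs; example.com/animation.gif is hypothetical.\n",
"for image_url in [\n",
"    \"https://storage.googleapis.com/generativeai-downloads/images/cake.jpg\",\n",
"    \"https://storage.googleapis.com/generativeai-downloads/images/chocolate.png\",\n",
"    \"https://example.com/animation.gif\",\n",
"]:\n",
"    print(image_url, \"->\", ImageRequest(image_url).mime_type)"
]
},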
{
"cell_type": "markdown",
"id": "6c4f6fdf-5710-44d4-9c9c-b5e3f69982be",
"metadata": {},
"source": [
"#### Define Caller\n",
"\n",
"We implement the `Caller`, `HttpImageClient`, which receives an `ImageRequest` and returns an `ImageResponse`.\n",
"\n",
"_For demo purposes, each pipeline element is a `(url, request)` tuple. This carries the raw URL alongside the request, and the Caller returns it together with the `ImageResponse`._\n",
"\n",
"`RequestResponseIO` retries a failed call using a prescribed exponential backoff, but only for certain errors.\n",
"To signal that a call should be retried, your `Caller` must raise one of the following exceptions:\n",
"\n",
"* `UserCodeQuotaException`\n",
"* `UserCodeTimeoutException`\n",
"\n",
"If the call still fails after a threshold number of retries, the transform re-raises the error.\n"
]
},
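{
"cell_type": "markdown",
"id": "d4b8f2e1-6c3a-4d9f-8b52-8e1f3a7c2b01",
"metadata": {},
"source": [
"To make the retry behavior concrete, here is a simplified, hypothetical sketch of a retry-with-exponential-backoff loop.\n",
"It is **not** `RequestResponseIO`'s implementation. The helper `call_with_backoff`, the retry count, the delays and the jitter\n",
"are all illustrative assumptions. Only the two retryable exceptions trigger a retry. Any other exception, such as\n",
"`UserCodeExecutionException`, propagates immediately.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d4b8f2e1-6c3a-4d9f-8b52-8e1f3a7c2b02",
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"import time\n",
"\n",
"from apache_beam.io.requestresponse import (\n",
"    UserCodeQuotaException,\n",
"    UserCodeTimeoutException,\n",
")\n",
"\n",
"# Hypothetical sketch; RequestResponseIO's real retry logic may differ.\n",
"RETRYABLE_ERRORS = (UserCodeQuotaException, UserCodeTimeoutException)\n",
"\n",
"\n",
"def call_with_backoff(caller, request, max_retries=3, initial_delay_secs=1.0):\n",
"    delay = initial_delay_secs\n",
"    for attempt in range(max_retries + 1):\n",
"        try:\n",
"            return caller(request)\n",
"        except RETRYABLE_ERRORS:\n",
"            if attempt == max_retries:\n",
"                raise  # Retry threshold reached: re-raise the last error.\n",
"            # Wait with jitter, then double the delay for the next attempt.\n",
"            time.sleep(delay * random.uniform(0.5, 1.5))\n",
"            delay *= 2\n",
"\n",
"\n",
"class FlakyCaller:\n",
"    \"\"\"Hypothetical callable that hits a quota error `failures` times, then succeeds.\"\"\"\n",
"\n",
"    def __init__(self, failures):\n",
"        self.failures = failures\n",
"\n",
"    def __call__(self, request):\n",
"        if self.failures > 0:\n",
"            self.failures -= 1\n",
"            raise UserCodeQuotaException()\n",
"        return request.upper()\n",
"\n",
"\n",
"print(call_with_backoff(FlakyCaller(failures=2), \"ok\", initial_delay_secs=0.01))  # prints OK"
]
},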
{
"cell_type": "code",
"execution_count": null,
"id": "bac514d6-8c1f-4221-bbad-8a6a122f9369",
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"from apache_beam.io.requestresponse import (\n",
"    Caller,\n",
"    UserCodeExecutionException,\n",
"    UserCodeQuotaException,\n",
"    UserCodeTimeoutException,\n",
")\n",
"\n",
"\n",
"class HttpImageClient(Caller):\n",
"    STATUS_TOO_MANY_REQUESTS = 429\n",
"    STATUS_TIMEOUT = 408\n",
"    # Without a timeout, requests.get can block indefinitely and never raise Timeout.\n",
"    # 30 seconds is an illustrative value; tune it for your service.\n",
"    REQUEST_TIMEOUT_SECS = 30\n",
"\n",
"    def __call__(self, kv):\n",
"        url, request = kv\n",
"        try:\n",
"            response = requests.get(request.image_url, timeout=self.REQUEST_TIMEOUT_SECS)\n",
"        except requests.exceptions.Timeout as e:\n",
"            # Retryable: RequestResponseIO backs off and tries again.\n",
"            raise UserCodeTimeoutException() from e\n",
"        except requests.exceptions.RequestException as e:\n",
"            # Any other transport error, such as a failed connection, is not retried.\n",
"            raise UserCodeExecutionException() from e\n",
"\n",
"        # Server errors (5xx) are not retried.\n",
"        if response.status_code >= 500:\n",
"            raise UserCodeExecutionException()\n",
"\n",
"        # Map client errors (4xx) to retryable or non-retryable exceptions.\n",
"        if response.status_code >= 400:\n",
"            match response.status_code:\n",
"                case self.STATUS_TOO_MANY_REQUESTS:\n",
"                    raise UserCodeQuotaException()\n",
"                case self.STATUS_TIMEOUT:\n",
"                    raise UserCodeTimeoutException()\n",
"                case _:\n",
"                    raise UserCodeExecutionException()\n",
"\n",
"        return url, ImageResponse(request.mime_type, response.content)"
]
},
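{
"cell_type": "markdown",
"id": "e5c9a3f2-7d4b-4ea0-9c63-9f2a4b8d3c01",
"metadata": {},
"source": [
"`HttpImageClient` is a plain callable, so you can check its error mapping without a network connection or a pipeline.\n",
"This sketch uses `unittest.mock.patch` to replace `requests.get` with a stub that returns a hypothetical fake response.\n",
"Only `status_code` and `content` are set, which is all `HttpImageClient` reads. A 200 response should yield an\n",
"`ImageResponse`, while a 429 response should raise the retryable `UserCodeQuotaException`.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e5c9a3f2-7d4b-4ea0-9c63-9f2a4b8d3c02",
"metadata": {},
"outputs": [],
"source": [
"from types import SimpleNamespace\n",
"from unittest import mock\n",
"\n",
"from apache_beam.io.requestresponse import UserCodeQuotaException\n",
"\n",
"test_url = \"https://storage.googleapis.com/generativeai-downloads/images/cake.jpg\"\n",
"client = HttpImageClient()\n",
"\n",
"# Stubbed 200 response: no network call is made.\n",
"ok_response = SimpleNamespace(status_code=200, content=b\"fake-bytes\")\n",
"with mock.patch(\"requests.get\", return_value=ok_response):\n",
"    _, image = client((test_url, ImageRequest(test_url)))\n",
"    print(image)  # ImageResponse(mime_type='image/jpeg', data=b'fake-bytes')\n",
"\n",
"# Stubbed 429 response: the client should signal a retryable quota error.\n",
"quota_response = SimpleNamespace(status_code=429, content=b\"\")\n",
"with mock.patch(\"requests.get\", return_value=quota_response):\n",
"    try:\n",
"        client((test_url, ImageRequest(test_url)))\n",
"    except UserCodeQuotaException:\n",
"        print(\"HTTP 429 -> UserCodeQuotaException\")"
]
},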
{
"cell_type": "code",
"execution_count": null,
"id": "0eed9274-335a-4a0b-8676-e61fb6fb1875",
"metadata": {},
"outputs": [],
"source": [
"images = [\n",
" \"https://storage.googleapis.com/generativeai-downloads/images/cake.jpg\",\n",
" \"https://storage.googleapis.com/generativeai-downloads/images/chocolate.png\",\n",
" \"https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg\",\n",
" \"https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg\",\n",
" \"https://storage.googleapis.com/generativeai-downloads/images/factory.png\",\n",
" \"https://storage.googleapis.com/generativeai-downloads/images/scones.jpg\",\n",
"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f54d5965-9b5e-413e-94cc-a50712bfdd92",
"metadata": {},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io.requestresponse import RequestResponseIO\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"\n",
"def build_image_request(image_url):\n",
"    return image_url, ImageRequest(image_url)\n",
"\n",
"\n",
"with beam.Pipeline(options=PipelineOptions(pickle_library=\"cloudpickle\")) as pipeline:\n",
"    _ = (\n",
"        pipeline\n",
"        | \"Create data\" >> beam.Create(images)\n",
"        | \"Map to ImageRequest\" >> beam.Map(build_image_request)\n",
"        | \"Download image\" >> RequestResponseIO(HttpImageClient())\n",
"        | \"Print results\"\n",
"        >> beam.MapTuple(\n",
"            lambda url, response: print(\n",
"                f\"{url}, mimeType={response.mime_type}, size={len(response.data)}\"\n",
"            )\n",
"        )\n",
"    )\n"
]
},
{
"cell_type": "markdown",
"id": "cd76a354-09ae-41e1-8dd4-40815946e8f4",
"metadata": {},
"source": [
"The previous example invoked HTTP requests directly. However, some API services provide client libraries\n",
"that you should use within the `Caller` implementation. Using client code within Beam presents unique challenges,\n",
"namely serialization. The `Caller` is serialized and shipped to the workers, and client objects often cannot be pickled.\n",
"Additionally, some client code requires explicit setup and teardown.\n",
"\n",
"`RequestResponseIO` handles such setup and teardown scenarios through the context manager methods `__enter__`\n",
"and `__exit__`, which you can override on the `Caller`. Create the client in `__enter__` rather than in `__init__`,\n",
"and release it in `__exit__`.\n"
]
},
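{
"cell_type": "markdown",
"id": "f6dab4a3-8e5c-4fb1-8d74-a03b5c9e4d01",
"metadata": {},
"source": [
"A minimal, hypothetical `Caller` that only logs its lifecycle makes the context manager protocol visible.\n",
"Here `str.upper` stands in for a real client. The `with` statement plays the role of the setup and teardown\n",
"that happen on a worker. Outside a pipeline, you can use the same pattern to try out a `Caller` before running it in Beam.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f6dab4a3-8e5c-4fb1-8d74-a03b5c9e4d02",
"metadata": {},
"outputs": [],
"source": [
"from apache_beam.io.requestresponse import Caller\n",
"\n",
"\n",
"class LifecycleDemoCaller(Caller):\n",
"    \"\"\"Hypothetical Caller that logs setup and teardown.\"\"\"\n",
"\n",
"    def __enter__(self):\n",
"        # Setup: acquire the client here, not in __init__, so it does not need to be pickled.\n",
"        print(\"__enter__: creating client\")\n",
"        self.client = str.upper  # Stand-in for a real client object.\n",
"        return self\n",
"\n",
"    def __call__(self, request):\n",
"        return self.client(request)\n",
"\n",
"    def __exit__(self, exc_type, exc_val, exc_tb):\n",
"        # Teardown: release what __enter__ acquired; returning None does not suppress errors.\n",
"        print(\"__exit__: releasing client\")\n",
"        self.client = None\n",
"\n",
"\n",
"with LifecycleDemoCaller() as caller:\n",
"    print(caller(\"hello\"))  # HELLO"
]
},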
{
"cell_type": "code",
"execution_count": null,
"id": "8b96fa3c-a8af-4d31-9249-c5b301c32632",
"metadata": {},
"outputs": [],
"source": [
"from google import genai\n",
"from google.genai import types\n",
"from google.genai.errors import APIError\n",
"\n",
"API_KEY = \"<your api key>\"\n",
"\n",
"\n",
"class GeminiAIClient(Caller):\n",
"    MODEL_GEMINI_FLASH_LITE = \"gemini-2.0-flash-lite\"\n",
"\n",
"    def __init__(self, api_key):\n",
"        # Store only the picklable API key; the client is created in __enter__.\n",
"        self.api_key = api_key\n",
"\n",
"    def __enter__(self):\n",
"        # Setup: create the Gemini client when the transform is set up.\n",
"        self.client = genai.Client(api_key=self.api_key)\n",
"        return self\n",
"\n",
"    def __exit__(self, exc_type, exc_val, exc_tb):\n",
"        # Teardown: drop the client reference.\n",
"        self.client = None\n",
"\n",
"    def __call__(self, kv):\n",
"        # The input is the (url, ImageResponse) tuple emitted by HttpImageClient.\n",
"        url, image = kv\n",
"        try:\n",
"            response = self.client.models.generate_content(\n",
"                model=self.MODEL_GEMINI_FLASH_LITE,\n",
"                contents=[\n",
"                    types.Part.from_bytes(\n",
"                        data=image.data,\n",
"                        mime_type=image.mime_type,\n",
"                    ),\n",
"                    \"Caption this image.\",\n",
"                ],\n",
"            )\n",
"        except APIError as e:\n",
"            # Treat rate limiting (HTTP 429) as a quota error so that it is retried with backoff.\n",
"            if e.code == 429:\n",
"                raise UserCodeQuotaException() from e\n",
"            raise UserCodeExecutionException() from e\n",
"\n",
"        return url, response"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4e2fb1d8-691e-4366-904c-7d25435e68d7",
"metadata": {},
"outputs": [],
"source": [
"with beam.Pipeline(options=PipelineOptions(pickle_library=\"cloudpickle\")) as pipeline:\n",
"    _ = (\n",
"        pipeline\n",
"        | \"Create data\" >> beam.Create(images)\n",
"        | \"Map to ImageRequest\" >> beam.Map(build_image_request)\n",
"        | \"Download image\" >> RequestResponseIO(HttpImageClient())\n",
"        | \"Gemini AI\" >> RequestResponseIO(GeminiAIClient(API_KEY))\n",
"        | \"Print results\"\n",
"        >> beam.MapTuple(lambda url, response: print(url, response.text))\n",
"    )\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}