| { |
| "cells": [ |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "c6cc20c0", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Import required libraries\n", |
| "import pyarrow as pa\n", |
| "\n", |
| "import pyiceberg\n", |
| "from pyiceberg.catalog import load_catalog\n", |
| "\n", |
| "print(f\"PyIceberg version: {pyiceberg.__version__}\")" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "33dcfca9", |
| "metadata": {}, |
| "source": [ |
| "## Setup: Connecting to a Catalog\n", |
| "\n", |
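| "Iceberg uses a catalog to keep track of tables and of each table's current metadata file. For this example, we use a `SqlCatalog` backed by a local SQLite database, which needs no external services and is convenient for local testing." |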
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "649053ed", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Additional imports used later in the notebook\n", |
| "import os\n", |
| "import tempfile\n", |
| "\n", |
| "import pyarrow.compute as pc" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "72d5da19", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Create a temporary warehouse location\n", |
| "warehouse_path = tempfile.mkdtemp(prefix=\"iceberg_warehouse_\")\n", |
| "print(f\"Warehouse location: {warehouse_path}\")" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "14e9429c", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Configure and load the catalog\n", |
| "catalog = load_catalog(\n", |
| " \"default\",\n", |
| " type=\"sql\",\n", |
| " uri=f\"sqlite:///{warehouse_path}/pyiceberg_catalog.db\",\n", |
| " warehouse=f\"file://{warehouse_path}\",\n", |
| ")\n", |
| "\n", |
| "print(\"Catalog loaded successfully!\")\n", |
| "print(f\"Namespaces: {list(catalog.list_namespaces())}\")" |
| ] |
| }, |
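| { |
| "cell_type": "markdown", |
| "id": "b41e7c2a", |
| "metadata": {}, |
| "source": [ |
| "`load_catalog` picks the catalog implementation from the `type` property. As a sketch, the same catalog can also be built by instantiating `SqlCatalog` directly. This reuses the same SQLite file, so both objects should see the same namespaces and tables; the name `direct_catalog` is just for illustration." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "c52f8d3b", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Build an equivalent catalog by instantiating SqlCatalog directly\n", |
| "from pyiceberg.catalog.sql import SqlCatalog\n", |
| "\n", |
| "direct_catalog = SqlCatalog(\n", |
| "    \"default\",\n", |
| "    uri=f\"sqlite:///{warehouse_path}/pyiceberg_catalog.db\",\n", |
| "    warehouse=f\"file://{warehouse_path}\",\n", |
| ")\n", |
| "\n", |
| "# Both objects point at the same SQLite database, so they list the same namespaces\n", |
| "print(type(catalog).__name__, type(direct_catalog).__name__)\n", |
| "print(list(direct_catalog.list_namespaces()) == list(catalog.list_namespaces()))" |
| ] |
| }, |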
| { |
| "cell_type": "markdown", |
| "id": "e330d377", |
| "metadata": {}, |
| "source": [ |
| "## Create a Namespace\n", |
| "\n", |
| "Iceberg tables live inside namespaces, which play the role of databases or schemas in other systems. Let's create a `default` namespace to hold our table." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "90312e03", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Create a namespace\n", |
| "catalog.create_namespace(\"default\")\n", |
| "print(f\"Available namespaces: {list(catalog.list_namespaces())}\")" |
| ] |
| }, |
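| { |
| "cell_type": "markdown", |
| "id": "d6309e4c", |
| "metadata": {}, |
| "source": [ |
| "`create_namespace` raises an error if the namespace already exists, for example when the cell is re-run against the same catalog. A minimal sketch of an idempotent version catches PyIceberg's `NamespaceAlreadyExistsError` and then reads the namespace properties back from the catalog." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "e741af5d", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from pyiceberg.exceptions import NamespaceAlreadyExistsError\n", |
| "\n", |
| "# Re-running create_namespace is safe if we catch the \"already exists\" error\n", |
| "try:\n", |
| "    catalog.create_namespace(\"default\")\n", |
| "except NamespaceAlreadyExistsError:\n", |
| "    print(\"Namespace 'default' already exists; reusing it\")\n", |
| "\n", |
| "# Namespace properties are stored in the catalog alongside the namespace\n", |
| "print(catalog.load_namespace_properties(\"default\"))" |
| ] |
| }, |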
| { |
| "cell_type": "markdown", |
| "id": "f96438ef", |
| "metadata": {}, |
| "source": [ |
| "## Write Data to an Iceberg Table\n", |
| "\n", |
| "We'll build a small taxi-style dataset as a PyArrow table, create an Iceberg table from its schema, and append the rows to it." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "2ef11eb9", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Create a small taxi-like sample dataset with PyArrow\n", |
| "data = {\n", |
| " \"vendor_id\": [1, 2, 1, 2, 1],\n", |
| " \"trip_distance\": [1.5, 2.3, 0.8, 5.2, 3.1],\n", |
| " \"fare_amount\": [10.0, 15.5, 6.0, 22.0, 18.0],\n", |
| " \"tip_amount\": [2.0, 3.0, 1.0, 4.5, 3.5],\n", |
| " \"passenger_count\": [1, 2, 1, 3, 2],\n", |
| "}\n", |
| "\n", |
| "df = pa.table(data)\n", |
| "print(\"Sample data:\")\n", |
| "print(df)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "678d122a", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Create an Iceberg table from the PyArrow schema; PyIceberg converts it and assigns field IDs\n", |
| "table = catalog.create_table(\n", |
| " \"default.sample_trips\",\n", |
| " schema=df.schema,\n", |
| ")\n", |
| "\n", |
| "print(f\"Created table: {table}\")\n", |
| "print(f\"Table schema: {table.schema()}\")" |
| ] |
| }, |
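| { |
| "cell_type": "markdown", |
| "id": "f852b06e", |
| "metadata": {}, |
| "source": [ |
| "Passing a PyArrow schema is convenient, but you can also declare the Iceberg schema explicitly with `Schema` and `NestedField`, which makes the field IDs and the required/optional flags visible. Iceberg tracks columns by these IDs rather than by name, which is what keeps renames and other schema changes safe. The table name `default.sample_trips_explicit` is hypothetical and used only for illustration." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "0963c17f", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from pyiceberg.schema import Schema\n", |
| "from pyiceberg.types import DoubleType, LongType, NestedField\n", |
| "\n", |
| "# Explicit Iceberg schema matching the types PyArrow inferred (int64 -> long, float64 -> double)\n", |
| "explicit_schema = Schema(\n", |
| "    NestedField(field_id=1, name=\"vendor_id\", field_type=LongType(), required=False),\n", |
| "    NestedField(field_id=2, name=\"trip_distance\", field_type=DoubleType(), required=False),\n", |
| "    NestedField(field_id=3, name=\"fare_amount\", field_type=DoubleType(), required=False),\n", |
| "    NestedField(field_id=4, name=\"tip_amount\", field_type=DoubleType(), required=False),\n", |
| "    NestedField(field_id=5, name=\"passenger_count\", field_type=LongType(), required=False),\n", |
| ")\n", |
| "\n", |
| "# Hypothetical second table, created only to show the explicit-schema route\n", |
| "explicit_table = catalog.create_table(\"default.sample_trips_explicit\", schema=explicit_schema)\n", |
| "for field in explicit_table.schema().fields:\n", |
| "    print(field.field_id, field.name, field.field_type, \"required\" if field.required else \"optional\")" |
| ] |
| }, |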
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "8e135b2a", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Append data to the table\n", |
| "table.append(df)\n", |
| "print(f\"Rows written: {len(table.scan().to_arrow())}\")" |
| ] |
| }, |
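| { |
| "cell_type": "markdown", |
| "id": "1a74d280", |
| "metadata": {}, |
| "source": [ |
| "Every write commits a new snapshot. A quick way to confirm what the append did is to look at the current snapshot's summary, which records the operation type along with counters such as the number of added records." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "2b85e391", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Each commit creates a snapshot; its summary describes the operation\n", |
| "snapshot = table.current_snapshot()\n", |
| "print(f\"Snapshot ID: {snapshot.snapshot_id}\")\n", |
| "print(f\"Operation: {snapshot.summary.operation.value}\")\n", |
| "print(f\"Summary: {snapshot.summary}\")" |
| ] |
| }, |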
| { |
| "cell_type": "markdown", |
| "id": "0ef43fbf", |
| "metadata": {}, |
| "source": [ |
| "## Read Data from the Table\n", |
| "\n", |
| "Let's read back the data we just wrote." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "d1ef0396", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Scan and read the entire table\n", |
| "result = table.scan().to_arrow()\n", |
| "print(\"Table contents:\")\n", |
| "print(result)" |
| ] |
| }, |
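| { |
| "cell_type": "markdown", |
| "id": "3c96f4a2", |
| "metadata": {}, |
| "source": [ |
| "Scans don't have to read every column or every row. `selected_fields` projects a subset of columns and `limit` caps the number of rows returned, which keeps reads small on larger tables." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "4da705b3", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Read only two columns and at most three rows\n", |
| "preview = table.scan(\n", |
| "    selected_fields=(\"vendor_id\", \"fare_amount\"),\n", |
| "    limit=3,\n", |
| ").to_arrow()\n", |
| "print(preview.column_names)\n", |
| "print(preview)" |
| ] |
| }, |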
| { |
| "cell_type": "markdown", |
| "id": "a8a3c906", |
| "metadata": {}, |
| "source": [ |
| "## Schema Evolution\n", |
| "\n", |
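| "Iceberg supports schema evolution: columns can be added, dropped, renamed, or reordered as metadata-only changes, without rewriting existing data files. Let's add a new computed column, `tip_per_mile`." |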
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "8725f3ce", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Add a new computed column: tip per mile\n", |
| "df = df.append_column(\"tip_per_mile\", pc.divide(df[\"tip_amount\"], df[\"trip_distance\"]))\n", |
| "print(\"Updated dataframe with new column:\")\n", |
| "print(df)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "e0c4550b", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Evolve the table schema to include the new column\n", |
| "with table.update_schema() as update_schema:\n", |
| " update_schema.union_by_name(df.schema)\n", |
| "\n", |
| "print(\"Schema evolved!\")\n", |
| "print(f\"Updated table schema: {table.schema()}\")" |
| ] |
| }, |
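| { |
| "cell_type": "markdown", |
| "id": "5eb816c4", |
| "metadata": {}, |
| "source": [ |
| "Evolving the schema only changes table metadata. The data file written earlier has no `tip_per_mile` column, so scanning it now should return nulls for the new column until we write data that includes it." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "6fc927d5", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# The existing data file lacks tip_per_mile, so the scan fills that column with nulls\n", |
| "before_overwrite = table.scan(selected_fields=(\"trip_distance\", \"tip_per_mile\")).to_arrow()\n", |
| "print(before_overwrite)\n", |
| "print(f\"Null tip_per_mile values: {before_overwrite['tip_per_mile'].null_count}\")" |
| ] |
| }, |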
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "2a65eee4", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Replace all existing rows with the new data, which now includes tip_per_mile\n", |
| "table.overwrite(df)\n", |
| "print(\"Data overwritten with new schema\")\n", |
| "\n", |
| "# Verify the new column exists\n", |
| "result = table.scan().to_arrow()\n", |
| "print(result)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "7140ba0c", |
| "metadata": {}, |
| "source": [ |
| "## Filtering Data\n", |
| "\n", |
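| "PyIceberg pushes row filters down into the scan: it uses the column statistics stored in the table's manifests to skip data files that cannot contain matching rows, then applies the filter to the rows it does read." |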
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "3af0f0b1", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Filter rows where tip_per_mile > 1.0\n", |
| "filtered_df = table.scan(row_filter=\"tip_per_mile > 1.0\").to_arrow()\n", |
| "print(f\"Rows with tip_per_mile > 1.0: {len(filtered_df)}\")\n", |
| "print(filtered_df)" |
| ] |
| }, |
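| { |
| "cell_type": "markdown", |
| "id": "70da38e6", |
| "metadata": {}, |
| "source": [ |
| "String filters are parsed into PyIceberg's expression classes. Building the expression directly from `pyiceberg.expressions` is handy when filters are assembled programmatically; here we combine two predicates and project only the columns we care about. The variable names are illustrative." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "81eb49f7", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from pyiceberg.expressions import And, EqualTo, GreaterThan\n", |
| "\n", |
| "# Same predicate as the string filter \"tip_per_mile > 1.0 and vendor_id = 1\"\n", |
| "expr = And(GreaterThan(\"tip_per_mile\", 1.0), EqualTo(\"vendor_id\", 1))\n", |
| "vendor_1_df = table.scan(\n", |
| "    row_filter=expr,\n", |
| "    selected_fields=(\"vendor_id\", \"trip_distance\", \"tip_per_mile\"),\n", |
| ").to_arrow()\n", |
| "print(f\"Matching rows: {len(vendor_1_df)}\")\n", |
| "print(vendor_1_df)" |
| ] |
| }, |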
| { |
| "cell_type": "markdown", |
| "id": "ff173f80", |
| "metadata": {}, |
| "source": [ |
| "## Inspect Table Metadata\n", |
| "\n", |
| "Iceberg tables maintain rich metadata about their structure and history." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "e3763e27", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# View table properties\n", |
| "print(f\"Table location: {table.location()}\")\n", |
| "print(f\"Table properties: {table.properties}\")\n", |
| "print(f\"Current snapshot ID: {table.current_snapshot().snapshot_id}\")" |
| ] |
| }, |
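| { |
| "cell_type": "markdown", |
| "id": "92fc5a08", |
| "metadata": {}, |
| "source": [ |
| "For a tabular view of the same metadata, recent PyIceberg releases expose Iceberg's metadata tables through `table.inspect`, returned as PyArrow tables. The snapshots table lists each commit, including the ones produced by the earlier append and overwrite." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "a30d6b19", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Metadata tables come back as PyArrow tables, so the usual column selection works\n", |
| "snapshots = table.inspect.snapshots()\n", |
| "print(snapshots.select([\"snapshot_id\", \"parent_id\", \"operation\", \"committed_at\"]))" |
| ] |
| }, |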
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "49154477", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
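| "# View table history: one entry per snapshot that became current, oldest first\n", |
| "print(\"Table history:\")\n", |
| "for entry in table.history():\n", |
| "    print(f\"  Snapshot {entry.snapshot_id} at timestamp {entry.timestamp_ms} (ms since epoch)\")" |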
| ] |
| }, |
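| { |
| "cell_type": "markdown", |
| "id": "b51e7d2a", |
| "metadata": {}, |
| "source": [ |
| "Because older snapshots are retained, you can read the table as it was at an earlier point (time travel) by passing `snapshot_id` to `scan`. Here we read the oldest history entry, which is the snapshot created by the first append, before the schema change and the overwrite." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "c62f8e3b", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Time travel: scan the table as of its first snapshot\n", |
| "first_snapshot_id = table.history()[0].snapshot_id\n", |
| "old_df = table.scan(snapshot_id=first_snapshot_id).to_arrow()\n", |
| "print(f\"Rows at snapshot {first_snapshot_id}: {len(old_df)}\")\n", |
| "print(f\"Columns at that snapshot: {old_df.column_names}\")" |
| ] |
| }, |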
| { |
| "cell_type": "markdown", |
| "id": "448a1962", |
| "metadata": {}, |
| "source": [ |
| "## Explore Data Files\n", |
| "\n", |
| "Let's see what files Iceberg created in the warehouse." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "3c8948e5", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# List all files in the warehouse as an indented tree\n", |
| "for root, _dirs, files in os.walk(warehouse_path):\n", |
| "    level = root.replace(warehouse_path, \"\").count(os.sep)\n", |
| "    indent = \" \" * 2 * level\n", |
| "    print(f\"{indent}{os.path.basename(root)}/\")\n", |
| "    subindent = \" \" * 2 * (level + 1)\n", |
| "    for file in files:\n", |
| "        print(f\"{subindent}{file}\")" |
| ] |
| }, |
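| { |
| "cell_type": "markdown", |
| "id": "d7309f4c", |
| "metadata": {}, |
| "source": [ |
| "The listing mixes Parquet data files with Iceberg metadata (JSON table-metadata files, Avro manifest lists and manifests) and the SQLite catalog database. A small sketch with `pathlib` groups the files by extension, and `table.metadata_location` shows which metadata file the catalog currently points to. The data file from the first append is still on disk after the overwrite, because older snapshots still reference it until they are expired." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "e841a05d", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from collections import Counter\n", |
| "from pathlib import Path\n", |
| "\n", |
| "# Count warehouse files by extension (.parquet data, .json/.avro metadata, .db catalog)\n", |
| "extension_counts = Counter(path.suffix for path in Path(warehouse_path).rglob(\"*\") if path.is_file())\n", |
| "for extension, count in sorted(extension_counts.items()):\n", |
| "    print(f\"{extension or '(none)'}: {count}\")\n", |
| "\n", |
| "# The catalog stores a pointer to the table's current metadata file\n", |
| "print(f\"Current metadata file: {table.metadata_location}\")" |
| ] |
| }, |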
| { |
| "cell_type": "markdown", |
| "id": "e9db29ad", |
| "metadata": {}, |
| "source": [ |
| "## Additional Operations\n", |
| "\n", |
| "PyIceberg supports many more operations including:\n", |
| "- Time travel queries\n", |
| "- Partition evolution\n", |
| "- Table maintenance (expire snapshots, rewrite data files)\n", |
| "- Integration with pandas, DuckDB, Ray, and more\n", |
| "\n", |
| "Check the [PyIceberg documentation](https://py.iceberg.apache.org/) for more details!" |
| ] |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "Python 3 (ipykernel)", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.12.9" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 5 |
| } |