{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "c6cc20c0",
"metadata": {},
"outputs": [],
"source": [
"# Import required libraries\n",
"import pyarrow as pa\n",
"\n",
"import pyiceberg\n",
"from pyiceberg.catalog import load_catalog\n",
"\n",
"print(f\"PyIceberg version: {pyiceberg.__version__}\")"
]
},
{
"cell_type": "markdown",
"id": "33dcfca9",
"metadata": {},
"source": [
"## Setup: Connecting to a Catalog\n",
"\n",
"Iceberg uses a catalog to organize tables. For this example, we'll use a `SqlCatalog` with SQLite for local testing."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "649053ed",
"metadata": {},
"outputs": [],
"source": [
"# Import required libraries\n",
"import os\n",
"import tempfile\n",
"\n",
"import pyarrow.compute as pc"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "72d5da19",
"metadata": {},
"outputs": [],
"source": [
"# Create a temporary warehouse location\n",
"warehouse_path = tempfile.mkdtemp(prefix=\"iceberg_warehouse_\")\n",
"print(f\"Warehouse location: {warehouse_path}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "14e9429c",
"metadata": {},
"outputs": [],
"source": [
"# Configure and load the catalog\n",
"catalog = load_catalog(\n",
" \"default\",\n",
" type=\"sql\",\n",
" uri=f\"sqlite:///{warehouse_path}/pyiceberg_catalog.db\",\n",
" warehouse=f\"file://{warehouse_path}\",\n",
")\n",
"\n",
"print(\"Catalog loaded successfully!\")\n",
"print(f\"Namespaces: {list(catalog.list_namespaces())}\")"
]
},
{
"cell_type": "markdown",
"id": "e330d377",
"metadata": {},
"source": [
"## Create a Namespace and Table\n",
"\n",
"Let's create a namespace and a simple Iceberg table."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "90312e03",
"metadata": {},
"outputs": [],
"source": [
"# Create a namespace\n",
"catalog.create_namespace(\"default\")\n",
"print(f\"Available namespaces: {list(catalog.list_namespaces())}\")"
]
},
{
"cell_type": "markdown",
"id": "f96438ef",
"metadata": {},
"source": [
"## Write Data to an Iceberg Table\n",
"\n",
"We'll create a sample dataset and write it to an Iceberg table."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2ef11eb9",
"metadata": {},
"outputs": [],
"source": [
"# Create sample data using PyArrow\n",
"\n",
"# Sample taxi-like data\n",
"data = {\n",
" \"vendor_id\": [1, 2, 1, 2, 1],\n",
" \"trip_distance\": [1.5, 2.3, 0.8, 5.2, 3.1],\n",
" \"fare_amount\": [10.0, 15.5, 6.0, 22.0, 18.0],\n",
" \"tip_amount\": [2.0, 3.0, 1.0, 4.5, 3.5],\n",
" \"passenger_count\": [1, 2, 1, 3, 2],\n",
"}\n",
"\n",
"df = pa.table(data)\n",
"print(\"Sample data:\")\n",
"print(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "678d122a",
"metadata": {},
"outputs": [],
"source": [
"# Create an Iceberg table with the schema from our dataframe\n",
"table = catalog.create_table(\n",
" \"default.sample_trips\",\n",
" schema=df.schema,\n",
")\n",
"\n",
"print(f\"Created table: {table}\")\n",
"print(f\"Table schema: {table.schema()}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8e135b2a",
"metadata": {},
"outputs": [],
"source": [
"# Append data to the table\n",
"table.append(df)\n",
"print(f\"Rows written: {len(table.scan().to_arrow())}\")"
]
},
{
"cell_type": "markdown",
"id": "0ef43fbf",
"metadata": {},
"source": [
"## Read Data from the Table\n",
"\n",
"Let's read back the data we just wrote."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d1ef0396",
"metadata": {},
"outputs": [],
"source": [
"# Scan and read the entire table\n",
"result = table.scan().to_arrow()\n",
"print(\"Table contents:\")\n",
"print(result)"
]
},
{
"cell_type": "markdown",
"id": "a8a3c906",
"metadata": {},
"source": [
"## Schema Evolution\n",
"\n",
"One of Iceberg's powerful features is schema evolution. Let's add a new computed column."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8725f3ce",
"metadata": {},
"outputs": [],
"source": [
"# Add a new computed column: tip per mile\n",
"df = df.append_column(\"tip_per_mile\", pc.divide(df[\"tip_amount\"], df[\"trip_distance\"]))\n",
"print(\"Updated dataframe with new column:\")\n",
"print(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e0c4550b",
"metadata": {},
"outputs": [],
"source": [
"# Evolve the table schema to include the new column\n",
"with table.update_schema() as update_schema:\n",
" update_schema.union_by_name(df.schema)\n",
"\n",
"print(\"Schema evolved!\")\n",
"print(f\"Updated table schema: {table.schema()}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2a65eee4",
"metadata": {},
"outputs": [],
"source": [
"# Overwrite the table with the new data\n",
"table.overwrite(df)\n",
"print(\"Data overwritten with new schema\")\n",
"\n",
"# Verify the new column exists\n",
"result = table.scan().to_arrow()\n",
"print(result)"
]
},
{
"cell_type": "markdown",
"id": "7140ba0c",
"metadata": {},
"source": [
"## Filtering Data\n",
"\n",
"PyIceberg supports predicate pushdown for efficient data filtering."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3af0f0b1",
"metadata": {},
"outputs": [],
"source": [
"# Filter rows where tip_per_mile > 1.0\n",
"filtered_df = table.scan(row_filter=\"tip_per_mile > 1.0\").to_arrow()\n",
"print(f\"Rows with tip_per_mile > 1.0: {len(filtered_df)}\")\n",
"print(filtered_df)"
]
},
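  {
   "cell_type": "markdown",
   "id": "b7c2d4e1",
   "metadata": {},
   "source": [
    "The same filter can also be expressed with PyIceberg's typed expression classes instead of a string, and combined with column projection so that only the needed columns are read. This is a minimal sketch; it assumes the `tip_per_mile` column added above is present in the current schema."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c4f91a2d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Equivalent filter using the expression API, projecting only two columns\n",
    "from pyiceberg.expressions import GreaterThan\n",
    "\n",
    "projected = table.scan(\n",
    "    row_filter=GreaterThan(\"tip_per_mile\", 1.0),\n",
    "    selected_fields=(\"vendor_id\", \"tip_per_mile\"),\n",
    ").to_arrow()\n",
    "print(projected)"
   ]
  },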
{
"cell_type": "markdown",
"id": "ff173f80",
"metadata": {},
"source": [
"## Inspect Table Metadata\n",
"\n",
"Iceberg tables maintain rich metadata about their structure and history."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e3763e27",
"metadata": {},
"outputs": [],
"source": [
"# View table properties\n",
"print(f\"Table location: {table.location()}\")\n",
"print(f\"Table properties: {table.properties}\")\n",
"print(f\"Current snapshot ID: {table.current_snapshot()}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "49154477",
"metadata": {},
"outputs": [],
"source": [
"# View table history (snapshots)\n",
"print(\"Table history:\")\n",
"for snapshot in table.history():\n",
" print(f\" Snapshot: {snapshot}\")"
]
},
{
"cell_type": "markdown",
"id": "448a1962",
"metadata": {},
"source": [
"## Explore Data Files\n",
"\n",
"Let's see what files Iceberg created in the warehouse."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3c8948e5",
"metadata": {},
"outputs": [],
"source": [
"# List all files in the warehouse\n",
"for root, _dirs, files in os.walk(warehouse_path):\n",
" level = root.replace(warehouse_path, \"\").count(os.sep)\n",
" indent = \" \" * 2 * level\n",
" print(f\"{indent}{os.path.basename(root)}/\")\n",
" subindent = \" \" * 2 * (level + 1)\n",
" for file in files:\n",
" print(f\"{subindent}{file}\")"
]
},
{
"cell_type": "markdown",
"id": "e9db29ad",
"metadata": {},
"source": [
"## Additional Operations\n",
"\n",
"PyIceberg supports many more operations including:\n",
"- Time travel queries\n",
"- Partition evolution\n",
"- Table maintenance (expire snapshots, rewrite data files)\n",
"- Integration with pandas, DuckDB, Ray, and more\n",
"\n",
"Check the [PyIceberg documentation](https://py.iceberg.apache.org/) for more details!"
]
}
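,
  {
   "cell_type": "markdown",
   "id": "d5a7b3f2",
   "metadata": {},
   "source": [
    "## Example: Time Travel and pandas Integration\n",
    "\n",
    "A minimal sketch of two of the operations listed above: reading the table as of an earlier snapshot, and converting a scan result into a pandas DataFrame. It assumes the append and overwrite above produced at least two snapshots and that pandas is installed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f1e8c6a4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the snapshots recorded so far (the append and overwrite above should give at least two)\n",
    "snapshots = table.snapshots()\n",
    "print(f\"Number of snapshots: {len(snapshots)}\")\n",
    "\n",
    "# Time travel: scan the table as of the first snapshot\n",
    "first_snapshot_id = snapshots[0].snapshot_id\n",
    "old_data = table.scan(snapshot_id=first_snapshot_id).to_arrow()\n",
    "print(f\"Rows in the first snapshot: {len(old_data)}\")\n",
    "\n",
    "# pandas integration: convert a scan result to a pandas DataFrame (requires pandas)\n",
    "pandas_df = table.scan().to_pandas()\n",
    "print(pandas_df.head())"
   ]
  }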
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}