---
hide:
- navigation
---
<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-->
# Python API
PyIceberg is built around catalogs, which are used to load tables. The first step is to instantiate a catalog. Let's use the following configuration to define a catalog called `prod`:
```yaml
catalog:
  prod:
    uri: http://rest-catalog/ws/
    credential: t-1234:secret
```
Note that multiple catalogs can be defined in the same `.pyiceberg.yaml`:
```yaml
catalog:
  hive:
    uri: thrift://127.0.0.1:9083
    s3.endpoint: http://127.0.0.1:9000
    s3.access-key-id: admin
    s3.secret-access-key: password
  rest:
    uri: https://rest-server:8181/
    warehouse: my-warehouse
```
These catalogs can then be loaded in Python by calling `load_catalog(name="hive")` and `load_catalog(name="rest")`.
This information must be placed inside a file called `.pyiceberg.yaml` located either in the `$HOME` or `%USERPROFILE%` directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the `$PYICEBERG_HOME` directory (if the corresponding environment variable is set).
For more details on possible configurations refer to the [specific page](https://py.iceberg.apache.org/configuration/).
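As a rough illustration of the lookup described above, the following sketch lists the locations where a `.pyiceberg.yaml` could live. The helper `candidate_config_paths` is hypothetical, and the order in which the locations are checked is an assumption rather than documented behavior:

```python
import os
from pathlib import Path
from typing import List, Mapping, Optional


def candidate_config_paths(env: Optional[Mapping[str, str]] = None) -> List[Path]:
    """Return the places where a `.pyiceberg.yaml` could live (hypothetical helper)."""
    env = os.environ if env is None else env
    candidates = []
    # Assumption: an explicit PYICEBERG_HOME is listed before the home directory.
    pyiceberg_home = env.get("PYICEBERG_HOME")
    if pyiceberg_home:
        candidates.append(Path(pyiceberg_home) / ".pyiceberg.yaml")
    # Path.home() resolves $HOME on Unix and %USERPROFILE% on Windows.
    candidates.append(Path.home() / ".pyiceberg.yaml")
    return candidates


for path in candidate_config_paths():
    print(path, "(found)" if path.is_file() else "(missing)")
```

Running this on your machine is a quick way to check whether the configuration file sits in one of the expected directories.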
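Then load the catalog. The example below uses a catalog named `docs` and passes its properties inline instead of reading them from the configuration file: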
```python
from pyiceberg.catalog import load_catalog
catalog = load_catalog(
    "docs",
    **{
        "uri": "http://127.0.0.1:8181",
        "s3.endpoint": "http://127.0.0.1:9000",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
)
```
Let's create a namespace:
```python
catalog.create_namespace("docs_example")
```
And then list them:
```python
ns = catalog.list_namespaces()
assert ns == [("docs_example",)]
```
And then list tables in the namespace:
```python
catalog.list_tables("docs_example")
```
## Create a table
To create a table from a catalog:
```python
from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    FloatType,
    DoubleType,
    StringType,
    NestedField,
    StructType,
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            # Field IDs must be unique across the schema, including nested fields
            NestedField(
                field_id=6, name="created_by", field_type=StringType(), required=False
            ),
        ),
        required=False,
    ),
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)

from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
)
```
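The partition spec above applies `DayTransform` to the `datetime` column. The Iceberg spec defines the day transform as the number of days since 1970-01-01. The sketch below reproduces that arithmetic with the standard library only. The helper `day_partition` is a hypothetical name and the timestamps are made-up sample values; this is an illustration of the idea, not PyIceberg's implementation:

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def day_partition(ts: datetime) -> int:
    """Days since the Unix epoch, the value a day transform derives from a timestamp."""
    return (ts.date() - EPOCH).days


# Made-up sample timestamps: the first two fall on the same day.
rows = [
    datetime(2021, 4, 1, 0, 28, 5),
    datetime(2021, 4, 1, 23, 59, 59),
    datetime(2021, 4, 2, 0, 0, 0),
]
for ts in rows:
    print(ts.isoformat(), "->", day_partition(ts))
```

Rows that share a day ordinal land in the same partition, so a filter on `datetime` can skip whole partitions.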
To create a table using a pyarrow schema:
```python
import pyarrow as pa
schema = pa.schema(
    [
        pa.field("foo", pa.string(), nullable=True),
        pa.field("bar", pa.int32(), nullable=False),
        pa.field("baz", pa.bool_(), nullable=True),
    ]
)

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
)
```
## Load a table
### Catalog table
Loading the `bids` table:
```python
table = catalog.load_table("docs_example.bids")
# Equivalent to:
table = catalog.load_table(("docs_example", "bids"))
# The tuple syntax can be used if the namespace or table contains a dot.
```
This returns a `Table` that represents an Iceberg table that can be queried and altered.
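To see why the tuple form matters, here is a minimal sketch of how a dotted string can be split into identifier parts. The helper `to_identifier_tuple` is hypothetical and only models the behavior described above; the table name `bids.v2` is made up for illustration:

```python
from typing import Tuple, Union

Identifier = Union[str, Tuple[str, ...]]


def to_identifier_tuple(identifier: Identifier) -> Tuple[str, ...]:
    """Hypothetical helper: split a dotted string, pass tuples through unchanged."""
    if isinstance(identifier, tuple):
        return identifier
    return tuple(identifier.split("."))


# Both spellings resolve to the same namespace and table name.
print(to_identifier_tuple("docs_example.bids"))
print(to_identifier_tuple(("docs_example", "bids")))

# A table named "bids.v2" (made-up name) only survives intact in the tuple form.
print(to_identifier_tuple("docs_example.bids.v2"))
print(to_identifier_tuple(("docs_example", "bids.v2")))
```

With the string form, the dot inside the table name is indistinguishable from a separator, which is why the tuple syntax exists.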
### Static table
To load a table directly from a metadata file (i.e., **without** using a catalog), you can use a `StaticTable` as follows:
```python
from pyiceberg.table import StaticTable
static_table = StaticTable.from_metadata(
    "s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
```
The static-table is considered read-only.
## Write support
With PyIceberg 0.6.0 write support is added through Arrow. Let's consider an Arrow Table:
```python
import pyarrow as pa
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)
```
Next, create a table based on the schema:
```python
from pyiceberg.catalog import load_catalog
catalog = load_catalog("default")
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType
schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)
tbl = catalog.create_table("default.cities", schema=schema)
```
Now write the data to the table:
<!-- prettier-ignore-start -->
!!! note inline end "Fast append"
    PyIceberg defaults to the [fast append](https://iceberg.apache.org/spec/#snapshots) to minimize the amount of data written. This enables quick writes, reducing the possibility of conflicts. The downside of the fast append is that it creates more metadata than a normal commit. [Compaction is planned](https://github.com/apache/iceberg-python/issues/270) and will automatically rewrite all the metadata when a threshold is hit, to maintain performant reads.
<!-- prettier-ignore-end -->
```python
tbl.append(df)
# or
tbl.overwrite(df)
```
The data is written to the table, and when the table is read using `tbl.scan().to_arrow()`:
```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]
```
You can use either `append(df)` or `overwrite(df)` here, since the table contains no data yet. To add more data, use `.append()` again:
```python
df = pa.Table.from_pylist(
    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)

tbl.append(df)
```
When reading the table with `tbl.scan().to_arrow()`, you can see that `Groningen` is now also part of the table:
```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
```
The nested lists indicate the different Arrow buffers: the first write results in one buffer, and the second append in a separate buffer. This is expected, since the scan reads two Parquet files.
<!-- prettier-ignore-start -->
!!! example "Under development"
    Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.
<!-- prettier-ignore-end -->
## Schema evolution
PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are made (this can be overridden).
In the examples below, `.update_schema()` is called on the table itself.
```python
from pyiceberg.types import IntegerType

with table.update_schema() as update:
    update.add_column("some_field", IntegerType(), "doc")
```
You can also initiate a transaction if you want to make more changes than just evolving the schema:
```python
with table.transaction() as transaction:
    with transaction.update_schema() as update_schema:
        update_schema.add_column("some_other_field", IntegerType(), "doc")
    # ... Update properties etc
```
### Union by Name
Using `.union_by_name()` you can merge another schema into an existing schema without having to worry about field-IDs:
```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType, LongType
catalog = load_catalog()
schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

table = catalog.create_table("default.locations", schema)

new_schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
    NestedField(10, "population", LongType(), required=False),
)

with table.update_schema() as update:
    update.union_by_name(new_schema)
```
Now the table has the union of the two schemas `print(table.schema())`:
```
table {
  1: city: optional string
  2: lat: optional double
  3: long: optional double
  4: population: optional long
}
```
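Note that `population` was declared with ID `10` but ends up with ID `4`. The following sketch models that behavior for flat schemas: columns are matched by name, and new columns receive the next free ID. The `Field` class and this `union_by_name` function are hypothetical stand-ins, and the "highest existing ID plus one" rule is a simplifying assumption, not PyIceberg's actual code:

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Field:
    field_id: int
    name: str
    type_name: str


def union_by_name(existing: List[Field], incoming: List[Field]) -> List[Field]:
    """Simplified model: match columns by name and give new ones the next free ID."""
    known = {field.name for field in existing}
    next_id = max((field.field_id for field in existing), default=0) + 1
    merged = list(existing)
    for field in incoming:
        if field.name in known:
            continue  # already present, keep the existing ID
        # The ID carried by the incoming field is ignored on purpose.
        merged.append(Field(next_id, field.name, field.type_name))
        known.add(field.name)
        next_id += 1
    return merged


current = [Field(1, "city", "string"), Field(2, "lat", "double"), Field(3, "long", "double")]
new = current + [Field(10, "population", "long")]

for field in union_by_name(current, new):
    print(f"{field.field_id}: {field.name}: {field.type_name}")
```

Ignoring the incoming IDs is what lets you merge schemas from other sources without coordinating field-IDs up front.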
### Add column
Using `add_column` you can add a column, without having to worry about the field-id:
```python
with table.update_schema() as update:
    update.add_column("retries", IntegerType(), "Number of retries to place the bid")
    # In a struct
    update.add_column("details.confirmed_by", StringType(), "Name of the exchange")
```
### Rename column
Renaming a field in an Iceberg table is simple:
```python
with table.update_schema() as update:
    update.rename_column("retries", "num_retries")
    # This will rename `confirmed_by` to `exchange`
    update.rename_column("details.confirmed_by", "exchange")
```
### Move column
Move a field, either at the top level or inside a struct:
```python
with table.update_schema() as update:
    update.move_first("symbol")
    update.move_after("bid", "ask")
    # This will move `created_by` before `exchange`
    update.move_before("details.created_by", "details.exchange")
```
### Update column
Update a field's type, description, or required flag:
```python
with table.update_schema() as update:
    # Promote a float to a double
    update.update_column("bid", field_type=DoubleType())
    # Make a field optional
    update.update_column("symbol", required=False)
    # Update the documentation
    update.update_column("symbol", doc="Name of the share on the exchange")
```
Be careful: some operations are incompatible, but you can still perform them at your own risk by setting `allow_incompatible_changes`:
```python
with table.update_schema(allow_incompatible_changes=True) as update:
    # Incompatible change, cannot require an optional field
    update.update_column("symbol", required=True)
```
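The compatibility rules used in the examples above can be summarized in a small sketch. The function `is_compatible_update` and the string type names are hypothetical; the promotions listed are the primitive widenings from the Iceberg spec (decimal precision widening is left out for brevity). This is an illustration, not the check PyIceberg runs internally:

```python
# Primitive promotions taken from the Iceberg spec; decimal widening is omitted.
ALLOWED_PROMOTIONS = {("int", "long"), ("float", "double")}


def is_compatible_update(
    old_type: str, new_type: str, was_required: bool, now_required: bool
) -> bool:
    """Hypothetical check mirroring the rules described above."""
    type_ok = old_type == new_type or (old_type, new_type) in ALLOWED_PROMOTIONS
    # Relaxing required -> optional is safe; optional -> required is not,
    # because existing data files may already contain nulls.
    nullability_ok = was_required or not now_required
    return type_ok and nullability_ok


print(is_compatible_update("float", "double", False, False))  # promote bid
print(is_compatible_update("string", "string", True, False))  # make symbol optional
print(is_compatible_update("string", "string", False, True))  # require an optional field
print(is_compatible_update("double", "float", False, False))  # narrowing a type
```

The first two calls describe changes that are allowed by default, while the last two would need `allow_incompatible_changes=True`.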
### Delete column
Delete a field. Be careful: this is an incompatible change (readers/writers might expect this field):
```python
with table.update_schema(allow_incompatible_changes=True) as update:
    update.delete_column("some_field")
```
## Table properties
Set and remove properties through the `Transaction` API:
```python
with table.transaction() as transaction:
    transaction.set_properties(abc="def")

assert table.properties == {"abc": "def"}

with table.transaction() as transaction:
    transaction.remove_properties("abc")

assert table.properties == {}
```
Or, without context manager:
```python
table = table.transaction().set_properties(abc="def").commit_transaction()
assert table.properties == {"abc": "def"}
table = table.transaction().remove_properties("abc").commit_transaction()
assert table.properties == {}
```
## Query the data
To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual
catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")
scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
    limit=100,
)

# Or filter using a string predicate
scan = table.scan(
    row_filter="trip_distance >= 10.0",
)

[task.file.file_path for task in scan.plan_files()]
```
The low-level `plan_files` method returns a set of tasks that provide the files that might contain matching rows:
```json
[
"s3://warehouse/wh/nyc/taxis/data/00003-4-42464649-92dd-41ad-b83b-dea1a2fe4b58-00001.parquet"
]
```
In this case it is up to the engine itself to filter the rows within each file. The `to_arrow()` and `to_duckdb()` methods described below already do this for you.
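The difference between planning files and filtering rows can be sketched with a toy model. Everything here is hypothetical: the `DataFile` class, its `max_trip_distance` bound, and the sample rows are made up to illustrate the two steps and do not reflect PyIceberg's internal data structures:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class DataFile:
    path: str
    max_trip_distance: float  # hypothetical per-file upper bound kept in metadata
    rows: List[Dict[str, float]]


def plan_files(files: List[DataFile], threshold: float) -> List[DataFile]:
    """Keep files whose upper bound says a row with trip_distance >= threshold may exist."""
    return [f for f in files if f.max_trip_distance >= threshold]


def read_rows(files: List[DataFile], threshold: float) -> List[Dict[str, float]]:
    """Row-level filtering, the step left to the engine after planning."""
    return [row for f in files for row in f.rows if row["trip_distance"] >= threshold]


files = [
    DataFile("a.parquet", 4.2, [{"trip_distance": 1.0}, {"trip_distance": 4.2}]),
    DataFile("b.parquet", 25.0, [{"trip_distance": 3.5}, {"trip_distance": 25.0}]),
]

planned = plan_files(files, threshold=10.0)
print([f.path for f in planned])
print(read_rows(planned, threshold=10.0))
```

Planning discards `a.parquet` without opening it, but `b.parquet` still contains a non-matching row, which is why a second, row-level pass is needed.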
### Apache Arrow
<!-- prettier-ignore-start -->
!!! note "Requirements"
    This requires [`pyarrow` to be installed](index.md).
<!-- prettier-ignore-end -->
Using PyIceberg it is easy to filter out data from a huge table and pull it into a PyArrow table:
```python
table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow()
```
This will return a PyArrow table:
```
pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[us, tz=+00:00]
tpep_dropoff_datetime: timestamp[us, tz=+00:00]
----
VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]]
tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]]
```
This will only pull in the files that might contain matching rows.
### Pandas
<!-- prettier-ignore-start -->
!!! note "Requirements"
    This requires [`pandas` to be installed](index.md).
<!-- prettier-ignore-end -->
PyIceberg makes it easy to filter out data from a huge table and pull it into a Pandas dataframe locally. This will only fetch the relevant Parquet files for the query and apply the filter. This will reduce IO and therefore improve performance and reduce cost.
```python
table.scan(
    row_filter="trip_distance >= 10.0",
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_pandas()
```
This will return a Pandas dataframe:
```
        VendorID      tpep_pickup_datetime     tpep_dropoff_datetime
0              2 2021-04-01 00:28:05+00:00 2021-04-01 00:47:59+00:00
1              1 2021-04-01 00:39:01+00:00 2021-04-01 00:57:39+00:00
2              2 2021-04-01 00:14:42+00:00 2021-04-01 00:42:59+00:00
3              1 2021-04-01 00:17:17+00:00 2021-04-01 00:43:38+00:00
4              1 2021-04-01 00:24:04+00:00 2021-04-01 00:56:20+00:00
...          ...                       ...                       ...
116976         2 2021-04-30 23:56:18+00:00 2021-05-01 00:29:13+00:00
116977         2 2021-04-30 23:07:41+00:00 2021-04-30 23:37:18+00:00
116978         2 2021-04-30 23:38:28+00:00 2021-05-01 00:12:04+00:00
116979         2 2021-04-30 23:33:00+00:00 2021-04-30 23:59:00+00:00
116980         2 2021-04-30 23:44:25+00:00 2021-05-01 00:14:47+00:00

[116981 rows x 3 columns]
```
It is recommended to use Pandas 2 or later, because it stores the data in an [Apache Arrow backend](https://datapythonista.me/blog/pandas-20-and-the-arrow-revolution-part-i) which avoids copies of data.
### DuckDB
<!-- prettier-ignore-start -->
!!! note "Requirements"
    This requires [DuckDB to be installed](index.md).
<!-- prettier-ignore-end -->
A table scan can also be converted into an in-memory DuckDB table:
```python
con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")
```
Using the returned cursor, we can run queries on the DuckDB table:
```python
print(
    con.execute(
        "SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4"
    ).fetchall()
)
[
    (datetime.timedelta(seconds=1194),),
    (datetime.timedelta(seconds=1118),),
    (datetime.timedelta(seconds=1697),),
    (datetime.timedelta(seconds=1581),),
]
```
### Ray
<!-- prettier-ignore-start -->
!!! note "Requirements"
    This requires [Ray to be installed](index.md).
<!-- prettier-ignore-end -->
A table scan can also be converted into a Ray dataset:
```python
ray_dataset = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_ray()
```
This will return a Ray dataset:
```
Dataset(
    num_blocks=1,
    num_rows=1168798,
    schema={
        VendorID: int64,
        tpep_pickup_datetime: timestamp[us, tz=UTC],
        tpep_dropoff_datetime: timestamp[us, tz=UTC]
    }
)
```
Use the [Ray Dataset API](https://docs.ray.io/en/latest/data/api/dataset.html) to interact with the dataset:
```python
print(ray_dataset.take(2))
[
    {
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 23, 50),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 0, 34, 31),
    },
    {
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 5, 3),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 16, 10, 18),
    },
]
```
### Daft
PyIceberg interfaces closely with Daft Dataframes (see also: [Daft integration with Iceberg](https://www.getdaft.io/projects/docs/en/latest/user_guide/integrations/iceberg.html)), which provide a full, lazily optimized query engine interface on top of PyIceberg tables.
<!-- prettier-ignore-start -->
!!! note "Requirements"
    This requires [Daft to be installed](index.md).
<!-- prettier-ignore-end -->
A table can be read easily into a Daft Dataframe:
```python
df = table.to_daft() # equivalent to `daft.read_iceberg(table)`
df = df.where(df["trip_distance"] >= 10.0)
df = df.select("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime")
```
This returns a Daft Dataframe which is lazily materialized. Printing `df` will display the schema:
```
╭──────────┬───────────────────────────────┬───────────────────────────────╮
│ VendorID ┆ tpep_pickup_datetime          ┆ tpep_dropoff_datetime         │
│ ---      ┆ ---                           ┆ ---                           │
│ Int64    ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │
╰──────────┴───────────────────────────────┴───────────────────────────────╯

(No data to display: Dataframe not materialized)
```
We can execute the Dataframe to preview the first few rows of the query with `df.show()`.
This is correctly optimized to take advantage of Iceberg features such as hidden partitioning and file-level statistics for efficient reads.
```python
df.show(2)
```
```
╭──────────┬───────────────────────────────┬───────────────────────────────╮
│ VendorID ┆ tpep_pickup_datetime          ┆ tpep_dropoff_datetime         │
│ ---      ┆ ---                           ┆ ---                           │
│ Int64    ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │
╞══════════╪═══════════════════════════════╪═══════════════════════════════╡
│ 2        ┆ 2008-12-31T23:23:50.000000    ┆ 2009-01-01T00:34:31.000000    │
├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2        ┆ 2008-12-31T23:05:03.000000    ┆ 2009-01-01T16:10:18.000000    │
╰──────────┴───────────────────────────────┴───────────────────────────────╯

(Showing first 2 rows)
```