PyIceberg is based around catalogs to load tables. The first step is to instantiate a catalog that can load tables. Let's use the following configuration to define a catalog called `prod`:
```yaml
catalog:
  prod:
    uri: http://rest-catalog/ws/
    credential: t-1234:secret
```
This information must be placed inside a file called `.pyiceberg.yaml`, located either in the `$HOME` or `%USERPROFILE%` directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the `$PYICEBERG_HOME` directory (if the corresponding environment variable is set).
For more details on possible configurations, refer to the configuration page.
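Catalog properties can also be supplied through environment variables instead of the YAML file. A minimal sketch, assuming the `PYICEBERG_CATALOG__<name>__<property>` naming scheme described on the configuration page:

```python
import os

from pyiceberg.catalog import load_catalog

# Equivalent to the YAML above; set these before loading the catalog.
os.environ["PYICEBERG_CATALOG__PROD__URI"] = "http://rest-catalog/ws/"
os.environ["PYICEBERG_CATALOG__PROD__CREDENTIAL"] = "t-1234:secret"

catalog = load_catalog("prod")
```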
Then load the catalog (properties from the configuration file can also be passed in, or overridden, directly):
```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "docs",
    **{
        "uri": "http://127.0.0.1:8181",
        "s3.endpoint": "http://127.0.0.1:9000",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
)
```
Let's create a namespace:
```python
catalog.create_namespace("docs_example")
```
And then list them:
```python
ns = catalog.list_namespaces()

assert ns == [("docs_example",)]
```
And then list tables in the namespace:
```python
catalog.list_tables("docs_example")
```
To create a table from a catalog:
```python
from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    FloatType,
    DoubleType,
    StringType,
    NestedField,
    StructType,
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            NestedField(
                field_id=4, name="created_by", field_type=StringType(), required=False
            ),
        ),
        required=False,
    ),
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)

from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    partition_spec=partition_spec,
    sort_order=sort_order,
)
```
Loading the `bids` table:

```python
table = catalog.load_table("docs_example.bids")
# Equivalent to:
table = catalog.load_table(("docs_example", "bids"))
# The tuple syntax can be used if the namespace or table contains a dot.
```
This returns a `Table` that represents an Iceberg table that can be queried and altered.
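For example, you can inspect the metadata of the table you just loaded (a quick sketch, reusing the table from above):

```python
# Inspect the loaded table's metadata
print(table.schema())     # The current schema
print(table.spec())       # The partition spec
print(table.properties)   # The table properties as a dict
```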
To load a table directly from a metadata file (i.e., without using a catalog), you can use a `StaticTable` as follows:
```python
from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
```
The static table is considered read-only.
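It can still be scanned like any other table; for example, pulling it into Arrow (a minimal sketch, reusing the `to_arrow()` method shown later on this page):

```python
# Reads work as usual; writes and schema changes do not.
arrow_table = static_table.scan().to_arrow()
```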
PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (this can be overridden).
In the examples below, `.update_schema()` is called on the table itself.
```python
from pyiceberg.types import IntegerType

with table.update_schema() as update:
    update.add_column("some_field", IntegerType(), "doc")
```
You can also initiate a transaction if you want to make more changes than just evolving the schema:
```python
with table.transaction() as transaction:
    with transaction.update_schema() as update_schema:
        update_schema.add_column("some_other_field", IntegerType(), "doc")
    # ... Update properties etc
```
Using `add_column` you can add a column without having to worry about the field-id:
```python
with table.update_schema() as update:
    update.add_column("retries", IntegerType(), "Number of retries to place the bid")
    # In a struct
    update.add_column("details.confirmed_by", StringType(), "Name of the exchange")
```
Renaming a field in an Iceberg table is simple:
```python
with table.update_schema() as update:
    update.rename_column("retries", "num_retries")
    # This will rename `confirmed_by` to `exchange` in the `details` struct
    update.rename_column("details.confirmed_by", "exchange")
```
Move fields around, including inside a struct:
```python
with table.update_schema() as update:
    update.move_first("symbol")
    update.move_after("bid", "ask")
    # This will move `created_by` before `exchange` in the `details` struct
    update.move_before("details.created_by", "details.exchange")
```
Update a field's type, documentation, or whether it is required:
```python
with table.update_schema() as update:
    # Promote a float to a double
    update.update_column("bid", field_type=DoubleType())
    # Make a field optional
    update.update_column("symbol", required=False)
    # Update the documentation
    update.update_column("symbol", doc="Name of the share on the exchange")
```
Be careful: some operations are not compatible, but can still be done at your own risk by setting `allow_incompatible_changes`:
```python
with table.update_schema(allow_incompatible_changes=True) as update:
    # Incompatible change, cannot require an optional field
    update.update_column("symbol", required=True)
```
Delete a field. Be careful: this is an incompatible change (readers/writers might expect this field):
```python
with table.update_schema(allow_incompatible_changes=True) as update:
    update.delete_column("some_field")
```
Set and remove properties through the `Transaction` API:
```python
with table.transaction() as transaction:
    transaction.set_properties(abc="def")

assert table.properties == {"abc": "def"}

with table.transaction() as transaction:
    transaction.remove_properties("abc")

assert table.properties == {}
```
Or, without a context manager:
```python
table = table.transaction().set_properties(abc="def").commit_transaction()

assert table.properties == {"abc": "def"}

table = table.transaction().remove_properties("abc").commit_transaction()

assert table.properties == {}
```
To query a table, a table scan is needed. A table scan accepts a filter, a set of columns, and optionally a limit and a snapshot ID:
```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")

scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
    limit=100,
)

# Or filter using a string predicate
scan = table.scan(
    row_filter="trip_distance > 10.0",
)

[task.file.file_path for task in scan.plan_files()]
```
The low-level `plan_files` method returns a set of tasks that provide the files that might contain matching rows:
```json
[
    "s3://warehouse/wh/nyc/taxis/data/00003-4-42464649-92dd-41ad-b83b-dea1a2fe4b58-00001.parquet"
]
```
In this case it is up to the engine itself to filter the rows within the file. The `to_arrow()` and `to_duckdb()` methods below already do this for you.
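To make that concrete, here is a minimal sketch of applying the filter yourself with PyArrow, assuming the planned files are reachable through a PyArrow-compatible filesystem (the `trip_distance` column comes from the scan above):

```python
import pyarrow.dataset as ds

# Read the planned files and filter the rows manually.
files = [task.file.file_path for task in scan.plan_files()]
dataset = ds.dataset(files, format="parquet")
matching_rows = dataset.to_table(filter=ds.field("trip_distance") >= 10.0)
```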
!!! note "Requirements"
    This requires PyArrow to be installed.
Using PyIceberg it is easy to filter out data from a huge table and pull it into a PyArrow table:
```python
table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow()
```
This will return a PyArrow table:
```python
pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[us, tz=+00:00]
tpep_dropoff_datetime: timestamp[us, tz=+00:00]
----
VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]]
tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]]
```
This will only pull in the files that might contain matching rows.
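From here the Arrow table can be handed to other tooling, for example pandas (a small sketch; assumes pandas is installed):

```python
# Convert the Arrow table into a pandas DataFrame for further analysis
df = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow().to_pandas()
```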
!!! note "Requirements"
    This requires DuckDB to be installed.
A table scan can also be converted into an in-memory DuckDB table:
```python
con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")
```
Using the returned connection, we can run queries on the DuckDB table:
```python
print(
    con.execute(
        "SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4"
    ).fetchall()
)
[
    (datetime.timedelta(seconds=1194),),
    (datetime.timedelta(seconds=1118),),
    (datetime.timedelta(seconds=1697),),
    (datetime.timedelta(seconds=1581),),
]
```
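Since `con` is a regular DuckDB connection, any other SQL works as well; a small sketch:

```python
# Count the rows that matched the scan filter
print(con.execute("SELECT COUNT(*) FROM distant_taxi_trips").fetchone())
```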
!!! note "Requirements"
    This requires Ray to be installed.
A table scan can also be converted into a Ray dataset:
```python
ray_dataset = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_ray()
```
This will return a Ray dataset:
```python
Dataset(
    num_blocks=1,
    num_rows=1168798,
    schema={
        VendorID: int64,
        tpep_pickup_datetime: timestamp[us, tz=UTC],
        tpep_dropoff_datetime: timestamp[us, tz=UTC]
    }
)
```
Using the Ray Dataset API to interact with the dataset:
```python
print(ray_dataset.take(2))
[
    {
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 23, 50),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 0, 34, 31),
    },
    {
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 5, 3),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 16, 10, 18),
    },
]
```
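From here the full Ray Dataset API applies; for example, counting the matching rows (a small sketch):

```python
# Count the rows in the Ray dataset
print(ray_dataset.count())
```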