---
title: "Python API"
weight: 5
type: docs
aliases:
- /api/python-api.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Python API
PyPaimon is a Python implementation for connecting to the Paimon catalog and reading & writing tables. It is a complete
Python implementation and does not require a JDK installation.
## Environment Settings
The SDK is published at [pypaimon](https://pypi.org/project/pypaimon/). You can install it with:
```shell
pip install pypaimon
```
## Create Catalog
Before interacting with a table, you need to create a Catalog.
{{< tabs "create-catalog" >}}
{{< tab "filesystem" >}}
```python
from pypaimon import CatalogFactory
# Note that keys and values are all strings
catalog_options = {
'warehouse': 'file:///path/to/warehouse'
}
catalog = CatalogFactory.create(catalog_options)
```
{{< /tab >}}
{{< tab "rest catalog" >}}
The sample code is as follows. The detailed meaning of each option can be found in [DLF Token](../concepts/rest/dlf.md).
```python
from pypaimon import CatalogFactory
# Note that keys and values are all strings
catalog_options = {
'metastore': 'rest',
'warehouse': 'xxx',
'uri': 'xxx',
'dlf.region': 'xxx',
'token.provider': 'xxx',
'dlf.access-key-id': 'xxx',
'dlf.access-key-secret': 'xxx'
}
catalog = CatalogFactory.create(catalog_options)
```
{{< /tab >}}
{{< /tabs >}}
Currently, PyPaimon only supports the filesystem catalog and the REST catalog. See [Catalog]({{< ref "concepts/catalog" >}}).
You can use the catalog to create tables for writing data.
## Create Database
A table is located in a database. If you want to create a table in a new database, you should create the database first.
```python
catalog.create_database(
name='database_name',
    ignore_if_exists=True, # To raise an error if the database exists, set this to False
properties={'key': 'value'} # optional database properties
)
```
## Create Table
A table schema contains field definitions, partition keys, primary keys, table options and a comment.
The field definitions are described by `pyarrow.Schema`. All arguments except the field definitions are optional.
Generally, there are two ways to build a `pyarrow.Schema`.
First, you can use the `pyarrow.schema` method directly, for example:
```python
import pyarrow as pa
from pypaimon import Schema
pa_schema = pa.schema([
('dt', pa.string()),
('hh', pa.string()),
('pk', pa.int64()),
('value', pa.string())
])
schema = Schema.from_pyarrow_schema(
pa_schema=pa_schema,
partition_keys=['dt', 'hh'],
primary_keys=['dt', 'hh', 'pk'],
options={'bucket': '2'},
comment='my test table')
```
See [Data Types]({{< ref "python-api#data-types" >}}) for the full mapping of supported `pyarrow` to Paimon data types.
Second, if you have some Pandas data, the `pa_schema` can be extracted from the `DataFrame`:
```python
import pandas as pd
import pyarrow as pa
from pypaimon import Schema
# Example DataFrame data
data = {
'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
'hh': ['12', '15', '20'],
'pk': [1, 2, 3],
'value': ['a', 'b', 'c'],
}
dataframe = pd.DataFrame(data)
# Get Paimon Schema
record_batch = pa.RecordBatch.from_pandas(dataframe)
schema = Schema.from_pyarrow_schema(
pa_schema=record_batch.schema,
partition_keys=['dt', 'hh'],
primary_keys=['dt', 'hh', 'pk'],
options={'bucket': '2'},
comment='my test table'
)
```
After building the table schema, you can create the corresponding table:
```python
schema = ...
catalog.create_table(
identifier='database_name.table_name',
schema=schema,
    ignore_if_exists=True # To raise an error if the table exists, set this to False
)
# Get Table
table = catalog.get_table('database_name.table_name')
```
## Batch Write
Paimon table writes use a two-phase commit: you can write many times, but once committed, no more data can be written.
{{< hint warning >}}
Currently, writing multiple times and committing once is only supported for append-only tables.
{{< /hint >}}
```python
table = catalog.get_table('database_name.table_name')
# 1. Create table write and commit
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
# 2. Write data. Support 3 methods:
# 2.1 Write pandas.DataFrame
dataframe = ...
table_write.write_pandas(dataframe)
# 2.2 Write pyarrow.Table
pa_table = ...
table_write.write_arrow(pa_table)
# 2.3 Write pyarrow.RecordBatch
record_batch = ...
table_write.write_arrow_batch(record_batch)
# 3. Commit data
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)
# 4. Close resources
table_write.close()
table_commit.close()
```
By default, the data will be appended to the table. If you want to overwrite the table, use the `TableWrite#overwrite`
API:
```python
# overwrite whole table
write_builder = table.new_batch_write_builder().overwrite()
# overwrite partition 'dt=2024-01-01'
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
```
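A complete partition overwrite follows the same write/commit flow shown above. Here is a minimal sketch, assuming a partitioned table with a `dt` partition column and hypothetical data columns:
```python
import pandas as pd

table = catalog.get_table('database_name.table_name')

# Overwrite only the partition 'dt=2024-01-01'
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# New data for the partition (hypothetical columns)
dataframe = pd.DataFrame({
    'dt': ['2024-01-01', '2024-01-01'],
    'hh': ['01', '02'],
    'pk': [1, 2],
    'value': ['new-a', 'new-b'],
})
table_write.write_pandas(dataframe)

# Committing replaces the existing data of that partition
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
```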
### Update columns
You can use `TableUpdate.update_by_arrow_with_row_id` to update columns of data evolution tables.
The input data should include the `_ROW_ID` column. The update operation automatically sorts and matches each `_ROW_ID` to
its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes them to a separate file.
```python
import pyarrow as pa
from pypaimon import Schema

simple_pa_schema = pa.schema([
('f0', pa.int8()),
('f1', pa.int16()),
])
schema = Schema.from_pyarrow_schema(simple_pa_schema,
options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
catalog.create_table('default.test_row_tracking', schema, False)
table = catalog.get_table('default.test_row_tracking')
# write all columns
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
'f0': [-1, 2],
'f1': [-1001, 1002]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
# update partial columns
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update().with_update_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
'_ROW_ID': [0, 1],
'f0': [5, 6],
}, schema=pa.schema([
('_ROW_ID', pa.int64()),
('f0', pa.int8()),
]))
cmts = table_update.update_by_arrow_with_row_id(data2)
table_commit.commit(cmts)
table_commit.close()
# content should be:
# 'f0': [5, 6],
# 'f1': [-1001, 1002]
```
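To verify the result, you can read the table back with the batch read API described below; a minimal sketch (the exact output layout may vary):
```python
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)
print(result)
# expected column contents (roughly):
# f0: [5, 6]
# f1: [-1001, 1002]
```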
## Batch Read
### Predicate pushdown
A `ReadBuilder` is used to build reading utilities and perform filter and projection pushdown.
```python
table = catalog.get_table('database_name.table_name')
read_builder = table.new_read_builder()
```
You can use `PredicateBuilder` to build filters and push them down through `ReadBuilder`:
```python
# Example filter: ('f0' < 3 OR 'f1' > 6) AND 'f3' = 'A'
predicate_builder = read_builder.new_predicate_builder()
predicate1 = predicate_builder.less_than('f0', 3)
predicate2 = predicate_builder.greater_than('f1', 6)
predicate3 = predicate_builder.or_predicates([predicate1, predicate2])
predicate4 = predicate_builder.equal('f3', 'A')
predicate5 = predicate_builder.and_predicates([predicate3, predicate4])
read_builder = read_builder.with_filter(predicate5)
```
See [Predicate]({{< ref "python-api#predicate" >}}) for all supported filters and building methods.
You can also push down projection through `ReadBuilder`:
```python
# select f3 and f2 columns
read_builder = read_builder.with_projection(['f3', 'f2'])
```
### Generate Splits
Then you can step into the scan plan stage to get the `splits`:
```python
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
```
Finally, you can read data from the `splits` into various data formats.
### Read Apache Arrow
This requires `pyarrow` to be installed.
You can read all the data into a `pyarrow.Table`:
```python
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
print(pa_table)
# pyarrow.Table
# f0: int32
# f1: string
# ----
# f0: [[1,2,3],[4,5,6],...]
# f1: [["a","b","c"],["d","e","f"],...]
```
You can also read data into a `pyarrow.RecordBatchReader` and iterate record batches:
```python
table_read = read_builder.new_read()
for batch in table_read.to_arrow_batch_reader(splits):
print(batch)
# pyarrow.RecordBatch
# f0: int32
# f1: string
# ----
# f0: [1,2,3]
# f1: ["a","b","c"]
```
### Read Python Iterator
You can read the data row by row into a native Python iterator.
This is convenient for custom row-based processing logic.
```python
table_read = read_builder.new_read()
for row in table_read.to_iterator(splits):
print(row)
# [1,2,3]
# ["a","b","c"]
```
### Read Pandas
This requires `pandas` to be installed.
You can read all the data into a `pandas.DataFrame`:
```python
table_read = read_builder.new_read()
df = table_read.to_pandas(splits)
print(df)
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
```
### Read DuckDB
This requires `duckdb` to be installed.
You can convert the splits into an in-memory DuckDB table and query it:
```python
table_read = read_builder.new_read()
duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
print(duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 = 1").fetchdf())
# f0 f1
# 0 1 a
```
### Read Ray
This requires `ray` to be installed.
You can convert the splits into a Ray Dataset and process it with the Ray Data API for distributed processing:
```python
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)
print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})
print(ray_dataset.take(3))
# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]
print(ray_dataset.to_pandas())
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
```
The `to_ray()` method supports Ray Data API parameters for distributed processing:
```python
# Basic usage
ray_dataset = table_read.to_ray(splits)
# Specify number of output blocks
ray_dataset = table_read.to_ray(splits, override_num_blocks=4)
# Configure Ray remote arguments
ray_dataset = table_read.to_ray(
splits,
override_num_blocks=4,
ray_remote_args={"num_cpus": 2, "max_retries": 3}
)
# Use Ray Data operations
mapped_dataset = ray_dataset.map(lambda row: {'value': row['value'] * 2})
filtered_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
df = ray_dataset.to_pandas()
```
**Parameters:**
- `override_num_blocks`: Optional override for the number of output blocks. By default,
Ray automatically determines the optimal number.
- `ray_remote_args`: Optional kwargs passed to `ray.remote()` in read tasks
(e.g., `{"num_cpus": 2, "max_retries": 3}`).
- `concurrency`: Optional max number of Ray tasks to run concurrently. By default,
dynamically decided based on available resources.
- `**read_args`: Additional kwargs passed to the datasource (e.g., `per_task_row_limit`
in Ray 2.52.0+).
**Ray Block Size Configuration:**
If you need to configure Ray's block size (e.g., when Paimon splits exceed Ray's default
128MB block size), set it before calling `to_ray()`:
```python
from ray.data import DataContext
ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB (default is 128MB)
ray_dataset = table_read.to_ray(splits)
```
See [Ray Data API Documentation](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html) for more details.
### Read PyTorch Dataset
This requires `torch` to be installed.
You can read all the data into a `torch.utils.data.Dataset` or `torch.utils.data.IterableDataset`:
```python
from torch.utils.data import DataLoader
table_read = read_builder.new_read()
dataset = table_read.to_torch(splits, streaming=True)
dataloader = DataLoader(
dataset,
batch_size=2,
num_workers=2, # Concurrency to read data
shuffle=False
)
# Collect all data from dataloader
for batch_idx, batch_data in enumerate(dataloader):
print(batch_data)
# output:
# {'user_id': tensor([1, 2]), 'behavior': ['a', 'b']}
# {'user_id': tensor([3, 4]), 'behavior': ['c', 'd']}
# {'user_id': tensor([5, 6]), 'behavior': ['e', 'f']}
# {'user_id': tensor([7, 8]), 'behavior': ['g', 'h']}
```
When the `streaming` parameter is true, the dataset is read iteratively;
when it is false, all data is read into memory at once.
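For non-streaming reading the usage is similar; a minimal sketch, assuming the object returned with `streaming=False` is a map-style `torch.utils.data.Dataset` (which allows shuffling):
```python
from torch.utils.data import DataLoader

table_read = read_builder.new_read()
dataset = table_read.to_torch(splits, streaming=False)  # loads all data into memory

dataloader = DataLoader(
    dataset,
    batch_size=2,
    shuffle=True  # random access over the in-memory dataset
)
for batch_data in dataloader:
    print(batch_data)
```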
### Incremental Read
This API allows reading data committed between two snapshot timestamps. The steps are as follows.
- Set the option `CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP` on a copied table via `table.copy({...})`. The value must
be a string: `"startMillis,endMillis"`, where `startMillis` is exclusive and `endMillis` is inclusive.
- Use `SnapshotManager` to obtain snapshot timestamps, or determine them yourself.
- Read the data as above.
Example:
```python
from pypaimon import CatalogFactory
from pypaimon.common.core_options import CoreOptions
from pypaimon.snapshot.snapshot_manager import SnapshotManager
# Prepare catalog and obtain a table
catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
table = catalog.get_table('default.your_table_name')
# Assume the table has at least two snapshots (1 and 2)
snapshot_manager = SnapshotManager(table)
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
# Read records committed in (t1, t2] (start exclusive, end inclusive)
table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t1},{t2}"})
read_builder = table_inc.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
# To Arrow
arrow_table = table_read.to_arrow(splits)
# Or to pandas
pandas_df = table_read.to_pandas(splits)
```
### Shard Read
Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for
distributed processing and parallel computation.
You can specify the shard index and total number of shards to read a specific portion of the data:
```python
# Prepare read builder
table = catalog.get_table('database_name.table_name')
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
# Read the second shard (index 1) out of 3 total shards
splits = read_builder.new_scan().with_shard(1, 3).plan().splits()
# Read all shards and concatenate results
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
# Combine results from all shards
all_splits = splits1 + splits2 + splits3
pa_table = table_read.to_arrow(all_splits)
```
Example with shard read:
```python
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
# Create catalog
catalog_options = {'warehouse': 'file:///path/to/warehouse'}
catalog = CatalogFactory.create(catalog_options)
catalog.create_database("default", False)
# Define schema
pa_schema = pa.schema([
('user_id', pa.int64()),
('item_id', pa.int64()),
('behavior', pa.string()),
('dt', pa.string()),
])
# Create table and write data
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
catalog.create_table('default.test_table', schema, False)
table = catalog.get_table('default.test_table')
# Write data in two batches
write_builder = table.new_batch_write_builder()
# First write
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data1 = {
'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014],
'behavior': ['a', 'b', 'c', None, 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'],
'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
table_write.write_arrow(pa_table)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
# Second write
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data2 = {
'user_id': [5, 6, 7, 8, 18],
'item_id': [1005, 1006, 1007, 1008, 1018],
'behavior': ['e', 'f', 'g', 'h', 'z'],
'dt': ['p2', 'p1', 'p2', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
table_write.write_arrow(pa_table)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
# Read specific shard
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
# Read shard 2 out of 3 total shards
splits = read_builder.new_scan().with_shard(2, 3).plan().splits()
shard_data = table_read.to_arrow(splits)
# Verify shard distribution by reading all shards
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
# Combining all shards should equal a full table read
all_shards_data = pa.concat_tables([
table_read.to_arrow(splits1),
table_read.to_arrow(splits2),
table_read.to_arrow(splits3),
])
full_table_data = table_read.to_arrow(read_builder.new_scan().plan().splits())
```
Key points about shard read:
- **Shard Index**: Zero-based index of the shard to read (0 to total_shards-1)
- **Total Shards**: Total number of shards to divide the data into
- **Data Distribution**: Data is distributed evenly across shards, with remainder rows going to the last shard
- **Parallel Processing**: Each shard can be processed independently for better performance
- **Consistency**: Combining all shards should produce the complete table data
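Because each shard can be planned and read independently, shards can also be processed concurrently. Below is a minimal sketch using a thread pool; it assumes that building a separate scan and read per shard from the same `read_builder` is safe from multiple threads:
```python
from concurrent.futures import ThreadPoolExecutor

import pyarrow as pa

total_shards = 3
read_builder = table.new_read_builder()

def read_shard(shard_index):
    # Each worker plans and reads its own shard
    shard_splits = read_builder.new_scan().with_shard(shard_index, total_shards).plan().splits()
    return read_builder.new_read().to_arrow(shard_splits)

with ThreadPoolExecutor(max_workers=total_shards) as executor:
    shard_tables = list(executor.map(read_shard, range(total_shards)))

pa_table = pa.concat_tables(shard_tables)
```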
## Data Types
| Python Native Type | PyArrow Type | Paimon Type |
|:--------------------|:-------------------------------------------------|:----------------------------------|
| `int` | `pyarrow.int8()` | `TINYINT` |
| `int` | `pyarrow.int16()` | `SMALLINT` |
| `int` | `pyarrow.int32()` | `INT` |
| `int` | `pyarrow.int64()` | `BIGINT` |
| `float` | `pyarrow.float32()` | `FLOAT` |
| `float` | `pyarrow.float64()` | `DOUBLE` |
| `bool` | `pyarrow.bool_()` | `BOOLEAN` |
| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` |
| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` |
| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` |
| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)` | `TIMESTAMP(p)` |
| `datetime.date` | `pyarrow.date32()` | `DATE` |
| `datetime.time` | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)` |
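As an illustration of this mapping, the following sketch defines a schema that mixes several of the types above (field names are hypothetical):
```python
import pyarrow as pa
from pypaimon import Schema

pa_schema = pa.schema([
    ('id', pa.int64()),                  # BIGINT
    ('price', pa.decimal128(10, 2)),     # DECIMAL(10, 2)
    ('ratio', pa.float64()),             # DOUBLE
    ('name', pa.string()),               # STRING
    ('payload', pa.binary()),            # BYTES
    ('created_at', pa.timestamp('ms')),  # TIMESTAMP with millisecond precision
    ('dt', pa.date32()),                 # DATE
])
schema = Schema.from_pyarrow_schema(pa_schema)
```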
## Predicate
| Predicate kind | Predicate method |
|:----------------------|:----------------------------------------------|
| p1 and p2 | PredicateBuilder.and_predicates([p1, p2]) |
| p1 or p2 | PredicateBuilder.or_predicates([p1, p2]) |
| f = literal | PredicateBuilder.equal(f, literal) |
| f != literal | PredicateBuilder.not_equal(f, literal) |
| f < literal | PredicateBuilder.less_than(f, literal) |
| f <= literal | PredicateBuilder.less_or_equal(f, literal) |
| f > literal | PredicateBuilder.greater_than(f, literal) |
| f >= literal | PredicateBuilder.greater_or_equal(f, literal) |
| f is null | PredicateBuilder.is_null(f) |
| f is not null | PredicateBuilder.is_not_null(f) |
| f.startswith(literal) | PredicateBuilder.startswith(f, literal) |
| f.endswith(literal) | PredicateBuilder.endswith(f, literal) |
| f.contains(literal) | PredicateBuilder.contains(f, literal) |
| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
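For example, the following sketch combines several of these predicates into a single filter (column names are hypothetical):
```python
predicate_builder = read_builder.new_predicate_builder()

# f0 between 10 and 20, f1 is not null, f2 in ('a', 'b'), f3 starts with 'prefix'
predicate = predicate_builder.and_predicates([
    predicate_builder.between('f0', 10, 20),
    predicate_builder.is_not_null('f1'),
    predicate_builder.is_in('f2', ['a', 'b']),
    predicate_builder.startswith('f3', 'prefix'),
])
read_builder = read_builder.with_filter(predicate)
```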
## Supported Features
The following shows the supported features of Python Paimon compared to Java Paimon:
**Catalog Level**
- FileSystemCatalog
- RestCatalog
**Table Level**
- Append Tables
  - `bucket = -1` (unaware)
  - `bucket > 0` (fixed)
- Primary Key Tables
  - only supports deduplicate
  - `bucket = -2` (postpone)
  - `bucket > 0` (fixed)
  - read with deletion vectors enabled
- Read/Write Operations
  - Batch read and write for append tables and primary key tables
  - Predicate filtering
  - Overwrite semantics
  - Incremental reading of delta data
  - Reading and writing blob data
  - `with_shard` feature