---
title: "Python API"
weight: 5
type: docs
aliases:
---
PyPaimon is a pure Python implementation for connecting to a Paimon catalog and reading and writing tables. Because it is implemented entirely in Python, it does not require a JDK installation.
The SDK is published as `pypaimon`. You can install it with:

```shell
pip install pypaimon
```
Before interacting with tables, you need to create a Catalog.
{{< tabs "create-catalog" >}}

{{< tab "filesystem" >}}
```python
from pypaimon import CatalogFactory

# Note that keys and values are all string
catalog_options = {
    'warehouse': 'file:///path/to/warehouse'
}
catalog = CatalogFactory.create(catalog_options)
```
{{< /tab >}}

{{< tab "rest catalog" >}}

The sample code is as follows. The detailed meaning of each option can be found in DLF Token.
```python
from pypaimon import CatalogFactory

# Note that keys and values are all string
catalog_options = {
    'metastore': 'rest',
    'warehouse': 'xxx',
    'uri': 'xxx',
    'dlf.region': 'xxx',
    'token.provider': 'xxx',
    'dlf.access-key-id': 'xxx',
    'dlf.access-key-secret': 'xxx'
}
catalog = CatalogFactory.create(catalog_options)
```
{{< /tab >}}

{{< /tabs >}}
Currently, PyPaimon only supports the filesystem catalog and the REST catalog. See [Catalog]({{< ref "concepts/catalog" >}}).
You can use the catalog to create tables for writing data.

A table is located in a database. If you want to create a table in a new database, you should create the database first.
```python
catalog.create_database(
    name='database_name',
    ignore_if_exists=True,  # To raise an error if the database exists, set False
    properties={'key': 'value'}  # optional database properties
)
```
A table schema contains the field definitions, partition keys, primary keys, table options, and a comment. The field definitions are described by `pyarrow.Schema`. All arguments except the field definitions are optional.

Generally, there are two ways to build a `pyarrow.Schema`.

First, you can use the `pyarrow.schema` method directly, for example:
```python
import pyarrow as pa

from pypaimon import Schema

pa_schema = pa.schema([
    ('dt', pa.string()),
    ('hh', pa.string()),
    ('pk', pa.int64()),
    ('value', pa.string())
])
schema = Schema.from_pyarrow_schema(
    pa_schema=pa_schema,
    partition_keys=['dt', 'hh'],
    primary_keys=['dt', 'hh', 'pk'],
    options={'bucket': '2'},
    comment='my test table'
)
```
See [Data Types]({{< ref "python-api#data-types" >}}) for all supported pyarrow-to-Paimon data type mappings.
Second, if you have some pandas data, the `pa_schema` can be extracted from the DataFrame:
```python
import pandas as pd
import pyarrow as pa

from pypaimon import Schema

# Example DataFrame data
data = {
    'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
    'hh': ['12', '15', '20'],
    'pk': [1, 2, 3],
    'value': ['a', 'b', 'c'],
}
dataframe = pd.DataFrame(data)

# Get Paimon Schema
record_batch = pa.RecordBatch.from_pandas(dataframe)
schema = Schema.from_pyarrow_schema(
    pa_schema=record_batch.schema,
    partition_keys=['dt', 'hh'],
    primary_keys=['dt', 'hh', 'pk'],
    options={'bucket': '2'},
    comment='my test table'
)
```
After building the table schema, you can create the corresponding table:
```python
schema = ...
catalog.create_table(
    identifier='database_name.table_name',
    schema=schema,
    ignore_if_exists=True  # To raise an error if the table exists, set False
)

# Get Table
table = catalog.get_table('database_name.table_name')
```
Paimon table writes use a two-phase commit: you can write multiple times, but once committed, no more data can be written.
{{< hint warning >}}
Currently, writing multiple times and committing once is only supported for append-only tables.
{{< /hint >}}
```python
table = catalog.get_table('database_name.table_name')

# 1. Create table write and commit
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# 2. Write data. Support 3 methods:

# 2.1 Write pandas.DataFrame
dataframe = ...
table_write.write_pandas(dataframe)

# 2.2 Write pyarrow.Table
pa_table = ...
table_write.write_arrow(pa_table)

# 2.3 Write pyarrow.RecordBatch
record_batch = ...
table_write.write_arrow_batch(record_batch)

# 3. Commit data
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)

# 4. Close resources
table_write.close()
table_commit.close()
```
By default, data is appended to the table. If you want to overwrite the table, you should use the overwrite API on the write builder:
```python
# overwrite whole table
write_builder = table.new_batch_write_builder().overwrite()

# overwrite partition 'dt=2024-01-01'
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
```
You can use `TableUpdate.update_by_arrow_with_row_id` to update columns of data-evolution tables.

The input data should include the `_ROW_ID` column. The update operation automatically sorts and matches each `_ROW_ID` to its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes each group to a separate file.
```python
simple_pa_schema = pa.schema([
    ('f0', pa.int8()),
    ('f1', pa.int16()),
])
schema = Schema.from_pyarrow_schema(simple_pa_schema, options={
    'row-tracking.enabled': 'true',
    'data-evolution.enabled': 'true'
})
catalog.create_table('default.test_row_tracking', schema, False)
table = catalog.get_table('default.test_row_tracking')

# write all columns
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
    'f0': [-1, 2],
    'f1': [-1001, 1002]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# update partial columns
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update().with_update_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
    '_ROW_ID': [0, 1],
    'f0': [5, 6],
}, schema=pa.schema([
    ('_ROW_ID', pa.int64()),
    ('f0', pa.int8()),
]))
cmts = table_update.update_by_arrow_with_row_id(data2)
table_commit.commit(cmts)
table_commit.close()

# content should be:
# 'f0': [5, 6],
# 'f1': [-1001, 1002]
```
A `ReadBuilder` is used to build reading utilities and perform filter and projection pushdown.
```python
table = catalog.get_table('database_name.table_name')
read_builder = table.new_read_builder()
```
You can use `PredicateBuilder` to build filters and push them down through the `ReadBuilder`:
```python
# Example filter: ('f0' < 3 OR 'f1' > 6) AND 'f3' = 'A'
predicate_builder = read_builder.new_predicate_builder()

predicate1 = predicate_builder.less_than('f0', 3)
predicate2 = predicate_builder.greater_than('f1', 6)
predicate3 = predicate_builder.or_predicates([predicate1, predicate2])
predicate4 = predicate_builder.equal('f3', 'A')
predicate5 = predicate_builder.and_predicates([predicate3, predicate4])

read_builder = read_builder.with_filter(predicate5)
```
See [Predicate]({{< ref "python-api#predicate" >}}) for all supported filters and building methods.
You can also push down a projection through the `ReadBuilder`:
```python
# select f3 and f2 columns
read_builder = read_builder.with_projection(['f3', 'f2'])
```
Then you can step into the scan-plan stage to get the splits:
```python
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
```
Finally, you can read data from the splits into various data formats.

Reading into Apache Arrow requires pyarrow to be installed.
You can read all the data into a pyarrow.Table:
```python
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
print(pa_table)

# pyarrow.Table
# f0: int32
# f1: string
# ----
# f0: [[1,2,3],[4,5,6],...]
# f1: [["a","b","c"],["d","e","f"],...]
```
You can also read data into a pyarrow.RecordBatchReader and iterate record batches:
```python
table_read = read_builder.new_read()
for batch in table_read.to_arrow_batch_reader(splits):
    print(batch)

# pyarrow.RecordBatch
# f0: int32
# f1: string
# ----
# f0: [1,2,3]
# f1: ["a","b","c"]
```
You can read the data row by row into a native Python iterator. This is convenient for custom row-based processing logic.
```python
table_read = read_builder.new_read()
for row in table_read.to_iterator(splits):
    print(row)

# [1,2,3]
# ["a","b","c"]
```
Reading into pandas requires pandas to be installed.
You can read all the data into a pandas.DataFrame:
```python
table_read = read_builder.new_read()
df = table_read.to_pandas(splits)
print(df)

#    f0 f1
# 0   1  a
# 1   2  b
# 2   3  c
# 3   4  d
# ...
```
Reading into DuckDB requires duckdb to be installed.
You can convert the splits into an in-memory DuckDB table and query it:
```python
table_read = read_builder.new_read()
duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')

print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
#    f0 f1
# 0   1  a
# 1   2  b
# 2   3  c
# 3   4  d
# ...

print(duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 = 1").fetchdf())
#    f0 f1
# 0   1  a
```
Reading into Ray requires ray to be installed.
You can convert the splits into a Ray Dataset and process it with the Ray Data API for distributed processing:
```python
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)

print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})

print(ray_dataset.take(3))
# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]

print(ray_dataset.to_pandas())
#    f0 f1
# 0   1  a
# 1   2  b
# 2   3  c
# 3   4  d
# ...
```
The to_ray() method supports Ray Data API parameters for distributed processing:
```python
# Basic usage
ray_dataset = table_read.to_ray(splits)

# Specify number of output blocks
ray_dataset = table_read.to_ray(splits, override_num_blocks=4)

# Configure Ray remote arguments
ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=4,
    ray_remote_args={"num_cpus": 2, "max_retries": 3}
)

# Use Ray Data operations
mapped_dataset = ray_dataset.map(lambda row: {'value': row['value'] * 2})
filtered_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
df = ray_dataset.to_pandas()
```
Parameters:
- `override_num_blocks`: Optional override for the number of output blocks. By default, Ray automatically determines the optimal number.
- `ray_remote_args`: Optional kwargs passed to `ray.remote()` in read tasks (e.g., `{"num_cpus": 2, "max_retries": 3}`).
- `concurrency`: Optional maximum number of Ray tasks to run concurrently. By default, this is decided dynamically based on available resources.
- `**read_args`: Additional kwargs passed to the datasource (e.g., `per_task_row_limit` in Ray 2.52.0+).
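For example, a minimal sketch of the two parameters not shown above; the values are illustrative, and `per_task_row_limit` is assumed to be forwarded through `**read_args` on Ray 2.52.0+:

```python
# Limit concurrent Ray read tasks and forward an extra read arg to the datasource
ray_dataset = table_read.to_ray(
    splits,
    concurrency=4,              # at most 4 Ray read tasks run at the same time
    per_task_row_limit=10_000,  # passed through **read_args (Ray 2.52.0+)
)
```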
Ray Block Size Configuration:

If you need to configure Ray's block size (e.g., when Paimon splits exceed Ray's default 128MB block size), set it before calling to_ray():
```python
from ray.data import DataContext

ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024  # 256MB (default is 128MB)

ray_dataset = table_read.to_ray(splits)
```
See Ray Data API Documentation for more details.
Reading into PyTorch requires torch to be installed.
You can read all the data into a torch.utils.data.Dataset or torch.utils.data.IterableDataset:
```python
from torch.utils.data import DataLoader

table_read = read_builder.new_read()
dataset = table_read.to_torch(splits, streaming=True)
dataloader = DataLoader(
    dataset,
    batch_size=2,
    num_workers=2,  # Concurrency to read data
    shuffle=False
)

# Collect all data from the dataloader
for batch_idx, batch_data in enumerate(dataloader):
    print(batch_data)

# output:
# {'user_id': tensor([1, 2]), 'behavior': ['a', 'b']}
# {'user_id': tensor([3, 4]), 'behavior': ['c', 'd']}
# {'user_id': tensor([5, 6]), 'behavior': ['e', 'f']}
# {'user_id': tensor([7, 8]), 'behavior': ['g', 'h']}
```
When the streaming parameter is True, data is read iteratively; when it is False, the full dataset is read into memory.
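For comparison, a minimal sketch of the non-streaming mode, assuming the same `splits` as above:

```python
from torch.utils.data import DataLoader

table_read = read_builder.new_read()

# streaming=False reads the full dataset into memory up front
dataset = table_read.to_torch(splits, streaming=False)

dataloader = DataLoader(dataset, batch_size=2)
for batch_data in dataloader:
    print(batch_data)
```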
This API allows reading data committed between two snapshot timestamps. The steps are as follows.
- Set `CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP` on a copied table via `table.copy({...})`. The value must be a string: `"startMillis,endMillis"`, where `startMillis` is exclusive and `endMillis` is inclusive.
- You can use `SnapshotManager` to obtain snapshot timestamps, or you can determine them yourself.

Example:
```python
from pypaimon import CatalogFactory
from pypaimon.common.core_options import CoreOptions
from pypaimon.snapshot.snapshot_manager import SnapshotManager

# Prepare catalog and obtain a table
catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
table = catalog.get_table('default.your_table_name')

# Assume the table has at least two snapshots (1 and 2)
snapshot_manager = SnapshotManager(table)
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis

# Read records committed between (t1, t2]
table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t1},{t2}"})
read_builder = table_inc.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
splits = table_scan.plan().splits()

# To Arrow
arrow_table = table_read.to_arrow(splits)
# Or to pandas
pandas_df = table_read.to_pandas(splits)
```
Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for distributed processing and parallel computation.
You can specify the shard index and total number of shards to read a specific portion of the data:
```python
# Prepare read builder
table = catalog.get_table('database_name.table_name')
read_builder = table.new_read_builder()
table_read = read_builder.new_read()

# Read the second shard (index 1) out of 3 total shards
splits = read_builder.new_scan().with_shard(1, 3).plan().splits()

# Read all shards and concatenate results
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()

# Combine results from all shards
all_splits = splits1 + splits2 + splits3
pa_table = table_read.to_arrow(all_splits)
```
Example with shard read:
```python
import pyarrow as pa

from pypaimon import CatalogFactory, Schema

# Create catalog
catalog_options = {'warehouse': 'file:///path/to/warehouse'}
catalog = CatalogFactory.create(catalog_options)
catalog.create_database("default", False)

# Define schema
pa_schema = pa.schema([
    ('user_id', pa.int64()),
    ('item_id', pa.int64()),
    ('behavior', pa.string()),
    ('dt', pa.string()),
])

# Create table and write data
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
catalog.create_table('default.test_table', schema, False)
table = catalog.get_table('default.test_table')

# Write data in two batches
write_builder = table.new_batch_write_builder()

# First write
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data1 = {
    'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
    'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014],
    'behavior': ['a', 'b', 'c', None, 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'],
    'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
table_write.write_arrow(pa_table)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# Second write
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data2 = {
    'user_id': [5, 6, 7, 8, 18],
    'item_id': [1005, 1006, 1007, 1008, 1018],
    'behavior': ['e', 'f', 'g', 'h', 'z'],
    'dt': ['p2', 'p1', 'p2', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
table_write.write_arrow(pa_table)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# Read specific shard
read_builder = table.new_read_builder()
table_read = read_builder.new_read()

# Read shard 2 out of 3 total shards
splits = read_builder.new_scan().with_shard(2, 3).plan().splits()
shard_data = table_read.to_arrow(splits)

# Verify shard distribution by reading all shards
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()

# Combining all shards should equal a full table read
all_shards_data = pa.concat_tables([
    table_read.to_arrow(splits1),
    table_read.to_arrow(splits2),
    table_read.to_arrow(splits3),
])
full_table_data = table_read.to_arrow(read_builder.new_scan().plan().splits())
```
Key points about shard read:

- `with_shard(shard_index, total_shards)` takes a zero-based shard index and the total number of shards.
- Combining the results of all shards is equivalent to reading the full table.
## Data Types

| Python Native Type | PyArrow Type                                 | Paimon Type                 |
|--------------------|----------------------------------------------|-----------------------------|
| int                | pyarrow.int8()                               | TINYINT                     |
| int                | pyarrow.int16()                              | SMALLINT                    |
| int                | pyarrow.int32()                              | INT                         |
| int                | pyarrow.int64()                              | BIGINT                      |
| float              | pyarrow.float32()                            | FLOAT                       |
| float              | pyarrow.float64()                            | DOUBLE                      |
| bool               | pyarrow.bool_()                              | BOOLEAN                     |
| str                | pyarrow.string()                             | STRING, CHAR(n), VARCHAR(n) |
| bytes              | pyarrow.binary()                             | BYTES, VARBINARY(n)         |
| bytes              | pyarrow.binary(length)                       | BINARY(length)              |
| decimal.Decimal    | pyarrow.decimal128(precision, scale)         | DECIMAL(precision, scale)   |
| datetime.datetime  | pyarrow.timestamp(unit, tz=None)             | TIMESTAMP(p)                |
| datetime.date      | pyarrow.date32()                             | DATE                        |
| datetime.time      | pyarrow.time32(unit) or pyarrow.time64(unit) | TIME(p)                     |
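For illustration, a minimal sketch (column names are hypothetical) that builds a `pyarrow.Schema` exercising several of the mappings above:

```python
import pyarrow as pa

from pypaimon import Schema

# Hypothetical columns covering several type mappings from the table above
pa_schema = pa.schema([
    ('id', pa.int64()),                  # BIGINT
    ('amount', pa.decimal128(18, 2)),    # DECIMAL(18, 2)
    ('created_at', pa.timestamp('ms')),  # TIMESTAMP
    ('dt', pa.date32()),                 # DATE
    ('name', pa.string()),               # STRING
    ('payload', pa.binary()),            # BYTES
])
schema = Schema.from_pyarrow_schema(pa_schema)
```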
## Predicate

| Predicate kind | Predicate method |
|---|---|
| p1 and p2 | PredicateBuilder.and_predicates([p1, p2]) |
| p1 or p2 | PredicateBuilder.or_predicates([p1, p2]) |
| f = literal | PredicateBuilder.equal(f, literal) |
| f != literal | PredicateBuilder.not_equal(f, literal) |
| f < literal | PredicateBuilder.less_than(f, literal) |
| f <= literal | PredicateBuilder.less_or_equal(f, literal) |
| f > literal | PredicateBuilder.greater_than(f, literal) |
| f >= literal | PredicateBuilder.greater_or_equal(f, literal) |
| f is null | PredicateBuilder.is_null(f) |
| f is not null | PredicateBuilder.is_not_null(f) |
| f.startswith(literal) | PredicateBuilder.startswith(f, literal) |
| f.endswith(literal) | PredicateBuilder.endswith(f, literal) |
| f.contains(literal) | PredicateBuilder.contains(f, literal) |
| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
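For example, a short sketch (field names `f0` and `f1` are illustrative) combining a few of the predicates above:

```python
predicate_builder = read_builder.new_predicate_builder()

# 10 <= f0 <= 20 AND f1 IN ('a', 'b')
predicate = predicate_builder.and_predicates([
    predicate_builder.between('f0', 10, 20),
    predicate_builder.is_in('f1', ['a', 'b']),
])
read_builder = read_builder.with_filter(predicate)
```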
The following shows the supported features of Python Paimon compared to Java Paimon:
Catalog Level
Table Level
- bucket = -1 (unaware)
- bucket > 0 (fixed)
- bucket = -2 (postpone)
- bucket > 0 (fixed)
- with_shard feature