| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """Type stubs for Fluss Python bindings.""" |
| |
| from enum import IntEnum |
| from types import TracebackType |
| from typing import Dict, List, Optional, Tuple |
| |
| import pandas as pd |
| import pyarrow as pa |
| |
| class ChangeType(IntEnum): |
| """Represents the type of change for a record in a log.""" |
| |
| AppendOnly = 0 |
| """Append-only operation""" |
| Insert = 1 |
| """Insert operation""" |
| UpdateBefore = 2 |
| """Update operation containing the previous content of the updated row""" |
| UpdateAfter = 3 |
| """Update operation containing the new content of the updated row""" |
| Delete = 4 |
| """Delete operation""" |
| |
| def short_string(self) -> str: |
| """Returns a short string representation (+A, +I, -U, +U, -D).""" |
| ... |
| |
| class ScanRecord: |
| """Represents a single scan record with metadata.""" |
| |
| @property |
| def bucket(self) -> TableBucket: |
| """The bucket this record belongs to.""" |
| ... |
| @property |
| def offset(self) -> int: |
| """The position of this record in the log.""" |
| ... |
| @property |
| def timestamp(self) -> int: |
| """The timestamp of this record.""" |
| ... |
| @property |
| def change_type(self) -> ChangeType: |
| """The type of change (insert, update, delete, etc.).""" |
| ... |
| @property |
| def row(self) -> Dict[str, object]: |
| """The row data as a dictionary mapping column names to values.""" |
| ... |
| def __str__(self) -> str: ... |
| def __repr__(self) -> str: ... |
| |
| class RecordBatch: |
| """Represents a batch of records with metadata.""" |
| |
| @property |
| def batch(self) -> pa.RecordBatch: |
| """The Arrow RecordBatch containing the data.""" |
| ... |
| @property |
| def bucket(self) -> TableBucket: |
| """The bucket this batch belongs to.""" |
| ... |
| @property |
| def base_offset(self) -> int: |
| """The offset of the first record in this batch.""" |
| ... |
| @property |
| def last_offset(self) -> int: |
| """The offset of the last record in this batch.""" |
| ... |
| def __str__(self) -> str: ... |
| def __repr__(self) -> str: ... |
| |
| class Config: |
| def __init__(self, properties: Optional[Dict[str, str]] = None) -> None: ... |
| @property |
| def bootstrap_server(self) -> Optional[str]: ... |
| @bootstrap_server.setter |
| def bootstrap_server(self, server: str) -> None: ... |
| @property |
| def request_max_size(self) -> int: ... |
| @request_max_size.setter |
| def request_max_size(self, size: int) -> None: ... |
| @property |
| def writer_batch_size(self) -> int: ... |
| @writer_batch_size.setter |
| def writer_batch_size(self, size: int) -> None: ... |
| |
| class FlussConnection: |
| @staticmethod |
| async def connect(config: Config) -> FlussConnection: ... |
| async def get_admin(self) -> FlussAdmin: ... |
| async def get_table(self, table_path: TablePath) -> FlussTable: ... |
| def close(self) -> None: ... |
| def __enter__(self) -> FlussConnection: ... |
| def __exit__( |
| self, |
| exc_type: Optional[type], |
| exc_value: Optional[BaseException], |
| traceback: Optional[TracebackType], |
| ) -> bool: ... |
| def __repr__(self) -> str: ... |
| |
| class FlussAdmin: |
| async def create_database( |
| self, |
| database_name: str, |
| ignore_if_exists: bool = False, |
| database_descriptor: Optional["DatabaseDescriptor"] = None, |
| ) -> None: |
| """Create a database.""" |
| ... |
| async def drop_database( |
| self, |
| database_name: str, |
| ignore_if_not_exists: bool = False, |
| cascade: bool = True, |
| ) -> None: |
| """Drop a database.""" |
| ... |
| async def list_databases(self) -> List[str]: |
| """List all databases.""" |
| ... |
| async def database_exists(self, database_name: str) -> bool: |
| """Check if a database exists.""" |
| ... |
| async def get_database_info(self, database_name: str) -> "DatabaseInfo": |
| """Get database information.""" |
| ... |
| async def list_tables(self, database_name: str) -> List[str]: |
| """List all tables in a database.""" |
| ... |
| async def table_exists(self, table_path: TablePath) -> bool: |
| """Check if a table exists.""" |
| ... |
| async def drop_partition( |
| self, |
| table_path: TablePath, |
| partition_spec: Dict[str, str], |
| ignore_if_not_exists: bool = False, |
| ) -> None: |
| """Drop a partition from a partitioned table.""" |
| ... |
| async def create_table( |
| self, |
| table_path: TablePath, |
| table_descriptor: TableDescriptor, |
| ignore_if_exists: Optional[bool] = False, |
| ) -> None: ... |
| async def get_table(self, table_path: TablePath) -> TableInfo: ... |
| async def get_latest_lake_snapshot(self, table_path: TablePath) -> LakeSnapshot: ... |
| async def drop_table( |
| self, |
| table_path: TablePath, |
| ignore_if_not_exists: bool = False, |
| ) -> None: ... |
| async def list_offsets( |
| self, |
| table_path: TablePath, |
| bucket_ids: List[int], |
| offset_type: str, |
| timestamp: Optional[int] = None, |
| ) -> Dict[int, int]: |
| """List offsets for the specified buckets. |
| |
| Args: |
| table_path: Path to the table |
| bucket_ids: List of bucket IDs to query |
| offset_type: "earliest", "latest", or "timestamp" |
| timestamp: Required when offset_type is "timestamp" |
| |
| Returns: |
| Dict mapping bucket_id -> offset |
| """ |
| ... |
| async def list_partition_offsets( |
| self, |
| table_path: TablePath, |
| partition_name: str, |
| bucket_ids: List[int], |
| offset_type: str, |
| timestamp: Optional[int] = None, |
| ) -> Dict[int, int]: |
| """List offsets for buckets in a specific partition. |
| |
| Args: |
| table_path: Path to the table |
| partition_name: Partition value (e.g., "US" not "region=US") |
| bucket_ids: List of bucket IDs to query |
| offset_type: "earliest", "latest", or "timestamp" |
| timestamp: Required when offset_type is "timestamp" |
| |
| Returns: |
| Dict mapping bucket_id -> offset |
| """ |
| ... |
| async def create_partition( |
| self, |
| table_path: TablePath, |
| partition_spec: Dict[str, str], |
| ignore_if_exists: bool = False, |
| ) -> None: |
| """Create a partition for a partitioned table. |
| |
| Args: |
| table_path: Path to the table |
| partition_spec: Dict mapping partition column name to value (e.g., {"region": "US"}) |
| ignore_if_exists: If True, don't raise error if partition already exists |
| """ |
| ... |
| async def list_partition_infos( |
| self, |
| table_path: TablePath, |
| ) -> List["PartitionInfo"]: |
| """List all partitions for a partitioned table. |
| |
| Args: |
| table_path: Path to the table |
| |
| Returns: |
| List of PartitionInfo objects |
| """ |
| ... |
| def __repr__(self) -> str: ... |
| |
| |
| class DatabaseDescriptor: |
| """Descriptor for a Fluss database (comment and custom properties).""" |
| |
| def __init__( |
| self, |
| comment: Optional[str] = None, |
| custom_properties: Optional[Dict[str, str]] = None, |
| ) -> None: ... |
| @property |
| def comment(self) -> Optional[str]: ... |
| def get_custom_properties(self) -> Dict[str, str]: ... |
| def __repr__(self) -> str: ... |
| |
| |
| class DatabaseInfo: |
| """Information about a Fluss database.""" |
| |
| @property |
| def database_name(self) -> str: ... |
| def get_database_descriptor(self) -> DatabaseDescriptor: ... |
| @property |
| def created_time(self) -> int: ... |
| @property |
| def modified_time(self) -> int: ... |
| def __repr__(self) -> str: ... |
| |
| class TableScan: |
| """Builder for creating log scanners with flexible configuration. |
| |
| Use this builder to configure projection before creating a log scanner. |
| Obtain a TableScan instance via `FlussTable.new_scan()`. |
| |
| Example: |
| ```python |
| # Record-based scanning with projection |
| scanner = await table.new_scan() \\ |
| .project([0, 1, 2]) \\ |
| .create_log_scanner() |
| |
| # Batch-based scanning with column names |
| scanner = await table.new_scan() \\ |
| .project_by_name(["id", "name"]) \\ |
| .create_batch_scanner() |
| ``` |
| """ |
| |
| def project(self, indices: List[int]) -> "TableScan": |
| """Project to specific columns by their indices. |
| |
| Args: |
| indices: List of column indices (0-based) to include in the scan. |
| |
| Returns: |
| Self for method chaining. |
| """ |
| ... |
| def project_by_name(self, names: List[str]) -> "TableScan": |
| """Project to specific columns by their names. |
| |
| Args: |
| names: List of column names to include in the scan. |
| |
| Returns: |
| Self for method chaining. |
| """ |
| ... |
| async def create_log_scanner(self) -> LogScanner: |
| """Create a record-based log scanner. |
| |
| Use this scanner with `poll()` to get individual records with metadata |
| (offset, timestamp, change_type). |
| |
| Returns: |
| LogScanner for record-by-record scanning with `poll()` |
| """ |
| ... |
| async def create_batch_scanner(self) -> LogScanner: |
| """Create a batch-based log scanner. |
| |
| Use this scanner with `poll_arrow()` to get Arrow Tables, or with |
| `poll_batches()` to get individual batches with metadata. |
| |
| Returns: |
| LogScanner for batch-based scanning with `poll_arrow()` or `poll_batches()` |
| """ |
| ... |
| def __repr__(self) -> str: ... |
| |
| class FlussTable: |
| def new_scan(self) -> TableScan: |
| """Create a new table scan builder for configuring and creating log scanners. |
| |
| Use this method to create scanners with the builder pattern: |
| |
| Example: |
| ```python |
| # Record-based scanning |
| scanner = await table.new_scan() \\ |
| .project([0, 1]) \\ |
| .create_log_scanner() |
| |
| # Batch-based scanning |
| scanner = await table.new_scan() \\ |
| .project_by_name(["id", "name"]) \\ |
| .create_batch_scanner() |
| ``` |
| |
| Returns: |
| TableScan builder for configuring the scanner. |
| """ |
| ... |
| async def new_append_writer(self) -> AppendWriter: ... |
| def new_upsert( |
| self, |
| columns: Optional[List[str]] = None, |
| column_indices: Optional[List[int]] = None, |
| ) -> UpsertWriter: ... |
| def new_lookup(self) -> Lookuper: ... |
| def get_table_info(self) -> TableInfo: ... |
| def get_table_path(self) -> TablePath: ... |
| def has_primary_key(self) -> bool: ... |
| def __repr__(self) -> str: ... |
| |
| class AppendWriter: |
| def append(self, row: dict | list | tuple) -> WriteResultHandle: |
| """Append a single row to the table. |
| |
| Args: |
| row: Dictionary mapping field names to values, or |
| list/tuple of values in schema order |
| |
| Returns: |
| WriteResultHandle: Ignore for fire-and-forget, or await handle.wait() for acknowledgement. |
| |
| Supported Types: |
| Currently supports primitive types only: |
| - Boolean, TinyInt, SmallInt, Int, BigInt (integers) |
| - Float, Double (floating point) |
| - String, Char (text) |
| - Bytes, Binary (binary data) |
| - Null values |
| |
| Temporal types (Date, Timestamp, Decimal) are not yet supported. |
| |
| Example: |
| writer.append({'id': 1, 'name': 'Alice', 'score': 95.5}) |
| writer.append([1, 'Alice', 95.5]) |
| |
| Note: |
| For high-throughput bulk loading, prefer write_arrow_batch(). |
| Use flush() to ensure all queued records are sent and acknowledged. |
| """ |
| ... |
| def write_arrow(self, table: pa.Table) -> None: ... |
| def write_arrow_batch(self, batch: pa.RecordBatch) -> WriteResultHandle: ... |
| def write_pandas(self, df: pd.DataFrame) -> None: ... |
| async def flush(self) -> None: ... |
| def __repr__(self) -> str: ... |
| |
| class UpsertWriter: |
| """Writer for upserting and deleting data in a Fluss primary key table.""" |
| |
| def upsert(self, row: dict | list | tuple) -> WriteResultHandle: |
| """Upsert a row into the table. |
| |
| If a row with the same primary key exists, it will be updated. |
| Otherwise, a new row will be inserted. |
| |
| Args: |
| row: Dictionary mapping field names to values, or |
| list/tuple of values in schema order |
| |
| Returns: |
| WriteResultHandle: Ignore for fire-and-forget, or await handle.wait() for ack. |
| """ |
| ... |
| def delete(self, pk: dict | list | tuple) -> WriteResultHandle: |
| """Delete a row from the table by primary key. |
| |
| Args: |
| pk: Dictionary with PK column names as keys, or |
| list/tuple of PK values in PK column order |
| |
| Returns: |
| WriteResultHandle: Ignore for fire-and-forget, or await handle.wait() for ack. |
| """ |
| ... |
| async def flush(self) -> None: |
| """Flush all pending upsert/delete operations to the server.""" |
| ... |
| def __repr__(self) -> str: ... |
| |
| |
| class WriteResultHandle: |
| """Handle for a pending write (append/upsert/delete). Ignore for fire-and-forget, or await handle.wait() for ack.""" |
| |
| async def wait(self) -> None: |
| """Wait for server acknowledgment of this write.""" |
| ... |
| def __repr__(self) -> str: ... |
| |
| |
| class Lookuper: |
| """Lookuper for performing primary key lookups on a Fluss table.""" |
| |
| async def lookup(self, pk: dict | list | tuple) -> Optional[Dict[str, object]]: |
| """Lookup a row by its primary key. |
| |
| Args: |
| pk: Dictionary with PK column names as keys, or |
| list/tuple of PK values in PK column order |
| |
| Returns: |
| A dict containing the row data if found, None otherwise. |
| """ |
| ... |
| def __repr__(self) -> str: ... |
| |
| class LogScanner: |
| """Scanner for reading log data from a Fluss table. |
| |
| This scanner supports two modes: |
| - Record-based scanning via `poll()` - returns individual records with metadata |
| - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns Arrow batches |
| |
| Create scanners using the builder pattern: |
| # Record-based scanning |
| scanner = await table.new_scan().create_log_scanner() |
| |
| # Batch-based scanning |
| scanner = await table.new_scan().create_batch_scanner() |
| |
| # With projection |
| scanner = await table.new_scan().project([0, 1]).create_log_scanner() |
| """ |
| |
| def subscribe(self, bucket_id: int, start_offset: int) -> None: |
| """Subscribe to a single bucket at a specific offset (non-partitioned tables). |
| |
| Args: |
| bucket_id: The bucket ID to subscribe to |
| start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning) |
| """ |
| ... |
| def subscribe_buckets(self, bucket_offsets: Dict[int, int]) -> None: |
| """Subscribe to multiple buckets at specified offsets (non-partitioned tables). |
| |
| Args: |
| bucket_offsets: Dict mapping bucket_id -> start_offset |
| """ |
| ... |
| def subscribe_partition( |
| self, partition_id: int, bucket_id: int, start_offset: int |
| ) -> None: |
| """Subscribe to a bucket within a specific partition (partitioned tables only). |
| |
| Args: |
| partition_id: The partition ID (from PartitionInfo.partition_id) |
| bucket_id: The bucket ID within the partition |
| start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning) |
| """ |
| ... |
| def subscribe_partition_buckets( |
| self, partition_bucket_offsets: Dict[Tuple[int, int], int] |
| ) -> None: |
| """Subscribe to multiple partition+bucket combinations at once (partitioned tables only). |
| |
| Args: |
| partition_bucket_offsets: Dict mapping (partition_id, bucket_id) tuples to start_offsets. |
| Example: {(partition_id_1, 0): EARLIEST_OFFSET, (partition_id_2, 1): 100} |
| """ |
| ... |
| def unsubscribe_partition(self, partition_id: int, bucket_id: int) -> None: |
| """Unsubscribe from a specific partition bucket (partitioned tables only). |
| |
| Args: |
| partition_id: The partition ID to unsubscribe from |
| bucket_id: The bucket ID within the partition |
| """ |
| ... |
| def poll(self, timeout_ms: int) -> List[ScanRecord]: |
| """Poll for individual records with metadata. |
| |
| Requires a record-based scanner (created with new_scan().create_log_scanner()). |
| |
| Args: |
| timeout_ms: Timeout in milliseconds to wait for records. |
| |
| Returns: |
| List of ScanRecord objects, each containing bucket, offset, timestamp, |
| change_type, and row data as a dictionary. |
| |
| Note: |
| Returns an empty list if no records are available or timeout expires. |
| """ |
| ... |
| def poll_batches(self, timeout_ms: int) -> List[RecordBatch]: |
| """Poll for batches with metadata. |
| |
| Requires a batch-based scanner (created with new_scan().create_batch_scanner()). |
| |
| Args: |
| timeout_ms: Timeout in milliseconds to wait for batches. |
| |
| Returns: |
| List of RecordBatch objects, each containing the Arrow batch along with |
| bucket, base_offset, and last_offset metadata. |
| |
| Note: |
| Returns an empty list if no batches are available or timeout expires. |
| """ |
| ... |
| def poll_arrow(self, timeout_ms: int) -> pa.Table: |
| """Poll for records as an Arrow Table. |
| |
| Requires a batch-based scanner (created with new_scan().create_batch_scanner()). |
| |
| Args: |
| timeout_ms: Timeout in milliseconds to wait for records. |
| |
| Returns: |
| PyArrow Table containing the polled records (batches merged). |
| |
| Note: |
| Returns an empty table (with correct schema) if no records are available |
| or timeout expires. |
| """ |
| ... |
| def to_pandas(self) -> pd.DataFrame: |
| """Convert all data to Pandas DataFrame. |
| |
| Requires a batch-based scanner (created with new_scan().create_batch_scanner()). |
| Reads from currently subscribed buckets until reaching their latest offsets. |
| |
| You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. |
| """ |
| ... |
| def to_arrow(self) -> pa.Table: |
| """Convert all data to Arrow Table. |
| |
| Requires a batch-based scanner (created with new_scan().create_batch_scanner()). |
| Reads from currently subscribed buckets until reaching their latest offsets. |
| |
| You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. |
| """ |
| ... |
| def __repr__(self) -> str: ... |
| |
| class Schema: |
| def __init__( |
| self, schema: pa.Schema, primary_keys: Optional[List[str]] = None |
| ) -> None: ... |
| def get_column_names(self) -> List[str]: ... |
| def get_column_types(self) -> List[str]: ... |
| def get_columns(self) -> List[Tuple[str, str]]: ... |
| def __str__(self) -> str: ... |
| |
| class TableDescriptor: |
| def __init__( |
| self, |
| schema: Schema, |
| *, |
| partition_keys: Optional[List[str]] = None, |
| bucket_count: Optional[int] = None, |
| bucket_keys: Optional[List[str]] = None, |
| comment: Optional[str] = None, |
| log_format: Optional[str] = None, |
| kv_format: Optional[str] = None, |
| **properties: str, |
| ) -> None: ... |
| def get_schema(self) -> Schema: ... |
| |
| class TablePath: |
| def __init__(self, database: str, table: str) -> None: ... |
| @property |
| def database_name(self) -> str: ... |
| @property |
| def table_name(self) -> str: ... |
| def table_path_str(self) -> str: ... |
| def __str__(self) -> str: ... |
| def __repr__(self) -> str: ... |
| def __hash__(self) -> int: ... |
| def __eq__(self, other: object) -> bool: ... |
| |
| class TableInfo: |
| @property |
| def table_id(self) -> int: ... |
| @property |
| def schema_id(self) -> int: ... |
| @property |
| def created_time(self) -> int: ... |
| @property |
| def modified_time(self) -> int: ... |
| @property |
| def table_path(self) -> TablePath: ... |
| @property |
| def num_buckets(self) -> int: ... |
| @property |
| def comment(self) -> Optional[str]: ... |
| def get_primary_keys(self) -> List[str]: ... |
| def get_bucket_keys(self) -> List[str]: ... |
| def get_partition_keys(self) -> List[str]: ... |
| def has_primary_key(self) -> bool: ... |
| def is_partitioned(self) -> bool: ... |
| def get_properties(self) -> Dict[str, str]: ... |
| def get_custom_properties(self) -> Dict[str, str]: ... |
| def get_schema(self) -> Schema: ... |
| def get_column_names(self) -> List[str]: ... |
| def get_column_count(self) -> int: ... |
| |
| class FlussError(Exception): |
| message: str |
| def __init__(self, message: str) -> None: ... |
| def __str__(self) -> str: ... |
| |
| class LakeSnapshot: |
| def __init__(self, snapshot_id: int) -> None: ... |
| @property |
| def snapshot_id(self) -> int: ... |
| @property |
| def table_buckets_offset(self) -> Dict[TableBucket, int]: ... |
| def get_bucket_offset(self, bucket: TableBucket) -> Optional[int]: ... |
| def get_table_buckets(self) -> List[TableBucket]: ... |
| def __str__(self) -> str: ... |
| def __repr__(self) -> str: ... |
| |
| class TableBucket: |
| def __init__(self, table_id: int, bucket: int) -> None: ... |
| @staticmethod |
| def with_partition( |
| table_id: int, partition_id: int, bucket: int |
| ) -> TableBucket: ... |
| @property |
| def table_id(self) -> int: ... |
| @property |
| def bucket_id(self) -> int: ... |
| @property |
| def partition_id(self) -> Optional[int]: ... |
| def __hash__(self) -> int: ... |
| def __eq__(self, other: object) -> bool: ... |
| def __str__(self) -> str: ... |
| def __repr__(self) -> str: ... |
| |
| class TableDistribution: |
| def bucket_keys(self) -> List[str]: ... |
| def bucket_count(self) -> Optional[int]: ... |
| |
| class PartitionInfo: |
| """Information about a partition.""" |
| |
| @property |
| def partition_id(self) -> int: |
| """Get the partition ID (globally unique in the cluster).""" |
| ... |
| @property |
| def partition_name(self) -> str: |
| """Get the partition name.""" |
| ... |
| def __repr__(self) -> str: ... |
| |
| class OffsetType: |
| """Offset type constants for list_offsets().""" |
| |
| EARLIEST: str |
| LATEST: str |
| TIMESTAMP: str |
| |
| # Constant for earliest offset (-2) |
| EARLIEST_OFFSET: int |
| |
| __version__: str |