| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import asyncio |
| import traceback |
| from datetime import date, datetime |
| from datetime import time as dt_time |
| from decimal import Decimal |
| |
| import pandas as pd |
| import pyarrow as pa |
| |
| import fluss |
| |
| |
def _bucket_offsets(num_buckets, start_offset):
    """Return a ``{bucket_id: start_offset}`` mapping for buckets ``0..num_buckets - 1``."""
    return {bucket_id: start_offset for bucket_id in range(num_buckets)}


def _sample_schema():
    """Return the PyArrow schema used for the sample log table and its writes."""
    return pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("name", pa.string()),
            pa.field("score", pa.float32()),
            pa.field("age", pa.int32()),
            pa.field("birth_date", pa.date32()),
            pa.field("check_in_time", pa.time32("ms")),
            pa.field("created_at", pa.timestamp("us")),  # TIMESTAMP (NTZ)
            pa.field("updated_at", pa.timestamp("us", tz="UTC")),  # TIMESTAMP_LTZ
            pa.field("salary", pa.decimal128(10, 2)),
        ]
    )


async def _create_log_table(admin, table_path, schema):
    """Create the sample log table, then print its metadata and pre-write offsets."""
    # Create a Fluss Schema first (this is what TableDescriptor expects)
    fluss_schema = fluss.Schema(schema)
    table_descriptor = fluss.TableDescriptor(fluss_schema)

    try:
        # NOTE(review): the third positional argument is presumably
        # ignore_if_exists — confirm against the fluss bindings.
        await admin.create_table(table_path, table_descriptor, True)
        print(f"Created table: {table_path}")
    except Exception as e:
        print(f"Table creation failed: {e}")

    # Get table information via admin
    try:
        table_info = await admin.get_table_info(table_path)
        print(f"Table info: {table_info}")
        print(f"Table ID: {table_info.table_id}")
        print(f"Schema ID: {table_info.schema_id}")
        print(f"Created time: {table_info.created_time}")
        print(f"Primary keys: {table_info.get_primary_keys()}")
    except Exception as e:
        print(f"Failed to get table info: {e}")

    # Demo: List offsets
    print("\n--- Testing list_offsets() ---")
    try:
        # Query latest offsets using OffsetSpec factory method
        offsets = await admin.list_offsets(
            table_path,
            bucket_ids=[0],
            offset_spec=fluss.OffsetSpec.latest(),
        )
        print(f"Latest offsets for table (before writes): {offsets}")
    except Exception as e:
        print(f"Failed to list offsets: {e}")


async def _write_log_data(admin, table, table_path, schema):
    """Append rows to the log table via Arrow, single-row and pandas writes, then flush."""
    # Create a writer for the table
    append_writer = table.new_append().create_writer()
    print(f"Created append writer: {append_writer}")

    # Running total of rows handed to the writer, so the final report
    # cannot drift out of sync with the demo data below.
    rows_written = 0

    try:
        # Demo: Write PyArrow Table
        print("\n--- Testing PyArrow Table write ---")
        pa_table = pa.Table.from_arrays(
            [
                pa.array([1, 2, 3], type=pa.int32()),
                pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
                pa.array([95.2, 87.2, 92.1], type=pa.float32()),
                pa.array([25, 30, 35], type=pa.int32()),
                pa.array(
                    [date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 8)],
                    type=pa.date32(),
                ),
                pa.array(
                    [dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 0)],
                    type=pa.time32("ms"),
                ),
                pa.array(
                    [
                        datetime(2024, 1, 15, 10, 30),
                        datetime(2024, 1, 15, 11, 0),
                        datetime(2024, 1, 15, 11, 30),
                    ],
                    type=pa.timestamp("us"),
                ),
                pa.array(
                    [
                        datetime(2024, 1, 15, 10, 30),
                        datetime(2024, 1, 15, 11, 0),
                        datetime(2024, 1, 15, 11, 30),
                    ],
                    type=pa.timestamp("us", tz="UTC"),
                ),
                pa.array(
                    [Decimal("75000.00"), Decimal("82000.50"), Decimal("95000.75")],
                    type=pa.decimal128(10, 2),
                ),
            ],
            schema=schema,
        )

        append_writer.write_arrow(pa_table)
        rows_written += pa_table.num_rows
        print("Successfully wrote PyArrow Table")

        # Demo: Write PyArrow RecordBatch
        print("\n--- Testing PyArrow RecordBatch write ---")
        pa_record_batch = pa.RecordBatch.from_arrays(
            [
                pa.array([4, 5], type=pa.int32()),
                pa.array(["David", "Eve"], type=pa.string()),
                pa.array([88.5, 91.0], type=pa.float32()),
                pa.array([28, 32], type=pa.int32()),
                pa.array([date(1996, 7, 22), date(1992, 12, 1)], type=pa.date32()),
                pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)], type=pa.time32("ms")),
                pa.array(
                    [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)],
                    type=pa.timestamp("us"),
                ),
                pa.array(
                    [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)],
                    type=pa.timestamp("us", tz="UTC"),
                ),
                pa.array(
                    [Decimal("68000.00"), Decimal("72500.25")],
                    type=pa.decimal128(10, 2),
                ),
            ],
            schema=schema,
        )

        append_writer.write_arrow_batch(pa_record_batch)
        rows_written += pa_record_batch.num_rows
        print("Successfully wrote PyArrow RecordBatch")

        # Demo: Append single rows with Date, Time, Timestamp, Decimal
        print("\n--- Testing single row append with temporal/decimal types ---")
        # Dict input with all types including Date, Time, Timestamp, Decimal
        append_writer.append(
            {
                "id": 8,
                "name": "Helen",
                "score": 93.5,
                "age": 26,
                "birth_date": date(1998, 4, 10),
                "check_in_time": dt_time(11, 30, 45),
                "created_at": datetime(2024, 1, 17, 14, 0, 0),
                "updated_at": datetime(2024, 1, 17, 14, 0, 0),
                "salary": Decimal("88000.00"),
            }
        )
        rows_written += 1
        print("Successfully appended row (dict with Date, Time, Timestamp, Decimal)")

        # List input with all types (values in schema column order)
        append_writer.append(
            [
                9,
                "Ivan",
                90.0,
                31,
                date(1993, 8, 25),
                dt_time(16, 45, 0),
                datetime(2024, 1, 17, 15, 30, 0),
                datetime(2024, 1, 17, 15, 30, 0),
                Decimal("91500.50"),
            ]
        )
        rows_written += 1
        print("Successfully appended row (list with Date, Time, Timestamp, Decimal)")

        # Demo: Write Pandas DataFrame
        print("\n--- Testing Pandas DataFrame write ---")
        df = pd.DataFrame(
            {
                "id": [10, 11],
                "name": ["Frank", "Grace"],
                "score": [89.3, 94.7],
                "age": [29, 27],
                "birth_date": [date(1995, 2, 14), date(1997, 9, 30)],
                "check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)],
                "created_at": [
                    datetime(2024, 1, 18, 8, 0),
                    datetime(2024, 1, 18, 8, 30),
                ],
                "updated_at": [
                    datetime(2024, 1, 18, 8, 0),
                    datetime(2024, 1, 18, 8, 30),
                ],
                "salary": [Decimal("79000.00"), Decimal("85500.75")],
            }
        )

        append_writer.write_pandas(df)
        rows_written += len(df)
        print("Successfully wrote Pandas DataFrame")

        # Flush all pending data
        print("\n--- Flushing data ---")
        await append_writer.flush()
        print("Successfully flushed data")

        # Demo: Check offsets after writes
        print("\n--- Checking offsets after writes ---")
        try:
            offsets = await admin.list_offsets(
                table_path,
                bucket_ids=[0],
                offset_spec=fluss.OffsetSpec.latest(),
            )
            print(f"Latest offsets after writing {rows_written} records: {offsets}")
        except Exception as e:
            print(f"Failed to list offsets: {e}")

    except Exception as e:
        print(f"Error during writing: {e}")


async def _scan_log_table(admin, table, table_path):
    """Read the log table back with the batch scanner, the record scanner and unsubscribe."""
    # Now scan the table to verify data was written
    print("\n--- Scanning table (batch scanner) ---")

    # Resolve the bucket count once, up front: every section below needs it,
    # and binding it inside one section's try block would leave the others
    # failing with a misleading NameError if that section aborted early.
    try:
        num_buckets = (await admin.get_table_info(table_path)).num_buckets
    except Exception as e:
        print(f"Error during batch scanning: {e}")
        return

    try:
        # Use new_scan().create_record_batch_log_scanner() for batch-based operations
        batch_scanner = await table.new_scan().create_record_batch_log_scanner()
        print(f"Created batch scanner: {batch_scanner}")

        # Subscribe to buckets (required before to_arrow/to_pandas)
        # Use subscribe_buckets to subscribe all buckets from EARLIEST_OFFSET
        batch_scanner.subscribe_buckets(_bucket_offsets(num_buckets, fluss.EARLIEST_OFFSET))
        print(f"Subscribed to {num_buckets} buckets from EARLIEST_OFFSET")

        # Read all data using to_arrow()
        print("Scanning results using to_arrow():")

        # Try to get as PyArrow Table
        try:
            pa_table_result = batch_scanner.to_arrow()
            print(f"\nAs PyArrow Table: {pa_table_result}")
        except Exception as e:
            print(f"Could not convert to PyArrow: {e}")

        # Create a new batch scanner for to_pandas() test
        batch_scanner2 = await table.new_scan().create_record_batch_log_scanner()
        batch_scanner2.subscribe_buckets(_bucket_offsets(num_buckets, fluss.EARLIEST_OFFSET))

        # Try to get as Pandas DataFrame
        try:
            df_result = batch_scanner2.to_pandas()
            print(f"\nAs Pandas DataFrame:\n{df_result}")
        except Exception as e:
            print(f"Could not convert to Pandas: {e}")

        # TODO: support to_arrow_batch_reader()
        # which is reserved for streaming use cases

        # TODO: support to_duckdb()

        # Test poll_arrow() method for incremental reading as Arrow Table
        print("\n--- Testing poll_arrow() method ---")
        batch_scanner3 = await table.new_scan().create_record_batch_log_scanner()
        batch_scanner3.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
        print(f"Subscribed to bucket 0 at EARLIEST_OFFSET ({fluss.EARLIEST_OFFSET})")

        # Poll with a timeout of 5000ms (5 seconds)
        # Note: poll_arrow() returns an empty table (not an error) on timeout
        try:
            poll_result = batch_scanner3.poll_arrow(5000)
            print(f"Number of rows: {poll_result.num_rows}")

            if poll_result.num_rows > 0:
                poll_df = poll_result.to_pandas()
                print(f"Polled data:\n{poll_df}")
            else:
                print("Empty result (no records available)")
                # Empty table still has schema - this is useful!
                print(f"Schema: {poll_result.schema}")

        except Exception as e:
            print(f"Error during poll_arrow: {e}")

        # Test poll_record_batch() method for batches with metadata
        print("\n--- Testing poll_record_batch() method ---")
        batch_scanner4 = await table.new_scan().create_record_batch_log_scanner()
        batch_scanner4.subscribe_buckets(_bucket_offsets(num_buckets, fluss.EARLIEST_OFFSET))

        try:
            batches = batch_scanner4.poll_record_batch(5000)
            print(f"Number of batches: {len(batches)}")

            for i, batch in enumerate(batches):
                print(
                    f" Batch {i}: bucket={batch.bucket}, "
                    f"offsets={batch.base_offset}-{batch.last_offset}, "
                    f"rows={batch.batch.num_rows}"
                )

        except Exception as e:
            print(f"Error during poll_record_batch: {e}")

    except Exception as e:
        print(f"Error during batch scanning: {e}")

    # Test record-based scanning with poll()
    print("\n--- Scanning table (record scanner) ---")
    try:
        # Use new_scan().create_log_scanner() for record-based operations
        record_scanner = await table.new_scan().create_log_scanner()
        print(f"Created record scanner: {record_scanner}")

        record_scanner.subscribe_buckets(_bucket_offsets(num_buckets, fluss.EARLIEST_OFFSET))

        # Poll returns ScanRecords — records grouped by bucket
        print("\n--- Testing poll() method (record-by-record) ---")
        try:
            scan_records = record_scanner.poll(5000)
            print(f"Total records: {scan_records.count()}, buckets: {len(scan_records.buckets())}")

            # Flat iteration over all records (regardless of bucket)
            print(f" Flat iteration: {scan_records.count()} records")
            for record in scan_records:
                print(f" offset={record.offset}, timestamp={record.timestamp}")

            # Per-bucket access
            for bucket in scan_records.buckets():
                bucket_recs = scan_records.records(bucket)
                print(f" Bucket {bucket}: {len(bucket_recs)} records")
                # Only the first three records per bucket are printed in detail
                for record in bucket_recs[:3]:
                    print(
                        f" offset={record.offset}, "
                        f"timestamp={record.timestamp}, "
                        f"change_type={record.change_type}, "
                        f"row={record.row}"
                    )

        except Exception as e:
            print(f"Error during poll: {e}")

    except Exception as e:
        print(f"Error during record scanning: {e}")

    # Demo: unsubscribe — unsubscribe from a bucket (non-partitioned tables)
    print("\n--- Testing unsubscribe ---")
    try:
        unsub_scanner = await table.new_scan().create_record_batch_log_scanner()
        unsub_scanner.subscribe_buckets(_bucket_offsets(num_buckets, fluss.EARLIEST_OFFSET))
        print(f"Subscribed to {num_buckets} buckets")
        # Unsubscribe from bucket 0 — future polls will skip this bucket
        unsub_scanner.unsubscribe(bucket_id=0)
        print("Unsubscribed from bucket 0")
        remaining = unsub_scanner.poll_arrow(5000)
        print(f"After unsubscribe, got {remaining.num_rows} records (from remaining buckets)")
    except Exception as e:
        print(f"Error during unsubscribe test: {e}")


async def _demo_pk_table(conn, admin, pk_table_path):
    """Create a primary key table and exercise upsert, lookup, delete and partial updates."""
    print("\n" + "=" * 60)
    print("--- Testing Primary Key Table (Lookup & Upsert) ---")
    print("=" * 60)

    # Create a primary key table for lookup/upsert tests
    # Include temporal and decimal types to test full conversion
    pk_table_fields = [
        pa.field("user_id", pa.int32()),
        pa.field("name", pa.string()),
        pa.field("email", pa.string()),
        pa.field("age", pa.int32()),
        pa.field("birth_date", pa.date32()),
        pa.field("login_time", pa.time32("ms")),
        pa.field("created_at", pa.timestamp("us")),  # TIMESTAMP (NTZ)
        pa.field("updated_at", pa.timestamp("us", tz="UTC")),  # TIMESTAMP_LTZ
        pa.field("balance", pa.decimal128(10, 2)),
    ]
    pk_schema = pa.schema(pk_table_fields)
    fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"])

    # Create table descriptor
    pk_table_descriptor = fluss.TableDescriptor(
        fluss_pk_schema,
        bucket_count=3,
    )

    try:
        await admin.create_table(pk_table_path, pk_table_descriptor, True)
        print(f"Created PK table: {pk_table_path}")
    except Exception as e:
        print(f"PK Table creation failed (may already exist): {e}")

    # Get the PK table
    pk_table = await conn.get_table(pk_table_path)
    print(f"Got PK table: {pk_table}")
    print(f"Has primary key: {pk_table.has_primary_key()}")

    # --- Test Upsert ---
    print("\n--- Testing Upsert (fire-and-forget) ---")
    try:
        upsert_writer = pk_table.new_upsert().create_writer()
        print(f"Created upsert writer: {upsert_writer}")

        # Fire-and-forget: queue writes synchronously, flush at end.
        # Records are batched internally for efficiency.
        upsert_writer.upsert(
            {
                "user_id": 1,
                "name": "Alice",
                "email": "alice@example.com",
                "age": 25,
                "birth_date": date(1999, 5, 15),
                "login_time": dt_time(9, 30, 45, 123000),  # 09:30:45.123
                "created_at": datetime(2024, 1, 15, 10, 30, 45, 123456),  # with microseconds
                "updated_at": datetime(2024, 1, 15, 10, 30, 45, 123456),
                "balance": Decimal("1234.56"),
            }
        )
        print("Queued user_id=1 (Alice)")

        upsert_writer.upsert(
            {
                "user_id": 2,
                "name": "Bob",
                "email": "bob@example.com",
                "age": 30,
                "birth_date": date(1994, 3, 20),
                "login_time": dt_time(14, 15, 30, 500000),  # 14:15:30.500
                "created_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
                "updated_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
                "balance": Decimal("5678.91"),
            }
        )
        print("Queued user_id=2 (Bob)")

        upsert_writer.upsert(
            {
                "user_id": 3,
                "name": "Charlie",
                "email": "charlie@example.com",
                "age": 35,
                "birth_date": date(1989, 11, 8),
                "login_time": dt_time(16, 45, 59, 999000),  # 16:45:59.999
                "created_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
                "updated_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
                "balance": Decimal("9876.54"),
            }
        )
        print("Queued user_id=3 (Charlie)")

        # flush() waits for all queued writes to be acknowledged by the server
        await upsert_writer.flush()
        print("Flushed — all 3 rows acknowledged by server")

        # Per-record acknowledgment: await the returned handle to block until
        # the server confirms this specific write, useful when you need to
        # read-after-write or verify critical updates.
        print("\n--- Testing Upsert (per-record acknowledgment) ---")
        handle = upsert_writer.upsert(
            {
                "user_id": 1,
                "name": "Alice Updated",
                "email": "alice.new@example.com",
                "age": 26,
                "birth_date": date(1999, 5, 15),
                "login_time": dt_time(10, 11, 12, 345000),  # 10:11:12.345
                "created_at": datetime(2024, 1, 15, 10, 30, 45, 123456),  # unchanged
                "updated_at": datetime(2024, 1, 20, 15, 45, 30, 678901),  # new update time
                "balance": Decimal("2345.67"),
            }
        )
        await handle.wait()  # wait for server acknowledgment
        print("Updated user_id=1 (Alice -> Alice Updated) — server acknowledged")

    except Exception as e:
        print(f"Error during upsert: {e}")
        traceback.print_exc()

    # --- Test Lookup ---
    print("\n--- Testing Lookup ---")
    try:
        lookuper = pk_table.new_lookup().create_lookuper()
        print(f"Created lookuper: {lookuper}")

        result = await lookuper.lookup({"user_id": 1})
        if result:
            print("Lookup user_id=1: Found!")
            print(f" name: {result['name']}")
            print(f" email: {result['email']}")
            print(f" age: {result['age']}")
            print(
                f" birth_date: {result['birth_date']} (type: {type(result['birth_date']).__name__})"
            )
            print(
                f" login_time: {result['login_time']} (type: {type(result['login_time']).__name__})"
            )
            print(
                f" created_at: {result['created_at']} (type: {type(result['created_at']).__name__})"
            )
            print(
                f" updated_at: {result['updated_at']} (type: {type(result['updated_at']).__name__})"
            )
            print(
                f" balance: {result['balance']} (type: {type(result['balance']).__name__})"
            )
        else:
            print("Lookup user_id=1: Not found")

        # Lookup another row
        result = await lookuper.lookup({"user_id": 2})
        if result:
            print(f"Lookup user_id=2: Found! -> {result}")
        else:
            print("Lookup user_id=2: Not found")

        # Lookup non-existent row
        result = await lookuper.lookup({"user_id": 999})
        if result:
            print(f"Lookup user_id=999: Found! -> {result}")
        else:
            print("Lookup user_id=999: Not found (as expected)")

    except Exception as e:
        print(f"Error during lookup: {e}")
        traceback.print_exc()

    # --- Test Delete ---
    print("\n--- Testing Delete ---")
    try:
        upsert_writer = pk_table.new_upsert().create_writer()

        handle = upsert_writer.delete({"user_id": 3})
        await handle.wait()
        print("Deleted user_id=3 — server acknowledged")

        lookuper = pk_table.new_lookup().create_lookuper()
        result = await lookuper.lookup({"user_id": 3})
        if result:
            print(f"Lookup user_id=3 after delete: Still found! -> {result}")
        else:
            print("Lookup user_id=3 after delete: Not found (deletion confirmed)")

    except Exception as e:
        print(f"Error during delete: {e}")
        traceback.print_exc()

    # --- Test Partial Update by column names ---
    print("\n--- Testing Partial Update (by column names) ---")
    try:
        partial_writer = (
            pk_table.new_upsert()
            .partial_update_by_name(["user_id", "balance"])
            .create_writer()
        )
        handle = partial_writer.upsert({"user_id": 1, "balance": Decimal("9999.99")})
        await handle.wait()
        print("Partial update: set balance=9999.99 for user_id=1")

        lookuper = pk_table.new_lookup().create_lookuper()
        result = await lookuper.lookup({"user_id": 1})
        if result:
            print(
                f"Partial update verified:"
                f"\n name={result['name']} (unchanged)"
                f"\n balance={result['balance']} (updated)"
            )
        else:
            print("ERROR: Expected to find user_id=1")

    except Exception as e:
        print(f"Error during partial update by names: {e}")
        traceback.print_exc()

    # --- Test Partial Update by column indices ---
    print("\n--- Testing Partial Update (by column indices) ---")
    try:
        # Columns: 0=user_id (PK), 1=name — update name only
        partial_writer_idx = (
            pk_table.new_upsert().partial_update_by_index([0, 1]).create_writer()
        )
        handle = partial_writer_idx.upsert([1, "Alice Renamed"])
        await handle.wait()
        print("Partial update by indices: set name='Alice Renamed' for user_id=1")

        lookuper = pk_table.new_lookup().create_lookuper()
        result = await lookuper.lookup({"user_id": 1})
        if result:
            print(
                f"Partial update by indices verified:"
                f"\n name={result['name']} (updated)"
                f"\n balance={result['balance']} (unchanged)"
            )
        else:
            print("ERROR: Expected to find user_id=1")

    except Exception as e:
        print(f"Error during partial update by indices: {e}")
        traceback.print_exc()


async def _demo_projection(admin, table, table_path):
    """Scan the log table with column projections selected by index and by name."""
    # Demo: Column projection using builder pattern
    print("\n--- Testing Column Projection ---")
    try:
        # Get bucket count for subscriptions
        num_buckets = (await admin.get_table_info(table_path)).num_buckets
        all_buckets = _bucket_offsets(num_buckets, fluss.EARLIEST_OFFSET)

        # Project specific columns by index (using batch scanner for to_pandas)
        print("\n1. Projection by index [0, 1] (id, name):")
        scanner_index = await table.new_scan().project([0, 1]).create_record_batch_log_scanner()
        scanner_index.subscribe_buckets(all_buckets)
        df_projected = scanner_index.to_pandas()
        print(df_projected.head())
        print(
            f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}"
        )

        # Project specific columns by name (Pythonic!)
        print("\n2. Projection by name ['name', 'score'] (Pythonic):")
        scanner_names = await (
            table.new_scan()
            .project_by_name(["name", "score"])
            .create_record_batch_log_scanner()
        )
        scanner_names.subscribe_buckets(all_buckets)
        df_named = scanner_names.to_pandas()
        print(df_named.head())
        print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}")

        # Test empty result schema with projection
        print("\n3. Testing empty result schema with projection:")
        scanner_proj = await table.new_scan().project([0, 2]).create_record_batch_log_scanner()
        scanner_proj.subscribe_buckets(all_buckets)
        # Quick poll that may return empty
        result = scanner_proj.poll_arrow(100)
        print(f" Schema columns: {result.schema.names}")

    except Exception as e:
        print(f"Error during projection: {e}")


async def _drop_tables(admin, table_path, pk_table_path):
    """Drop the sample log table and the primary key table."""
    # Demo: Drop tables
    print("\n--- Testing drop_table() ---")
    try:
        # Drop the log table
        await admin.drop_table(table_path, ignore_if_not_exists=True)
        print(f"Successfully dropped table: {table_path}")
        # Drop the PK table
        await admin.drop_table(pk_table_path, ignore_if_not_exists=True)
        print(f"Successfully dropped table: {pk_table_path}")
    except Exception as e:
        print(f"Failed to drop table: {e}")


async def _demo_partitioned_log_table(conn, admin):
    """Create, write, inspect and scan a log table partitioned by ``region``, then drop it."""
    print("\n" + "=" * 60)
    print("--- Testing Partitioned Table ---")
    print("=" * 60)

    # Create a partitioned log table
    partitioned_fields = [
        pa.field("id", pa.int32()),
        pa.field("region", pa.string()),  # partition key
        pa.field("value", pa.int64()),
    ]
    partitioned_schema = pa.schema(partitioned_fields)
    fluss_partitioned_schema = fluss.Schema(partitioned_schema)

    partitioned_table_descriptor = fluss.TableDescriptor(
        fluss_partitioned_schema,
        partition_keys=["region"],  # Partition by region
        bucket_count=1,
    )

    partitioned_table_path = fluss.TablePath("fluss", "partitioned_log_table_py")

    try:
        # Drop if exists first
        await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True)
        print(f"Dropped existing table: {partitioned_table_path}")

        # Create the partitioned table
        await admin.create_table(partitioned_table_path, partitioned_table_descriptor, False)
        print(f"Created partitioned table: {partitioned_table_path}")

        # Create partitions for US and EU regions
        print("\n--- Creating partitions ---")
        await admin.create_partition(
            partitioned_table_path, {"region": "US"}, ignore_if_exists=True
        )
        print("Created partition: region=US")
        await admin.create_partition(
            partitioned_table_path, {"region": "EU"}, ignore_if_exists=True
        )
        print("Created partition: region=EU")

        # List partitions
        print("\n--- Listing partitions ---")
        partition_infos = await admin.list_partition_infos(partitioned_table_path)
        for p in partition_infos:
            print(f" {p}")  # PartitionInfo(partition_id=..., partition_name='region=...')

        # Get the table and write some data
        partitioned_table = await conn.get_table(partitioned_table_path)
        partitioned_writer = partitioned_table.new_append().create_writer()

        # Append data to US partition
        partitioned_writer.append({"id": 1, "region": "US", "value": 100})
        partitioned_writer.append({"id": 2, "region": "US", "value": 200})
        # Append data to EU partition
        partitioned_writer.append({"id": 3, "region": "EU", "value": 300})
        partitioned_writer.append({"id": 4, "region": "EU", "value": 400})
        await partitioned_writer.flush()
        print("\nWrote 4 records (2 to US, 2 to EU)")

        # Demo: list_partition_infos with partial spec filter
        print("\n--- Testing list_partition_infos with spec ---")
        us_partitions = await admin.list_partition_infos(
            partitioned_table_path, partition_spec={"region": "US"}
        )
        print(f"Filtered partitions (region=US): {us_partitions}")

        # Demo: list_partition_offsets
        print("\n--- Testing list_partition_offsets ---")

        # Query offsets for US partition
        # Note: partition_name is just the value (e.g., "US"), not "region=US"
        us_offsets = await admin.list_partition_offsets(
            partitioned_table_path,
            partition_name="US",
            bucket_ids=[0],
            offset_spec=fluss.OffsetSpec.latest(),
        )
        print(f"US partition latest offsets: {us_offsets}")

        # Query offsets for EU partition
        eu_offsets = await admin.list_partition_offsets(
            partitioned_table_path,
            partition_name="EU",
            bucket_ids=[0],
            offset_spec=fluss.OffsetSpec.latest(),
        )
        print(f"EU partition latest offsets: {eu_offsets}")

        # Demo: subscribe_partition for reading partitioned data
        print("\n--- Testing subscribe_partition + to_arrow() ---")
        partitioned_scanner = await partitioned_table.new_scan().create_record_batch_log_scanner()

        # Subscribe to each partition using partition_id
        for p in partition_infos:
            partitioned_scanner.subscribe_partition(
                partition_id=p.partition_id,
                bucket_id=0,
                start_offset=fluss.EARLIEST_OFFSET,
            )
            print(f"Subscribed to partition {p.partition_name} (id={p.partition_id})")

        # Use to_arrow() - now works for partitioned tables!
        partitioned_arrow = partitioned_scanner.to_arrow()
        print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:")
        print(partitioned_arrow.to_pandas())

        # Demo: subscribe_partition_buckets for batch subscribing to multiple partitions at once
        print("\n--- Testing subscribe_partition_buckets + to_arrow() ---")
        partitioned_scanner_batch = (
            await partitioned_table.new_scan().create_record_batch_log_scanner()
        )
        # Keys are (partition_id, bucket_id) pairs; the table has a single bucket (0)
        partition_bucket_offsets = {
            (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos
        }
        partitioned_scanner_batch.subscribe_partition_buckets(partition_bucket_offsets)
        print(f"Batch subscribed to {len(partition_bucket_offsets)} partition+bucket combinations")
        partitioned_batch_arrow = partitioned_scanner_batch.to_arrow()
        print(f"to_arrow() returned {partitioned_batch_arrow.num_rows} records:")
        print(partitioned_batch_arrow.to_pandas())

        # Demo: unsubscribe_partition - unsubscribe from one partition, read remaining
        print("\n--- Testing unsubscribe_partition ---")
        partitioned_scanner3 = await partitioned_table.new_scan().create_record_batch_log_scanner()
        for p in partition_infos:
            partitioned_scanner3.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
        # Unsubscribe from the first partition
        first_partition = partition_infos[0]
        partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0)
        print(
            f"Unsubscribed from partition {first_partition.partition_name} "
            f"(id={first_partition.partition_id})"
        )
        remaining_arrow = partitioned_scanner3.to_arrow()
        print(
            f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records "
            f"(from remaining partitions):"
        )
        print(remaining_arrow.to_pandas())

        # Demo: to_pandas() also works for partitioned tables
        print("\n--- Testing to_pandas() on partitioned table ---")
        partitioned_scanner2 = await partitioned_table.new_scan().create_record_batch_log_scanner()
        for p in partition_infos:
            partitioned_scanner2.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
        partitioned_df = partitioned_scanner2.to_pandas()
        print(f"to_pandas() returned {len(partitioned_df)} records:")
        print(partitioned_df)

        # Cleanup
        await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True)
        print(f"\nDropped partitioned table: {partitioned_table_path}")

    except Exception as e:
        print(f"Error with partitioned table: {e}")
        traceback.print_exc()


async def _demo_partitioned_kv_table(conn, admin):
    """Exercise upsert, lookup, update and delete on a partitioned primary key table."""
    print("\n" + "=" * 60)
    print("--- Testing Partitioned KV Table ---")
    print("=" * 60)

    partitioned_kv_fields = [
        pa.field("region", pa.string()),  # partition key + part of PK
        pa.field("user_id", pa.int32()),  # part of PK
        pa.field("name", pa.string()),
        pa.field("score", pa.int64()),
    ]
    partitioned_kv_schema = pa.schema(partitioned_kv_fields)
    fluss_partitioned_kv_schema = fluss.Schema(
        partitioned_kv_schema, primary_keys=["region", "user_id"]
    )

    partitioned_kv_descriptor = fluss.TableDescriptor(
        fluss_partitioned_kv_schema,
        partition_keys=["region"],
    )

    partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")

    try:
        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
        await admin.create_table(partitioned_kv_path, partitioned_kv_descriptor, False)
        print(f"Created partitioned KV table: {partitioned_kv_path}")

        # Create partitions
        await admin.create_partition(partitioned_kv_path, {"region": "US"})
        await admin.create_partition(partitioned_kv_path, {"region": "EU"})
        await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
        print("Created partitions: US, EU, APAC")

        partitioned_kv_table = await conn.get_table(partitioned_kv_path)
        upsert_writer = partitioned_kv_table.new_upsert().create_writer()

        # Upsert rows across partitions: (region, user_id, name, score)
        test_data = [
            ("US", 1, "Gustave", 100),
            ("US", 2, "Lune", 200),
            ("EU", 1, "Sciel", 150),
            ("EU", 2, "Maelle", 250),
            ("APAC", 1, "Noco", 300),
        ]
        for region, user_id, name, score in test_data:
            upsert_writer.upsert(
                {
                    "region": region,
                    "user_id": user_id,
                    "name": name,
                    "score": score,
                }
            )
        await upsert_writer.flush()
        print(f"Upserted {len(test_data)} rows across 3 partitions")

        # Lookup all rows across partitions.
        # A failed assert raises AssertionError, which the handler below reports.
        print("\n--- Lookup across partitions ---")
        lookuper = partitioned_kv_table.new_lookup().create_lookuper()
        for region, user_id, name, score in test_data:
            result = await lookuper.lookup({"region": region, "user_id": user_id})
            assert result is not None, f"Expected to find region={region} user_id={user_id}"
            assert result["name"] == name, f"Name mismatch: {result['name']} != {name}"
            assert result["score"] == score, f"Score mismatch: {result['score']} != {score}"
        print(f"All {len(test_data)} rows verified across partitions")

        # Update within a partition
        print("\n--- Update within partition ---")
        handle = upsert_writer.upsert(
            {
                "region": "US",
                "user_id": 1,
                "name": "Gustave Updated",
                "score": 999,
            }
        )
        await handle.wait()
        result = await lookuper.lookup({"region": "US", "user_id": 1})
        assert result is not None, "Expected to find region=US user_id=1 after update"
        assert result["name"] == "Gustave Updated"
        assert result["score"] == 999
        print(f"Update verified: US/1 name={result['name']} score={result['score']}")

        # Lookup in non-existent partition
        print("\n--- Lookup in non-existent partition ---")
        result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1})
        assert result is None, "Expected UNKNOWN partition lookup to return None"
        print("UNKNOWN partition lookup: not found (expected)")

        # Delete within a partition
        print("\n--- Delete within partition ---")
        handle = upsert_writer.delete({"region": "EU", "user_id": 1})
        await handle.wait()
        result = await lookuper.lookup({"region": "EU", "user_id": 1})
        assert result is None, "Expected EU/1 to be deleted"
        print("Delete verified: EU/1 not found")

        # Verify sibling record still exists
        result = await lookuper.lookup({"region": "EU", "user_id": 2})
        assert result is not None, "Expected EU/2 to still exist"
        assert result["name"] == "Maelle"
        print(f"EU/2 still exists: name={result['name']}")

        # Cleanup
        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
        print(f"\nDropped partitioned KV table: {partitioned_kv_path}")

    except Exception as e:
        print(f"Error with partitioned KV table: {e}")
        traceback.print_exc()


async def main():
    """Run the end-to-end Fluss Python client walkthrough against a local cluster.

    Connects to the bootstrap server configured below and then, in order:
    creates and writes a log table, scans it back, exercises a primary key
    table (upsert, lookup, delete, partial update), demonstrates column
    projection, drops both tables, and finally runs the partitioned log and
    partitioned KV demos.

    Each demo section reports its own failures and lets the script continue.
    Errors raised outside those sections (for example when fetching the admin
    client or a table handle) propagate to the caller, but the connection is
    always closed first.
    """
    # Create connection configuration
    config_spec = {
        "bootstrap.servers": "127.0.0.1:9123",
        # Add other configuration options as needed
        "writer.request-max-size": "10485760",  # 10 MB
        "writer.acks": "all",  # Wait for all replicas to acknowledge
        "writer.retries": "3",  # Retry up to 3 times on failure
        "writer.batch-size": "1000",  # Batch size for writes
    }
    config = fluss.Config(config_spec)

    # Create connection using the static create method
    conn = await fluss.FlussConnection.create(config)

    try:
        # Get the admin for Fluss
        admin = await conn.get_admin()

        # Log table: create, write, read back
        schema = _sample_schema()
        table_path = fluss.TablePath("fluss", "sample_table_types")
        await _create_log_table(admin, table_path, schema)

        # Get the table instance
        table = await conn.get_table(table_path)
        print(f"Got table: {table}")

        await _write_log_data(admin, table, table_path, schema)
        await _scan_log_table(admin, table, table_path)

        # Primary key table: upsert, lookup, delete, partial update
        pk_table_path = fluss.TablePath("fluss", "users_pk_table_v3")
        await _demo_pk_table(conn, admin, pk_table_path)

        # Projection reads the log table again, so it must run before the drop
        await _demo_projection(admin, table, table_path)
        await _drop_tables(admin, table_path, pk_table_path)

        # Partitioned tables manage their own create/drop lifecycle
        await _demo_partitioned_log_table(conn, admin)
        await _demo_partitioned_kv_table(conn, admin)
    finally:
        # Close connection even when a step above raised
        conn.close()
        print("\nConnection closed")
| |
| |
if __name__ == "__main__":
    # Script entry point: build the walkthrough coroutine, then let asyncio
    # drive it to completion on a fresh event loop.
    walkthrough = main()
    asyncio.run(walkthrough)