| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use crate::TOKIO_RUNTIME; |
| use crate::*; |
| use arrow::array::RecordBatch as ArrowRecordBatch; |
| use arrow_pyarrow::{FromPyArrow, ToPyArrow}; |
| use arrow_schema::SchemaRef; |
| use fluss::record::to_arrow_schema; |
| use fluss::rpc::message::OffsetSpec; |
| use pyo3::types::IntoPyDict; |
| use pyo3_async_runtimes::tokio::future_into_py; |
| use std::collections::HashMap; |
| use std::sync::Arc; |
| use std::time::Duration; |
| |
// Time conversion constants used by the date/time datum conversions below
// (e.g. decomposing millis-of-day into clock fields, scaling timestamps).
const MILLIS_PER_SECOND: i64 = 1_000;
const MILLIS_PER_MINUTE: i64 = 60_000;
const MILLIS_PER_HOUR: i64 = 3_600_000;
const MICROS_PER_MILLI: i64 = 1_000;
const MICROS_PER_SECOND: i64 = 1_000_000;
const MICROS_PER_DAY: i64 = 86_400_000_000;
const NANOS_PER_MILLI: i64 = 1_000_000;
const NANOS_PER_MICRO: i64 = 1_000;
| |
/// Represents a single scan record with metadata
#[pyclass]
pub struct ScanRecord {
    /// Bucket this record was read from.
    #[pyo3(get)]
    bucket: TableBucket,
    /// Offset of the record within its bucket's log.
    #[pyo3(get)]
    offset: i64,
    /// Record timestamp as reported by the core client
    /// (presumably epoch milliseconds — TODO confirm against fcore docs).
    #[pyo3(get)]
    timestamp: i64,
    /// Kind of change (append/insert/update/delete) this record represents.
    #[pyo3(get)]
    change_type: ChangeType,
    /// Store row as a Python dict directly (field name -> Python value),
    /// materialized once in `from_core` so the getter is a cheap clone_ref.
    row_dict: Py<pyo3::types::PyDict>,
}
| |
| #[pymethods] |
| impl ScanRecord { |
| /// Get the row data as a dictionary |
| #[getter] |
| pub fn row(&self, py: Python) -> Py<pyo3::types::PyDict> { |
| self.row_dict.clone_ref(py) |
| } |
| |
| fn __str__(&self) -> String { |
| format!( |
| "ScanRecord(bucket={}, offset={}, timestamp={}, change_type={})", |
| self.bucket.__str__(), |
| self.offset, |
| self.timestamp, |
| self.change_type.short_string() |
| ) |
| } |
| |
| fn __repr__(&self) -> String { |
| self.__str__() |
| } |
| } |
| |
| impl ScanRecord { |
| /// Create a ScanRecord from core types |
| pub fn from_core( |
| py: Python, |
| bucket: &fcore::metadata::TableBucket, |
| record: &fcore::record::ScanRecord, |
| row_type: &fcore::metadata::RowType, |
| ) -> PyResult<Self> { |
| let fields = row_type.fields(); |
| let row = record.row(); |
| let dict = pyo3::types::PyDict::new(py); |
| |
| for (pos, field) in fields.iter().enumerate() { |
| let value = datum_to_python_value(py, row, pos, field.data_type())?; |
| dict.set_item(field.name(), value)?; |
| } |
| |
| Ok(ScanRecord { |
| bucket: TableBucket::from_core(bucket.clone()), |
| offset: record.offset(), |
| timestamp: record.timestamp(), |
| change_type: ChangeType::from_core(*record.change_type()), |
| row_dict: dict.unbind(), |
| }) |
| } |
| } |
| |
/// Represents a batch of records with metadata
#[pyclass]
pub struct RecordBatch {
    /// Underlying Arrow batch; Arc-shared so the PyArrow conversion can
    /// borrow it without copying row data.
    batch: Arc<ArrowRecordBatch>,
    /// Bucket this batch was read from.
    #[pyo3(get)]
    bucket: TableBucket,
    /// Log offset of the first record in the batch.
    #[pyo3(get)]
    base_offset: i64,
    /// Log offset of the last record in the batch.
    #[pyo3(get)]
    last_offset: i64,
}
| |
| #[pymethods] |
| impl RecordBatch { |
| /// Get the Arrow RecordBatch as PyArrow RecordBatch |
| #[getter] |
| pub fn batch(&self, py: Python) -> PyResult<Py<PyAny>> { |
| let pyarrow_batch = self |
| .batch |
| .as_ref() |
| .to_pyarrow(py) |
| .map_err(|e| FlussError::new_err(format!("Failed to convert batch: {e}")))?; |
| Ok(pyarrow_batch.unbind()) |
| } |
| |
| fn __str__(&self) -> String { |
| format!( |
| "RecordBatch(bucket={}, base_offset={}, last_offset={}, rows={})", |
| self.bucket.__str__(), |
| self.base_offset, |
| self.last_offset, |
| self.batch.num_rows() |
| ) |
| } |
| |
| fn __repr__(&self) -> String { |
| self.__str__() |
| } |
| } |
| |
| impl RecordBatch { |
| /// Create a RecordBatch from core ScanBatch |
| pub fn from_scan_batch(scan_batch: fcore::record::ScanBatch) -> Self { |
| RecordBatch { |
| bucket: TableBucket::from_core(scan_batch.bucket().clone()), |
| base_offset: scan_batch.base_offset(), |
| last_offset: scan_batch.last_offset(), |
| batch: Arc::new(scan_batch.into_batch()), |
| } |
| } |
| } |
| |
/// Represents a Fluss table for data operations
#[pyclass]
pub struct FlussTable {
    /// Shared connection used to open scans and writers.
    connection: Arc<fcore::client::FlussConnection>,
    /// Shared cluster metadata handle.
    metadata: Arc<fcore::client::Metadata>,
    /// Table metadata snapshot captured when this handle was created.
    table_info: fcore::metadata::TableInfo,
    /// Fully-qualified table path (database + table name).
    table_path: fcore::metadata::TablePath,
    /// Cached flag gating PK-only operations (lookup / upsert).
    has_primary_key: bool,
}
| |
/// Builder for creating log scanners with flexible configuration.
///
/// Use this builder to configure projection, and in the future, filters
/// before creating a log scanner.
#[pyclass]
pub struct TableScan {
    /// Connection the eventual scanner runs over.
    connection: Arc<fcore::client::FlussConnection>,
    /// Shared cluster metadata handle.
    metadata: Arc<fcore::client::Metadata>,
    /// Metadata of the table being scanned.
    table_info: fcore::metadata::TableInfo,
    /// Optional column projection; None means all columns.
    projection: Option<ProjectionType>,
}
| |
/// Scanner type for internal use: selects which scanner flavour
/// `create_scanner_internal` builds.
enum ScannerType {
    /// Record-by-record scanning (consumed via `poll()`).
    Record,
    /// Arrow-batch scanning (consumed via `poll_arrow()`/`poll_batches()`).
    Batch,
}
| |
| #[pymethods] |
| impl TableScan { |
| /// Project to specific columns by their indices. |
| /// |
| /// Args: |
| /// indices: List of column indices (0-based) to include in the scan. |
| /// |
| /// Returns: |
| /// Self for method chaining. |
| pub fn project(mut slf: PyRefMut<'_, Self>, indices: Vec<usize>) -> PyRefMut<'_, Self> { |
| slf.projection = Some(ProjectionType::Indices(indices)); |
| slf |
| } |
| |
| /// Project to specific columns by their names. |
| /// |
| /// Args: |
| /// names: List of column names to include in the scan. |
| /// |
| /// Returns: |
| /// Self for method chaining. |
| pub fn project_by_name(mut slf: PyRefMut<'_, Self>, names: Vec<String>) -> PyRefMut<'_, Self> { |
| slf.projection = Some(ProjectionType::Names(names)); |
| slf |
| } |
| |
| /// Create a record-based log scanner. |
| /// |
| /// Use this scanner with `poll()` to get individual records with metadata |
| /// (offset, timestamp, change_type). |
| /// |
| /// Returns: |
| /// LogScanner for record-by-record scanning with `poll()` |
| pub fn create_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { |
| self.create_scanner_internal(py, ScannerType::Record) |
| } |
| |
| /// Create a batch-based log scanner. |
| /// |
| /// Use this scanner with `poll_arrow()` to get Arrow Tables, or with |
| /// `poll_batches()` to get individual batches with metadata. |
| /// |
| /// Returns: |
| /// LogScanner for batch-based scanning with `poll_arrow()` or `poll_batches()` |
| pub fn create_batch_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { |
| self.create_scanner_internal(py, ScannerType::Batch) |
| } |
| |
| fn __repr__(&self) -> String { |
| format!( |
| "TableScan(table={}.{})", |
| self.table_info.table_path.database(), |
| self.table_info.table_path.table() |
| ) |
| } |
| } |
| |
| impl TableScan { |
| fn create_scanner_internal<'py>( |
| &self, |
| py: Python<'py>, |
| scanner_type: ScannerType, |
| ) -> PyResult<Bound<'py, PyAny>> { |
| let conn = self.connection.clone(); |
| let metadata = self.metadata.clone(); |
| let table_info = self.table_info.clone(); |
| let projection = self.projection.clone(); |
| |
| future_into_py(py, async move { |
| let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info.clone()); |
| |
| let projection_indices = resolve_projection_indices(&projection, &table_info)?; |
| let table_scan = apply_projection(fluss_table.new_scan(), projection)?; |
| |
| let admin = conn |
| .get_admin() |
| .await |
| .map_err(|e| FlussError::new_err(e.to_string()))?; |
| |
| let (projected_schema, projected_row_type) = |
| calculate_projected_types(&table_info, projection_indices)?; |
| |
| let scanner_kind = match scanner_type { |
| ScannerType::Record => { |
| let s = table_scan.create_log_scanner().map_err(|e| { |
| FlussError::new_err(format!("Failed to create log scanner: {e}")) |
| })?; |
| ScannerKind::Record(s) |
| } |
| ScannerType::Batch => { |
| let s = table_scan.create_record_batch_log_scanner().map_err(|e| { |
| FlussError::new_err(format!("Failed to create batch scanner: {e}")) |
| })?; |
| ScannerKind::Batch(s) |
| } |
| }; |
| |
| let py_scanner = LogScanner::new( |
| scanner_kind, |
| admin, |
| table_info, |
| projected_schema, |
| projected_row_type, |
| ); |
| |
| Python::attach(|py| Py::new(py, py_scanner)) |
| }) |
| } |
| } |
| |
/// Internal enum to represent different projection types
#[derive(Clone)]
enum ProjectionType {
    /// Projection by 0-based column indices.
    Indices(Vec<usize>),
    /// Projection by column names (resolved to indices at scan time).
    Names(Vec<String>),
}
| |
| /// Resolve projection to column indices |
| fn resolve_projection_indices( |
| projection: &Option<ProjectionType>, |
| table_info: &fcore::metadata::TableInfo, |
| ) -> PyResult<Option<Vec<usize>>> { |
| match projection { |
| Some(ProjectionType::Indices(indices)) => Ok(Some(indices.clone())), |
| Some(ProjectionType::Names(names)) => { |
| let schema = table_info.get_schema(); |
| let columns = schema.columns(); |
| let mut indices = Vec::with_capacity(names.len()); |
| for name in names { |
| let idx = columns |
| .iter() |
| .position(|c| c.name() == name) |
| .ok_or_else(|| FlussError::new_err(format!("Column '{name}' not found")))?; |
| indices.push(idx); |
| } |
| Ok(Some(indices)) |
| } |
| None => Ok(None), |
| } |
| } |
| |
| /// Apply projection to table scan |
| fn apply_projection( |
| table_scan: fcore::client::TableScan, |
| projection: Option<ProjectionType>, |
| ) -> PyResult<fcore::client::TableScan> { |
| match projection { |
| Some(ProjectionType::Indices(indices)) => table_scan |
| .project(&indices) |
| .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}"))), |
| Some(ProjectionType::Names(names)) => { |
| let column_name_refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect(); |
| table_scan |
| .project_by_name(&column_name_refs) |
| .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}"))) |
| } |
| None => Ok(table_scan), |
| } |
| } |
| |
| /// Calculate projected schema and row type from projection indices |
| fn calculate_projected_types( |
| table_info: &fcore::metadata::TableInfo, |
| projection_indices: Option<Vec<usize>>, |
| ) -> PyResult<(SchemaRef, fcore::metadata::RowType)> { |
| let full_schema = to_arrow_schema(table_info.get_row_type()) |
| .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: {e}")))?; |
| let full_row_type = table_info.get_row_type(); |
| |
| match projection_indices { |
| Some(indices) => { |
| let arrow_fields: Vec<_> = indices |
| .iter() |
| .map(|&i| full_schema.field(i).clone()) |
| .collect(); |
| let row_fields: Vec<_> = indices |
| .iter() |
| .map(|&i| full_row_type.fields()[i].clone()) |
| .collect(); |
| Ok(( |
| Arc::new(arrow_schema::Schema::new(arrow_fields)), |
| fcore::metadata::RowType::new(row_fields), |
| )) |
| } |
| None => Ok((full_schema, full_row_type.clone())), |
| } |
| } |
| |
| #[pymethods] |
| impl FlussTable { |
| /// Create a new table scan builder for configuring and creating log scanners. |
| /// |
| /// Use this method to create scanners with the builder pattern: |
| /// Returns: |
| /// TableScan builder for configuring the scanner. |
| pub fn new_scan(&self) -> TableScan { |
| TableScan { |
| connection: self.connection.clone(), |
| metadata: self.metadata.clone(), |
| table_info: self.table_info.clone(), |
| projection: None, |
| } |
| } |
| |
| /// Create a new append writer for the table |
| fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { |
| let conn = self.connection.clone(); |
| let metadata = self.metadata.clone(); |
| let table_info = self.table_info.clone(); |
| |
| future_into_py(py, async move { |
| let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info.clone()); |
| |
| let table_append = fluss_table |
| .new_append() |
| .map_err(|e| FlussError::new_err(e.to_string()))?; |
| |
| let rust_writer = table_append |
| .create_writer() |
| .map_err(|e| FlussError::new_err(e.to_string()))?; |
| |
| let py_writer = AppendWriter::from_core(rust_writer, table_info); |
| |
| Python::attach(|py| Py::new(py, py_writer)) |
| }) |
| } |
| |
| /// Get table information |
| pub fn get_table_info(&self) -> TableInfo { |
| TableInfo::from_core(self.table_info.clone()) |
| } |
| |
| /// Get table path |
| pub fn get_table_path(&self) -> TablePath { |
| TablePath::from_core(self.table_path.clone()) |
| } |
| |
| /// Check if table has primary key |
| pub fn has_primary_key(&self) -> bool { |
| self.has_primary_key |
| } |
| |
| /// Create a new lookuper for primary key lookups. |
| /// |
| /// This is only available for tables with a primary key. |
| pub fn new_lookup(&self, _py: Python) -> PyResult<crate::Lookuper> { |
| if !self.has_primary_key { |
| return Err(FlussError::new_err( |
| "Lookup is only supported for primary key tables", |
| )); |
| } |
| |
| crate::Lookuper::new( |
| &self.connection, |
| self.metadata.clone(), |
| self.table_info.clone(), |
| ) |
| } |
| |
| /// Create a new upsert writer for the table. |
| /// |
| /// This is only available for tables with a primary key. |
| /// |
| /// Args: |
| /// columns: Optional list of column names for partial update. |
| /// Only the specified columns will be updated. |
| /// column_indices: Optional list of column indices (0-based) for partial update. |
| /// Alternative to `columns` parameter. |
| #[pyo3(signature = (columns=None, column_indices=None))] |
| pub fn new_upsert( |
| &self, |
| _py: Python, |
| columns: Option<Vec<String>>, |
| column_indices: Option<Vec<usize>>, |
| ) -> PyResult<crate::UpsertWriter> { |
| if !self.has_primary_key { |
| return Err(FlussError::new_err( |
| "Upsert is only supported for primary key tables", |
| )); |
| } |
| |
| // Validate that at most one parameter is specified |
| if columns.is_some() && column_indices.is_some() { |
| return Err(FlussError::new_err( |
| "Specify only one of 'columns' or 'column_indices', not both", |
| )); |
| } |
| |
| let fluss_table = fcore::client::FlussTable::new( |
| &self.connection, |
| self.metadata.clone(), |
| self.table_info.clone(), |
| ); |
| |
| let table_upsert = fluss_table |
| .new_upsert() |
| .map_err(|e| FlussError::new_err(e.to_string()))?; |
| |
| crate::UpsertWriter::new( |
| table_upsert, |
| self.table_info.clone(), |
| columns, |
| column_indices, |
| ) |
| } |
| |
| fn __repr__(&self) -> String { |
| format!( |
| "FlussTable(path={}.{})", |
| self.table_path.database(), |
| self.table_path.table() |
| ) |
| } |
| } |
| |
| impl FlussTable { |
| /// Create a FlussTable |
| pub fn new_table( |
| connection: Arc<fcore::client::FlussConnection>, |
| metadata: Arc<fcore::client::Metadata>, |
| table_info: fcore::metadata::TableInfo, |
| table_path: fcore::metadata::TablePath, |
| has_primary_key: bool, |
| ) -> Self { |
| Self { |
| connection, |
| metadata, |
| table_info, |
| table_path, |
| has_primary_key, |
| } |
| } |
| } |
| |
/// Writer for appending data to a Fluss table
#[pyclass]
pub struct AppendWriter {
    /// Core writer; Arc-shared so the async `flush` future can own a handle.
    inner: Arc<fcore::client::AppendWriter>,
    /// Table metadata used to validate and convert incoming rows.
    table_info: fcore::metadata::TableInfo,
}
| |
| #[pymethods] |
| impl AppendWriter { |
| /// Write Arrow table data (fire-and-forget, use flush() to ensure delivery) |
| pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> { |
| // Convert Arrow Table to batches and write each batch |
| let batches = table.call_method0(py, "to_batches")?; |
| let batch_list: Vec<Py<PyAny>> = batches.extract(py)?; |
| |
| for batch in batch_list { |
| // Drop the handle — fire-and-forget for bulk writes |
| drop(self.write_arrow_batch(py, batch)?); |
| } |
| Ok(()) |
| } |
| |
| /// Write Arrow batch data. |
| /// |
| /// Returns: |
| /// WriteResultHandle that can be ignored (fire-and-forget) or |
| /// awaited via `handle.wait()` for server acknowledgment. |
| pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> PyResult<WriteResultHandle> { |
| // This shares the underlying Arrow buffers without copying data |
| let batch_bound = batch.bind(py); |
| let rust_batch: ArrowRecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound) |
| .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?; |
| |
| let result_future = self |
| .inner |
| .append_arrow_batch(rust_batch) |
| .map_err(|e| FlussError::new_err(e.to_string()))?; |
| Ok(WriteResultHandle::new(result_future)) |
| } |
| |
| /// Append a single row to the table. |
| /// |
| /// Returns: |
| /// WriteResultHandle that can be ignored (fire-and-forget) or |
| /// awaited via `handle.wait()` for server acknowledgment. |
| pub fn append(&self, row: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> { |
| let generic_row = python_to_generic_row(row, &self.table_info)?; |
| |
| let result_future = self |
| .inner |
| .append(&generic_row) |
| .map_err(|e| FlussError::new_err(e.to_string()))?; |
| Ok(WriteResultHandle::new(result_future)) |
| } |
| |
| /// Write Pandas DataFrame data |
| pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> { |
| // Get the expected Arrow schema from the Fluss table |
| let row_type = self.table_info.get_row_type(); |
| let expected_schema = fcore::record::to_arrow_schema(row_type) |
| .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {e}")))?; |
| |
| // Convert Arrow schema to PyArrow schema |
| let py_schema = expected_schema |
| .as_ref() |
| .to_pyarrow(py) |
| .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; |
| |
| // Import pyarrow module |
| let pyarrow = py.import("pyarrow")?; |
| |
| // Get the Table class from pyarrow module |
| let table_class = pyarrow.getattr("Table")?; |
| |
| // Call Table.from_pandas(df, schema=expected_schema) to ensure proper type casting |
| let pa_table = table_class.call_method( |
| "from_pandas", |
| (df,), |
| Some(&[("schema", py_schema)].into_py_dict(py)?), |
| )?; |
| |
| // Then call write_arrow with the converted table |
| self.write_arrow(py, pa_table.into()) |
| } |
| |
| /// Flush any pending data |
| pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { |
| let inner = self.inner.clone(); |
| future_into_py(py, async move { |
| inner |
| .flush() |
| .await |
| .map_err(|e| FlussError::new_err(e.to_string())) |
| }) |
| } |
| |
| fn __repr__(&self) -> String { |
| "AppendWriter()".to_string() |
| } |
| } |
| |
| impl AppendWriter { |
| /// Create a AppendWriter from a core append writer |
| pub fn from_core( |
| append: fcore::client::AppendWriter, |
| table_info: fcore::metadata::TableInfo, |
| ) -> Self { |
| Self { |
| inner: Arc::new(append), |
| table_info, |
| } |
| } |
| } |
| |
/// Represents different input shapes for a row
#[derive(FromPyObject)]
enum RowInput<'py> {
    /// Mapping of field name -> value.
    Dict(Bound<'py, pyo3::types::PyDict>),
    /// Positional values in schema order.
    Tuple(Bound<'py, pyo3::types::PyTuple>),
    /// Positional values in schema order.
    List(Bound<'py, pyo3::types::PyList>),
}
| |
| /// Helper function to process sequence types (list/tuple) into datums |
| fn process_sequence_to_datums<'a, I>( |
| values: I, |
| len: usize, |
| fields: &[fcore::metadata::DataField], |
| ) -> PyResult<Vec<fcore::row::Datum<'static>>> |
| where |
| I: Iterator<Item = Bound<'a, PyAny>>, |
| { |
| if len != fields.len() { |
| return Err(FlussError::new_err(format!( |
| "Expected {} values, got {}", |
| fields.len(), |
| len |
| ))); |
| } |
| |
| let mut datums = Vec::with_capacity(fields.len()); |
| for (i, (field, value)) in fields.iter().zip(values).enumerate() { |
| datums.push( |
| python_value_to_datum(&value, field.data_type()).map_err(|e| { |
| FlussError::new_err(format!("Field '{}' (index {}): {}", field.name(), i, e)) |
| })?, |
| ); |
| } |
| Ok(datums) |
| } |
| |
| /// Convert Python row (dict/list/tuple) to GenericRow based on schema |
| pub fn python_to_generic_row( |
| row: &Bound<PyAny>, |
| table_info: &fcore::metadata::TableInfo, |
| ) -> PyResult<fcore::row::GenericRow<'static>> { |
| // Extract with user-friendly error message |
| let row_input: RowInput = row.extract().map_err(|_| { |
| let type_name = row |
| .get_type() |
| .name() |
| .map(|n| n.to_string()) |
| .unwrap_or_else(|_| "unknown".to_string()); |
| FlussError::new_err(format!( |
| "Row must be a dict, list, or tuple; got {type_name}" |
| )) |
| })?; |
| let schema = table_info.row_type(); |
| let fields = schema.fields(); |
| |
| let datums = match row_input { |
| RowInput::Dict(dict) => { |
| // Strict: reject unknown keys (and also reject non-str keys nicely) |
| for (k, _) in dict.iter() { |
| let key_str = k.extract::<&str>().map_err(|_| { |
| let key_type = k |
| .get_type() |
| .name() |
| .map(|n| n.to_string()) |
| .unwrap_or_else(|_| "unknown".to_string()); |
| FlussError::new_err(format!("Row dict keys must be strings; got {key_type}")) |
| })?; |
| |
| if fields.iter().all(|f| f.name() != key_str) { |
| let expected = fields |
| .iter() |
| .map(|f| f.name()) |
| .collect::<Vec<_>>() |
| .join(", "); |
| return Err(FlussError::new_err(format!( |
| "Unknown field '{key_str}'. Expected fields: {expected}" |
| ))); |
| } |
| } |
| |
| let mut datums = Vec::with_capacity(fields.len()); |
| for field in fields { |
| let value = dict.get_item(field.name())?.ok_or_else(|| { |
| FlussError::new_err(format!("Missing field: {}", field.name())) |
| })?; |
| datums.push( |
| python_value_to_datum(&value, field.data_type()).map_err(|e| { |
| FlussError::new_err(format!("Field '{}': {}", field.name(), e)) |
| })?, |
| ); |
| } |
| datums |
| } |
| |
| RowInput::List(list) => process_sequence_to_datums(list.iter(), list.len(), fields)?, |
| |
| RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), tuple.len(), fields)?, |
| }; |
| |
| Ok(fcore::row::GenericRow { values: datums }) |
| } |
| |
| /// Convert Python primary key values (dict/list/tuple) to GenericRow. |
| /// Only requires PK columns; non-PK columns are filled with Null. |
| /// For dict: keys should be PK column names. |
| /// For list/tuple: values should be PK values in PK column order. |
| pub fn python_pk_to_generic_row( |
| row: &Bound<PyAny>, |
| table_info: &fcore::metadata::TableInfo, |
| ) -> PyResult<fcore::row::GenericRow<'static>> { |
| let schema = table_info.get_schema(); |
| let row_type = table_info.row_type(); |
| let fields = row_type.fields(); |
| let pk_indexes = schema.primary_key_indexes(); |
| let pk_names: Vec<&str> = schema.primary_key_column_names(); |
| |
| if pk_indexes.is_empty() { |
| return Err(FlussError::new_err( |
| "Table has no primary key; cannot use PK-only row", |
| )); |
| } |
| |
| // Initialize all datums as Null |
| let mut datums: Vec<fcore::row::Datum<'static>> = vec![fcore::row::Datum::Null; fields.len()]; |
| |
| // Extract with user-friendly error message |
| let row_input: RowInput = row.extract().map_err(|_| { |
| let type_name = row |
| .get_type() |
| .name() |
| .map(|n| n.to_string()) |
| .unwrap_or_else(|_| "unknown".to_string()); |
| FlussError::new_err(format!( |
| "PK row must be a dict, list, or tuple; got {type_name}" |
| )) |
| })?; |
| |
| match row_input { |
| RowInput::Dict(dict) => { |
| // Validate keys are PK columns |
| for (k, _) in dict.iter() { |
| let key_str = k.extract::<&str>().map_err(|_| { |
| let key_type = k |
| .get_type() |
| .name() |
| .map(|n| n.to_string()) |
| .unwrap_or_else(|_| "unknown".to_string()); |
| FlussError::new_err(format!("PK dict keys must be strings; got {key_type}")) |
| })?; |
| |
| if !pk_names.contains(&key_str) { |
| return Err(FlussError::new_err(format!( |
| "Unknown PK field '{}'. Expected PK fields: {}", |
| key_str, |
| pk_names.join(", ") |
| ))); |
| } |
| } |
| |
| // Extract PK values |
| for (i, pk_idx) in pk_indexes.iter().enumerate() { |
| let pk_name = pk_names[i]; |
| let field: &fcore::metadata::DataField = &fields[*pk_idx]; |
| let value = dict |
| .get_item(pk_name)? |
| .ok_or_else(|| FlussError::new_err(format!("Missing PK field: {pk_name}")))?; |
| datums[*pk_idx] = python_value_to_datum(&value, field.data_type()) |
| .map_err(|e| FlussError::new_err(format!("PK field '{pk_name}': {e}")))?; |
| } |
| } |
| |
| RowInput::List(list) => { |
| if list.len() != pk_indexes.len() { |
| return Err(FlussError::new_err(format!( |
| "PK list must have {} elements (PK columns), got {}", |
| pk_indexes.len(), |
| list.len() |
| ))); |
| } |
| for (i, pk_idx) in pk_indexes.iter().enumerate() { |
| let field: &fcore::metadata::DataField = &fields[*pk_idx]; |
| let value = list.get_item(i)?; |
| datums[*pk_idx] = |
| python_value_to_datum(&value, field.data_type()).map_err(|e| { |
| FlussError::new_err(format!("PK field '{}': {}", field.name(), e)) |
| })?; |
| } |
| } |
| |
| RowInput::Tuple(tuple) => { |
| if tuple.len() != pk_indexes.len() { |
| return Err(FlussError::new_err(format!( |
| "PK tuple must have {} elements (PK columns), got {}", |
| pk_indexes.len(), |
| tuple.len() |
| ))); |
| } |
| for (i, pk_idx) in pk_indexes.iter().enumerate() { |
| let field: &fcore::metadata::DataField = &fields[*pk_idx]; |
| let value = tuple.get_item(i)?; |
| datums[*pk_idx] = |
| python_value_to_datum(&value, field.data_type()).map_err(|e| { |
| FlussError::new_err(format!("PK field '{}': {}", field.name(), e)) |
| })?; |
| } |
| } |
| } |
| |
| Ok(fcore::row::GenericRow { values: datums }) |
| } |
| |
| /// Convert Python value to Datum based on data type |
| fn python_value_to_datum( |
| value: &Bound<PyAny>, |
| data_type: &fcore::metadata::DataType, |
| ) -> PyResult<fcore::row::Datum<'static>> { |
| use fcore::row::{Datum, F32, F64}; |
| |
| if value.is_none() { |
| return Ok(Datum::Null); |
| } |
| |
| match data_type { |
| fcore::metadata::DataType::Boolean(_) => { |
| let v: bool = value.extract()?; |
| Ok(Datum::Bool(v)) |
| } |
| fcore::metadata::DataType::TinyInt(_) => { |
| // Strict type checking: reject bool for int columns |
| if value.is_instance_of::<pyo3::types::PyBool>() { |
| return Err(FlussError::new_err( |
| "Expected int for TinyInt column, got bool. Use 0 or 1 explicitly.".to_string(), |
| )); |
| } |
| let v: i8 = value.extract()?; |
| Ok(Datum::Int8(v)) |
| } |
| fcore::metadata::DataType::SmallInt(_) => { |
| if value.is_instance_of::<pyo3::types::PyBool>() { |
| return Err(FlussError::new_err( |
| "Expected int for SmallInt column, got bool. Use 0 or 1 explicitly." |
| .to_string(), |
| )); |
| } |
| let v: i16 = value.extract()?; |
| Ok(Datum::Int16(v)) |
| } |
| fcore::metadata::DataType::Int(_) => { |
| if value.is_instance_of::<pyo3::types::PyBool>() { |
| return Err(FlussError::new_err( |
| "Expected int for Int column, got bool. Use 0 or 1 explicitly.".to_string(), |
| )); |
| } |
| let v: i32 = value.extract()?; |
| Ok(Datum::Int32(v)) |
| } |
| fcore::metadata::DataType::BigInt(_) => { |
| if value.is_instance_of::<pyo3::types::PyBool>() { |
| return Err(FlussError::new_err( |
| "Expected int for BigInt column, got bool. Use 0 or 1 explicitly.".to_string(), |
| )); |
| } |
| let v: i64 = value.extract()?; |
| Ok(Datum::Int64(v)) |
| } |
| fcore::metadata::DataType::Float(_) => { |
| let v: f32 = value.extract()?; |
| Ok(Datum::Float32(F32::from(v))) |
| } |
| fcore::metadata::DataType::Double(_) => { |
| let v: f64 = value.extract()?; |
| Ok(Datum::Float64(F64::from(v))) |
| } |
| fcore::metadata::DataType::String(_) | fcore::metadata::DataType::Char(_) => { |
| let v: String = value.extract()?; |
| Ok(v.into()) |
| } |
| fcore::metadata::DataType::Bytes(_) | fcore::metadata::DataType::Binary(_) => { |
| // Efficient extraction: downcast to specific type and use bulk copy. |
| // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk copies of the underlying data. |
| if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() { |
| Ok(bytes.as_bytes().to_vec().into()) |
| } else if let Ok(bytearray) = value.downcast::<pyo3::types::PyByteArray>() { |
| Ok(bytearray.to_vec().into()) |
| } else { |
| Err(FlussError::new_err(format!( |
| "Expected bytes or bytearray, got {}", |
| value.get_type().name()? |
| ))) |
| } |
| } |
| fcore::metadata::DataType::Decimal(decimal_type) => { |
| python_decimal_to_datum(value, decimal_type.precision(), decimal_type.scale()) |
| } |
| fcore::metadata::DataType::Date(_) => python_date_to_datum(value), |
| fcore::metadata::DataType::Time(_) => python_time_to_datum(value), |
| fcore::metadata::DataType::Timestamp(_) => python_datetime_to_timestamp_ntz(value), |
| fcore::metadata::DataType::TimestampLTz(_) => python_datetime_to_timestamp_ltz(value), |
| _ => Err(FlussError::new_err(format!( |
| "Unsupported data type for row-level operations: {data_type}" |
| ))), |
| } |
| } |
| |
| /// Convert Rust Datum to Python value based on data type. |
| /// This is the reverse of python_value_to_datum. |
| pub fn datum_to_python_value( |
| py: Python, |
| row: &dyn fcore::row::InternalRow, |
| pos: usize, |
| data_type: &fcore::metadata::DataType, |
| ) -> PyResult<Py<PyAny>> { |
| use fcore::metadata::DataType; |
| |
| // Check for null first |
| if row.is_null_at(pos) { |
| return Ok(py.None()); |
| } |
| |
| match data_type { |
| DataType::Boolean(_) => Ok(row |
| .get_boolean(pos) |
| .into_pyobject(py)? |
| .to_owned() |
| .into_any() |
| .unbind()), |
| DataType::TinyInt(_) => Ok(row |
| .get_byte(pos) |
| .into_pyobject(py)? |
| .to_owned() |
| .into_any() |
| .unbind()), |
| DataType::SmallInt(_) => Ok(row |
| .get_short(pos) |
| .into_pyobject(py)? |
| .to_owned() |
| .into_any() |
| .unbind()), |
| DataType::Int(_) => Ok(row |
| .get_int(pos) |
| .into_pyobject(py)? |
| .to_owned() |
| .into_any() |
| .unbind()), |
| DataType::BigInt(_) => Ok(row |
| .get_long(pos) |
| .into_pyobject(py)? |
| .to_owned() |
| .into_any() |
| .unbind()), |
| DataType::Float(_) => Ok(row |
| .get_float(pos) |
| .into_pyobject(py)? |
| .to_owned() |
| .into_any() |
| .unbind()), |
| DataType::Double(_) => Ok(row |
| .get_double(pos) |
| .into_pyobject(py)? |
| .to_owned() |
| .into_any() |
| .unbind()), |
| DataType::String(_) => { |
| let s = row.get_string(pos); |
| Ok(s.into_pyobject(py)?.into_any().unbind()) |
| } |
| DataType::Char(char_type) => { |
| let s = row.get_char(pos, char_type.length() as usize); |
| Ok(s.into_pyobject(py)?.into_any().unbind()) |
| } |
| DataType::Bytes(_) => { |
| let b = row.get_bytes(pos); |
| Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind()) |
| } |
| DataType::Binary(binary_type) => { |
| let b = row.get_binary(pos, binary_type.length()); |
| Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind()) |
| } |
| DataType::Decimal(decimal_type) => { |
| let decimal = row.get_decimal( |
| pos, |
| decimal_type.precision() as usize, |
| decimal_type.scale() as usize, |
| ); |
| rust_decimal_to_python(py, &decimal) |
| } |
| DataType::Date(_) => { |
| let date = row.get_date(pos); |
| rust_date_to_python(py, date) |
| } |
| DataType::Time(_) => { |
| let time = row.get_time(pos); |
| rust_time_to_python(py, time) |
| } |
| DataType::Timestamp(ts_type) => { |
| let ts = row.get_timestamp_ntz(pos, ts_type.precision()); |
| rust_timestamp_ntz_to_python(py, ts) |
| } |
| DataType::TimestampLTz(ts_type) => { |
| let ts = row.get_timestamp_ltz(pos, ts_type.precision()); |
| rust_timestamp_ltz_to_python(py, ts) |
| } |
| _ => Err(FlussError::new_err(format!( |
| "Unsupported data type for conversion to Python: {data_type}" |
| ))), |
| } |
| } |
| |
| /// Convert Rust Decimal to Python decimal.Decimal |
| fn rust_decimal_to_python(py: Python, decimal: &fcore::row::Decimal) -> PyResult<Py<PyAny>> { |
| let decimal_ty = get_decimal_type(py)?; |
| let decimal_str = decimal.to_string(); |
| let py_decimal = decimal_ty.call1((decimal_str,))?; |
| Ok(py_decimal.into_any().unbind()) |
| } |
| |
| /// Convert Rust Date (days since epoch) to Python datetime.date |
| fn rust_date_to_python(py: Python, date: fcore::row::Date) -> PyResult<Py<PyAny>> { |
| use pyo3::types::PyDate; |
| |
| let days_since_epoch = date.get_inner(); |
| let epoch = jiff::civil::date(1970, 1, 1); |
| let civil_date = epoch + jiff::Span::new().days(days_since_epoch as i64); |
| |
| let py_date = PyDate::new( |
| py, |
| civil_date.year() as i32, |
| civil_date.month() as u8, |
| civil_date.day() as u8, |
| )?; |
| Ok(py_date.into_any().unbind()) |
| } |
| |
| /// Convert Rust Time (millis since midnight) to Python datetime.time |
| fn rust_time_to_python(py: Python, time: fcore::row::Time) -> PyResult<Py<PyAny>> { |
| use pyo3::types::PyTime; |
| |
| let millis = time.get_inner() as i64; |
| let hours = millis / MILLIS_PER_HOUR; |
| let minutes = (millis % MILLIS_PER_HOUR) / MILLIS_PER_MINUTE; |
| let seconds = (millis % MILLIS_PER_MINUTE) / MILLIS_PER_SECOND; |
| let microseconds = (millis % MILLIS_PER_SECOND) * MICROS_PER_MILLI; |
| |
| let py_time = PyTime::new( |
| py, |
| hours as u8, |
| minutes as u8, |
| seconds as u8, |
| microseconds as u32, |
| None, |
| )?; |
| Ok(py_time.into_any().unbind()) |
| } |
| |
| /// Convert Rust TimestampNtz to Python naive datetime |
| fn rust_timestamp_ntz_to_python(py: Python, ts: fcore::row::TimestampNtz) -> PyResult<Py<PyAny>> { |
| use pyo3::types::PyDateTime; |
| |
| let millis = ts.get_millisecond(); |
| let nanos = ts.get_nano_of_millisecond(); |
| let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 / NANOS_PER_MICRO); |
| |
| // Convert to civil datetime via jiff |
| let timestamp = jiff::Timestamp::from_microsecond(total_micros) |
| .map_err(|e| FlussError::new_err(format!("Invalid timestamp: {e}")))?; |
| let civil_dt = timestamp.to_zoned(jiff::tz::TimeZone::UTC).datetime(); |
| |
| let py_dt = PyDateTime::new( |
| py, |
| civil_dt.year() as i32, |
| civil_dt.month() as u8, |
| civil_dt.day() as u8, |
| civil_dt.hour() as u8, |
| civil_dt.minute() as u8, |
| civil_dt.second() as u8, |
| (civil_dt.subsec_nanosecond() / 1000) as u32, // microseconds |
| None, |
| )?; |
| Ok(py_dt.into_any().unbind()) |
| } |
| |
| /// Convert Rust TimestampLtz to Python timezone-aware datetime (UTC) |
| fn rust_timestamp_ltz_to_python(py: Python, ts: fcore::row::TimestampLtz) -> PyResult<Py<PyAny>> { |
| use pyo3::types::PyDateTime; |
| |
| let millis = ts.get_epoch_millisecond(); |
| let nanos = ts.get_nano_of_millisecond(); |
| let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 / NANOS_PER_MICRO); |
| |
| // Convert to civil datetime via jiff |
| let timestamp = jiff::Timestamp::from_microsecond(total_micros) |
| .map_err(|e| FlussError::new_err(format!("Invalid timestamp: {e}")))?; |
| let civil_dt = timestamp.to_zoned(jiff::tz::TimeZone::UTC).datetime(); |
| |
| let utc = get_utc_timezone(py)?; |
| let py_dt = PyDateTime::new( |
| py, |
| civil_dt.year() as i32, |
| civil_dt.month() as u8, |
| civil_dt.day() as u8, |
| civil_dt.hour() as u8, |
| civil_dt.minute() as u8, |
| civil_dt.second() as u8, |
| (civil_dt.subsec_nanosecond() / 1000) as u32, // microseconds |
| Some(&utc), |
| )?; |
| Ok(py_dt.into_any().unbind()) |
| } |
| |
| /// Convert an InternalRow to a Python dictionary |
| pub fn internal_row_to_dict( |
| py: Python, |
| row: &dyn fcore::row::InternalRow, |
| table_info: &fcore::metadata::TableInfo, |
| ) -> PyResult<Py<PyAny>> { |
| let row_type = table_info.row_type(); |
| let fields = row_type.fields(); |
| let dict = pyo3::types::PyDict::new(py); |
| |
| for (pos, field) in fields.iter().enumerate() { |
| let value = datum_to_python_value(py, row, pos, field.data_type())?; |
| dict.set_item(field.name(), value)?; |
| } |
| |
| Ok(dict.into_any().unbind()) |
| } |
| |
/// Cached decimal.Decimal type
/// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
/// Populated lazily by get_decimal_type().
static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
    pyo3::sync::PyOnceLock::new();

/// Cached UTC timezone
/// Holds datetime.timezone.utc; populated lazily by get_utc_timezone().
static UTC_TIMEZONE: pyo3::sync::PyOnceLock<Py<PyAny>> = pyo3::sync::PyOnceLock::new();

/// Cached UTC epoch type
/// Holds datetime(1970, 1, 1, tzinfo=utc); populated lazily by get_utc_epoch().
static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> = pyo3::sync::PyOnceLock::new();
| |
| /// Get the cached decimal.Decimal type, importing it once per interpreter. |
| fn get_decimal_type(py: Python) -> PyResult<Bound<pyo3::types::PyType>> { |
| let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> { |
| let decimal_mod = py.import("decimal")?; |
| let decimal_ty = decimal_mod |
| .getattr("Decimal")? |
| .downcast_into::<pyo3::types::PyType>()?; |
| Ok(decimal_ty.unbind()) |
| })?; |
| Ok(ty.bind(py).clone()) |
| } |
| |
| /// Get the cached UTC timezone (datetime.timezone.utc), creating it once per interpreter. |
| fn get_utc_timezone(py: Python) -> PyResult<Bound<pyo3::types::PyTzInfo>> { |
| let tz = UTC_TIMEZONE.get_or_try_init(py, || -> PyResult<_> { |
| let datetime_mod = py.import("datetime")?; |
| let timezone = datetime_mod.getattr("timezone")?; |
| let utc = timezone.getattr("utc")?; |
| Ok(utc.unbind()) |
| })?; |
| // Downcast to PyTzInfo for use with PyDateTime::new() |
| Ok(tz |
| .bind(py) |
| .clone() |
| .downcast_into::<pyo3::types::PyTzInfo>()?) |
| } |
| |
| /// Get the cached UTC epoch datetime, creating it once per interpreter. |
| fn get_utc_epoch(py: Python) -> PyResult<Bound<PyAny>> { |
| let epoch = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> { |
| let datetime_mod = py.import("datetime")?; |
| let timezone = datetime_mod.getattr("timezone")?; |
| let utc = timezone.getattr("utc")?; |
| let epoch = datetime_mod |
| .getattr("datetime")? |
| .call1((1970, 1, 1, 0, 0, 0, 0, &utc))?; |
| Ok(epoch.unbind()) |
| })?; |
| Ok(epoch.bind(py).clone()) |
| } |
| |
| /// Validate that value is a decimal.Decimal instance. |
| fn ensure_is_decimal(value: &Bound<PyAny>) -> PyResult<()> { |
| let decimal_ty = get_decimal_type(value.py())?; |
| if !value.is_instance(&decimal_ty.into_any())? { |
| return Err(FlussError::new_err(format!( |
| "Expected decimal.Decimal, got {}", |
| get_type_name(value) |
| ))); |
| } |
| Ok(()) |
| } |
| |
| /// Convert Python decimal.Decimal to Datum::Decimal. |
| /// Only accepts decimal.Decimal |
| fn python_decimal_to_datum( |
| value: &Bound<PyAny>, |
| precision: u32, |
| scale: u32, |
| ) -> PyResult<fcore::row::Datum<'static>> { |
| use std::str::FromStr; |
| |
| ensure_is_decimal(value)?; |
| |
| let decimal_str: String = value.str()?.extract()?; |
| let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| { |
| FlussError::new_err(format!("Failed to parse decimal '{decimal_str}': {e}")) |
| })?; |
| |
| let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale).map_err(|e| { |
| FlussError::new_err(format!( |
| "Failed to convert decimal '{decimal_str}' to DECIMAL({precision}, {scale}): {e}" |
| )) |
| })?; |
| |
| Ok(fcore::row::Datum::Decimal(decimal)) |
| } |
| |
| /// Convert Python datetime.date to Datum::Date. |
| fn python_date_to_datum(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> { |
| use pyo3::types::{PyDate, PyDateAccess, PyDateTime}; |
| |
| // Reject datetime.datetime (subclass of date) - use timestamp columns for those |
| if value.downcast::<PyDateTime>().is_ok() { |
| return Err(FlussError::new_err( |
| "Expected datetime.date, got datetime.datetime. Use a TIMESTAMP column for datetime values.", |
| )); |
| } |
| |
| let date = value.downcast::<PyDate>().map_err(|_| { |
| FlussError::new_err(format!( |
| "Expected datetime.date, got {}", |
| get_type_name(value) |
| )) |
| })?; |
| |
| let year = date.get_year(); |
| let month = date.get_month(); |
| let day = date.get_day(); |
| |
| // Calculate days since Unix epoch (1970-01-01) |
| let civil_date = jiff::civil::date(year as i16, month as i8, day as i8); |
| let epoch = jiff::civil::date(1970, 1, 1); |
| let days_since_epoch = (civil_date - epoch).get_days(); |
| |
| Ok(fcore::row::Datum::Date(fcore::row::Date::new( |
| days_since_epoch, |
| ))) |
| } |
| |
| /// Convert Python datetime.time to Datum::Time. |
| /// Uses PyO3's native PyTime type for efficient access. |
| /// |
| /// Note: Fluss TIME is always stored as milliseconds since midnight (i32) regardless |
| /// of the schema's precision setting. This matches the Java Fluss wire protocol. |
| /// Sub-millisecond precision (microseconds not divisible by 1000) will raise an error |
| /// to prevent silent data loss and ensure fail-fast behavior. |
| fn python_time_to_datum(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> { |
| use pyo3::types::{PyTime, PyTimeAccess}; |
| |
| let time = value.downcast::<PyTime>().map_err(|_| { |
| FlussError::new_err(format!( |
| "Expected datetime.time, got {}", |
| get_type_name(value) |
| )) |
| })?; |
| |
| let hour = time.get_hour() as i32; |
| let minute = time.get_minute() as i32; |
| let second = time.get_second() as i32; |
| let microsecond = time.get_microsecond() as i32; |
| |
| // Strict validation: reject sub-millisecond precision |
| if microsecond % MICROS_PER_MILLI as i32 != 0 { |
| return Err(FlussError::new_err(format!( |
| "TIME values with sub-millisecond precision are not supported. \ |
| Got time with {microsecond} microseconds (not divisible by 1000). \ |
| Fluss stores TIME as milliseconds since midnight. \ |
| Please round to milliseconds before insertion." |
| ))); |
| } |
| |
| // Convert to milliseconds since midnight |
| let millis = hour * MILLIS_PER_HOUR as i32 |
| + minute * MILLIS_PER_MINUTE as i32 |
| + second * MILLIS_PER_SECOND as i32 |
| + microsecond / MICROS_PER_MILLI as i32; |
| |
| Ok(fcore::row::Datum::Time(fcore::row::Time::new(millis))) |
| } |
| |
| /// Convert Python datetime-like object to Datum::TimestampNtz. |
| /// Supports: datetime.datetime (naive preferred), pd.Timestamp, np.datetime64 |
| fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> { |
| let (epoch_millis, nano_of_milli) = extract_datetime_components_ntz(value)?; |
| |
| let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis, nano_of_milli) |
| .map_err(|e| FlussError::new_err(format!("Failed to create TimestampNtz: {e}")))?; |
| |
| Ok(fcore::row::Datum::TimestampNtz(ts)) |
| } |
| |
| /// Convert Python datetime-like object to Datum::TimestampLtz. |
| /// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC. |
| /// Supports: datetime.datetime, pd.Timestamp, np.datetime64 |
| fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> { |
| let (epoch_millis, nano_of_milli) = extract_datetime_components_ltz(value)?; |
| |
| let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis, nano_of_milli) |
| .map_err(|e| FlussError::new_err(format!("Failed to create TimestampLtz: {e}")))?; |
| |
| Ok(fcore::row::Datum::TimestampLtz(ts)) |
| } |
| |
/// Extract epoch milliseconds for TimestampNtz (wall-clock time, no timezone conversion).
/// Uses integer arithmetic to avoid float precision issues.
/// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those.
///
/// Returns `(epoch_millis, nano_of_millisecond)`.
///
/// Accepted inputs, tried in this order: `datetime.datetime`, pandas
/// `Timestamp`, then any object exposing `to_pydatetime()`.
fn extract_datetime_components_ntz(value: &Bound<PyAny>) -> PyResult<(i64, i32)> {
    use pyo3::types::PyDateTime;

    // Try PyDateTime first
    if let Ok(dt) = value.downcast::<PyDateTime>() {
        // Reject tz-aware datetime for NTZ - it's ambiguous what the user wants
        let tzinfo = dt.getattr("tzinfo")?;
        if !tzinfo.is_none() {
            return Err(FlussError::new_err(
                "TIMESTAMP (without timezone) requires a naive datetime. \
                Got timezone-aware datetime. Either remove tzinfo or use TIMESTAMP_LTZ column.",
            ));
        }
        return datetime_to_epoch_millis_as_utc(dt);
    }

    // Check for pandas Timestamp by verifying module name
    if is_pandas_timestamp(value) {
        // For NTZ, reject tz-aware pandas Timestamps for consistency with datetime behavior
        if let Ok(tz) = value.getattr("tz") {
            if !tz.is_none() {
                return Err(FlussError::new_err(
                    "TIMESTAMP (without timezone) requires a naive pd.Timestamp. \
                    Got timezone-aware Timestamp. Either use tz_localize(None) or use TIMESTAMP_LTZ column.",
                ));
            }
        }
        // Naive pandas Timestamp: .value is nanoseconds since epoch (wall-clock as UTC)
        let nanos: i64 = value.getattr("value")?.extract()?;
        return Ok(nanos_to_millis_and_submillis(nanos));
    }

    // Try to_pydatetime() for objects that support it
    // (e.g. numpy datetime64 wrapped by pandas-like objects).
    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
            let tzinfo = dt.getattr("tzinfo")?;
            if !tzinfo.is_none() {
                return Err(FlussError::new_err(
                    "TIMESTAMP (without timezone) requires a naive datetime. \
                    Got timezone-aware value. Use TIMESTAMP_LTZ column instead.",
                ));
            }
            return datetime_to_epoch_millis_as_utc(dt);
        }
    }

    // None of the supported shapes matched.
    Err(FlussError::new_err(format!(
        "Expected naive datetime.datetime or pd.Timestamp, got {}",
        get_type_name(value)
    )))
}
| |
/// Extract epoch milliseconds for TimestampLtz (instant in time, UTC-based).
/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
///
/// Returns `(epoch_millis, nano_of_millisecond)`.
///
/// Accepted inputs, tried in this order: `datetime.datetime`, pandas
/// `Timestamp`, then any object exposing `to_pydatetime()`.
fn extract_datetime_components_ltz(value: &Bound<PyAny>) -> PyResult<(i64, i32)> {
    use pyo3::types::PyDateTime;

    // Try PyDateTime first
    if let Ok(dt) = value.downcast::<PyDateTime>() {
        // Check if timezone-aware
        let tzinfo = dt.getattr("tzinfo")?;
        if tzinfo.is_none() {
            // Naive datetime: assume UTC (treat components as UTC time)
            return datetime_to_epoch_millis_as_utc(dt);
        } else {
            // Aware datetime: use timedelta from epoch to get correct UTC instant
            return datetime_to_epoch_millis_utc_aware(dt);
        }
    }

    // Check for pandas Timestamp
    if is_pandas_timestamp(value) {
        // pandas Timestamp.value is always nanoseconds since UTC epoch
        // (so no tz handling is needed here, unlike the NTZ path).
        let nanos: i64 = value.getattr("value")?.extract()?;
        return Ok(nanos_to_millis_and_submillis(nanos));
    }

    // Try to_pydatetime()
    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
            let tzinfo = dt.getattr("tzinfo")?;
            if tzinfo.is_none() {
                return datetime_to_epoch_millis_as_utc(dt);
            } else {
                return datetime_to_epoch_millis_utc_aware(dt);
            }
        }
    }

    // None of the supported shapes matched.
    Err(FlussError::new_err(format!(
        "Expected datetime.datetime or pd.Timestamp, got {}",
        get_type_name(value)
    )))
}
| |
| /// Convert datetime components to epoch milliseconds treating them as UTC |
| fn datetime_to_epoch_millis_as_utc( |
| dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>, |
| ) -> PyResult<(i64, i32)> { |
| use pyo3::types::{PyDateAccess, PyTimeAccess}; |
| |
| let year = dt.get_year(); |
| let month = dt.get_month(); |
| let day = dt.get_day(); |
| let hour = dt.get_hour(); |
| let minute = dt.get_minute(); |
| let second = dt.get_second(); |
| let microsecond = dt.get_microsecond(); |
| |
| // Create jiff civil datetime and convert to UTC timestamp |
| // Safe casts: hour (0-23), minute (0-59), second (0-59) all fit in i8 |
| let civil_dt = jiff::civil::date(year as i16, month as i8, day as i8).at( |
| hour as i8, |
| minute as i8, |
| second as i8, |
| microsecond as i32 * 1000, |
| ); |
| |
| let timestamp = jiff::tz::Offset::UTC |
| .to_timestamp(civil_dt) |
| .map_err(|e| FlussError::new_err(format!("Invalid datetime: {e}")))?; |
| |
| let millis = timestamp.as_millisecond(); |
| let nano_of_milli = (timestamp.subsec_nanosecond() % NANOS_PER_MILLI as i32) as i32; |
| |
| Ok((millis, nano_of_milli)) |
| } |
| |
| /// Convert timezone-aware datetime to epoch milliseconds using Python's timedelta. |
| /// This correctly handles timezone conversions by computing (dt - UTC_EPOCH). |
| /// The UTC epoch is cached for performance. |
| fn datetime_to_epoch_millis_utc_aware( |
| dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>, |
| ) -> PyResult<(i64, i32)> { |
| use pyo3::types::{PyDelta, PyDeltaAccess}; |
| |
| let py = dt.py(); |
| let epoch = get_utc_epoch(py)?; |
| |
| // Compute delta = dt - epoch (this handles timezone conversion correctly) |
| let delta = dt.call_method1("__sub__", (epoch,))?; |
| let delta = delta.downcast::<PyDelta>()?; |
| |
| // Extract components using integer arithmetic |
| let days = delta.get_days() as i64; |
| let seconds = delta.get_seconds() as i64; |
| let microseconds = delta.get_microseconds() as i64; |
| |
| // Total milliseconds (note: days can be negative for dates before epoch) |
| let total_micros = days * MICROS_PER_DAY + seconds * MICROS_PER_SECOND + microseconds; |
| let millis = total_micros / MICROS_PER_MILLI; |
| let nano_of_milli = ((total_micros % MICROS_PER_MILLI) * MICROS_PER_MILLI) as i32; |
| |
| // Handle negative microseconds remainder |
| let (millis, nano_of_milli) = if nano_of_milli < 0 { |
| (millis - 1, nano_of_milli + NANOS_PER_MILLI as i32) |
| } else { |
| (millis, nano_of_milli) |
| }; |
| |
| Ok((millis, nano_of_milli)) |
| } |
| |
| /// Convert nanoseconds to (milliseconds, nano_of_millisecond) |
| fn nanos_to_millis_and_submillis(nanos: i64) -> (i64, i32) { |
| let millis = nanos / NANOS_PER_MILLI; |
| let nano_of_milli = (nanos % NANOS_PER_MILLI) as i32; |
| |
| // Handle negative nanoseconds correctly (Euclidean remainder) |
| if nano_of_milli < 0 { |
| (millis - 1, nano_of_milli + NANOS_PER_MILLI as i32) |
| } else { |
| (millis, nano_of_milli) |
| } |
| } |
| |
| /// Check if value is a pandas Timestamp by examining its type. |
| fn is_pandas_timestamp(value: &Bound<PyAny>) -> bool { |
| // Check module and class name to avoid importing pandas |
| if let Ok(cls) = value.get_type().getattr("__module__") { |
| if let Ok(module) = cls.extract::<&str>() { |
| if module.starts_with("pandas") { |
| if let Ok(name) = value.get_type().getattr("__name__") { |
| if let Ok(name_str) = name.extract::<&str>() { |
| return name_str == "Timestamp"; |
| } |
| } |
| } |
| } |
| } |
| false |
| } |
| |
| /// Get type name |
| fn get_type_name(value: &Bound<PyAny>) -> String { |
| value |
| .get_type() |
| .name() |
| .map(|s| s.to_string()) |
| .unwrap_or_else(|_| "unknown".to_string()) |
| } |
| |
/// Wraps the two scanner variants so we never have an impossible state
/// (both None or both Some).
enum ScannerKind {
    /// Record-oriented scanner backing `poll()`.
    Record(fcore::client::LogScanner),
    /// Arrow-batch scanner backing `poll_batches()` / `poll_arrow()` / `to_arrow()`.
    Batch(fcore::client::RecordBatchLogScanner),
}
| |
| impl ScannerKind { |
| fn as_record(&self) -> PyResult<&fcore::client::LogScanner> { |
| match self { |
| Self::Record(s) => Ok(s), |
| Self::Batch(_) => Err(FlussError::new_err( |
| "poll() requires a record-based scanner. Use new_scan().create_log_scanner().", |
| )), |
| } |
| } |
| |
| fn as_batch(&self) -> PyResult<&fcore::client::RecordBatchLogScanner> { |
| match self { |
| Self::Batch(s) => Ok(s), |
| Self::Record(_) => Err(FlussError::new_err( |
| "This method requires a batch-based scanner. Use new_scan().create_batch_scanner().", |
| )), |
| } |
| } |
| } |
| |
/// Dispatch a method call to whichever scanner variant is active.
/// Both `LogScanner` and `RecordBatchLogScanner` share the same subscribe interface.
///
/// Expands to an `.await`ed call, so it can only be used inside async blocks.
macro_rules! with_scanner {
    ($scanner:expr, $method:ident($($arg:expr),*)) => {
        match $scanner {
            ScannerKind::Record(s) => s.$method($($arg),*).await,
            ScannerKind::Batch(s) => s.$method($($arg),*).await,
        }
    };
}
| |
/// Scanner for reading log data from a Fluss table.
///
/// This scanner supports two modes:
/// - Record-based scanning via `poll()` - returns individual records with metadata
/// - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns Arrow batches
#[pyclass]
pub struct LogScanner {
    /// The active scanner variant (record- or batch-based).
    scanner: ScannerKind,
    /// Admin client used for offset and partition metadata lookups.
    admin: fcore::client::FlussAdmin,
    /// Metadata of the scanned table (table_path, table_id, row type).
    table_info: fcore::metadata::TableInfo,
    /// The projected Arrow schema to use for empty table creation
    projected_schema: SchemaRef,
    /// The projected row type to use for record-based scanning
    projected_row_type: fcore::metadata::RowType,
    /// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls)
    partition_name_cache: std::sync::RwLock<Option<HashMap<i64, String>>>,
}
| |
#[pymethods]
impl LogScanner {
    /// Subscribe to a single bucket at a specific offset (non-partitioned tables).
    ///
    /// Args:
    ///     bucket_id: The bucket ID to subscribe to
    ///     start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning)
    fn subscribe(&self, py: Python, bucket_id: i32, start_offset: i64) -> PyResult<()> {
        // Release the GIL (py.detach) while blocking on the async subscribe call.
        py.detach(|| {
            TOKIO_RUNTIME.block_on(async {
                with_scanner!(&self.scanner, subscribe(bucket_id, start_offset))
                    .map_err(|e| FlussError::new_err(e.to_string()))
            })
        })
    }

    /// Subscribe to multiple buckets at specified offsets (non-partitioned tables).
    ///
    /// Args:
    ///     bucket_offsets: A dict mapping bucket_id -> start_offset
    fn subscribe_buckets(&self, py: Python, bucket_offsets: HashMap<i32, i64>) -> PyResult<()> {
        py.detach(|| {
            TOKIO_RUNTIME.block_on(async {
                with_scanner!(&self.scanner, subscribe_buckets(&bucket_offsets))
                    .map_err(|e| FlussError::new_err(e.to_string()))
            })
        })
    }

    /// Subscribe to a bucket within a specific partition (partitioned tables only).
    ///
    /// Args:
    ///     partition_id: The partition ID (from PartitionInfo.partition_id)
    ///     bucket_id: The bucket ID within the partition
    ///     start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning)
    fn subscribe_partition(
        &self,
        py: Python,
        partition_id: i64,
        bucket_id: i32,
        start_offset: i64,
    ) -> PyResult<()> {
        py.detach(|| {
            TOKIO_RUNTIME.block_on(async {
                with_scanner!(
                    &self.scanner,
                    subscribe_partition(partition_id, bucket_id, start_offset)
                )
                .map_err(|e| FlussError::new_err(e.to_string()))
            })
        })
    }

    /// Subscribe to multiple partition+bucket combinations at once (partitioned tables only).
    ///
    /// Args:
    ///     partition_bucket_offsets: A dict mapping (partition_id, bucket_id) tuples to start_offsets
    fn subscribe_partition_buckets(
        &self,
        py: Python,
        partition_bucket_offsets: HashMap<(i64, i32), i64>,
    ) -> PyResult<()> {
        py.detach(|| {
            TOKIO_RUNTIME.block_on(async {
                with_scanner!(
                    &self.scanner,
                    subscribe_partition_buckets(&partition_bucket_offsets)
                )
                .map_err(|e| FlussError::new_err(e.to_string()))
            })
        })
    }

    /// Unsubscribe from a specific partition bucket (partitioned tables only).
    ///
    /// Args:
    ///     partition_id: The partition ID to unsubscribe from
    ///     bucket_id: The bucket ID within the partition
    fn unsubscribe_partition(&self, py: Python, partition_id: i64, bucket_id: i32) -> PyResult<()> {
        py.detach(|| {
            TOKIO_RUNTIME.block_on(async {
                with_scanner!(
                    &self.scanner,
                    unsubscribe_partition(partition_id, bucket_id)
                )
                .map_err(|e| FlussError::new_err(e.to_string()))
            })
        })
    }

    /// Poll for individual records with metadata.
    ///
    /// Args:
    ///     timeout_ms: Timeout in milliseconds to wait for records
    ///
    /// Returns:
    ///     List of ScanRecord objects, each containing bucket, offset, timestamp,
    ///     change_type, and row data as a dictionary.
    ///
    /// Note:
    ///     - Requires a record-based scanner (created with new_scan().create_log_scanner())
    ///     - Returns an empty list if no records are available
    ///     - When timeout expires, returns an empty list (NOT an error)
    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<ScanRecord>> {
        // Fails early with a usage hint if this is a batch-based scanner.
        let scanner = self.scanner.as_record()?;

        // Validate before casting to u64 below; a negative i64 would wrap.
        if timeout_ms < 0 {
            return Err(FlussError::new_err(format!(
                "timeout_ms must be non-negative, got: {timeout_ms}"
            )));
        }

        let timeout = Duration::from_millis(timeout_ms as u64);
        // GIL released while blocking on the async poll.
        let scan_records = py
            .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
            .map_err(|e| FlussError::new_err(e.to_string()))?;

        // Convert ScanRecords to Python ScanRecord list
        // Use projected_row_type to handle column projection correctly
        let row_type = &self.projected_row_type;
        let mut result = Vec::new();

        for (bucket, records) in scan_records.into_records_by_buckets() {
            for record in records {
                let scan_record = ScanRecord::from_core(py, &bucket, &record, row_type)?;
                result.push(scan_record);
            }
        }

        Ok(result)
    }

    /// Poll for batches with metadata.
    ///
    /// Args:
    ///     timeout_ms: Timeout in milliseconds to wait for batches
    ///
    /// Returns:
    ///     List of RecordBatch objects, each containing the Arrow batch along with
    ///     bucket, base_offset, and last_offset metadata.
    ///
    /// Note:
    ///     - Requires a batch-based scanner (created with new_scan().create_batch_scanner())
    ///     - Returns an empty list if no batches are available
    ///     - When timeout expires, returns an empty list (NOT an error)
    fn poll_batches(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<RecordBatch>> {
        let scanner = self.scanner.as_batch()?;

        // Validate before casting to u64 below; a negative i64 would wrap.
        if timeout_ms < 0 {
            return Err(FlussError::new_err(format!(
                "timeout_ms must be non-negative, got: {timeout_ms}"
            )));
        }

        let timeout = Duration::from_millis(timeout_ms as u64);
        // GIL released while blocking on the async poll.
        let scan_batches = py
            .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
            .map_err(|e| FlussError::new_err(e.to_string()))?;

        // Convert ScanBatch to RecordBatch with metadata
        let result = scan_batches
            .into_iter()
            .map(RecordBatch::from_scan_batch)
            .collect();

        Ok(result)
    }

    /// Poll for new records as an Arrow Table.
    ///
    /// Args:
    ///     timeout_ms: Timeout in milliseconds to wait for records
    ///
    /// Returns:
    ///     PyArrow Table containing the polled records (batches merged)
    ///
    /// Note:
    ///     - Requires a batch-based scanner (created with new_scan().create_batch_scanner())
    ///     - Returns an empty table (with correct schema) if no records are available
    ///     - When timeout expires, returns an empty table (NOT an error)
    fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
        let scanner = self.scanner.as_batch()?;

        // Validate before casting to u64 below; a negative i64 would wrap.
        if timeout_ms < 0 {
            return Err(FlussError::new_err(format!(
                "timeout_ms must be non-negative, got: {timeout_ms}"
            )));
        }

        let timeout = Duration::from_millis(timeout_ms as u64);
        // GIL released while blocking on the async poll.
        let scan_batches = py
            .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
            .map_err(|e| FlussError::new_err(e.to_string()))?;

        // Convert ScanBatch to Arrow batches
        // (empty poll still yields a schema-correct empty table, not an error)
        if scan_batches.is_empty() {
            return self.create_empty_table(py);
        }

        let arrow_batches: Vec<_> = scan_batches
            .into_iter()
            .map(|scan_batch| Arc::new(scan_batch.into_batch()))
            .collect();

        Utils::combine_batches_to_table(py, arrow_batches)
    }

    /// Create an empty PyArrow table with the correct (projected) schema
    fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> {
        // Use the projected schema stored in the scanner
        let py_schema = self
            .projected_schema
            .as_ref()
            .to_pyarrow(py)
            .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?;

        // pyarrow.Table.from_batches([], schema) builds the empty table.
        let pyarrow = py.import("pyarrow")?;
        let empty_table = pyarrow
            .getattr("Table")?
            .call_method1("from_batches", (vec![] as Vec<Py<PyAny>>, py_schema))?;

        Ok(empty_table.into())
    }

    /// Convert all data to Arrow Table.
    ///
    /// Reads from currently subscribed buckets until reaching their latest offsets.
    /// Works for both partitioned and non-partitioned tables.
    ///
    /// You must call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.
    ///
    /// Returns:
    ///     PyArrow Table containing all data from subscribed buckets
    fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
        // 1. Require at least one subscribed bucket
        let scanner = self.scanner.as_batch()?;
        let subscribed = scanner.get_subscribed_buckets();
        if subscribed.is_empty() {
            return Err(FlussError::new_err(
                "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.",
            ));
        }

        // 2. Query latest offsets for all subscribed buckets
        let stopping_offsets = self.query_latest_offsets(py, &subscribed)?;

        // 3. Poll until all buckets reach their stopping offsets
        self.poll_until_offsets(py, stopping_offsets)
    }

    /// Convert all data to Pandas DataFrame.
    ///
    /// Reads from currently subscribed buckets until reaching their latest offsets.
    /// Works for both partitioned and non-partitioned tables.
    ///
    /// You must call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.
    ///
    /// Returns:
    ///     Pandas DataFrame containing all data from subscribed buckets
    fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
        let arrow_table = self.to_arrow(py)?;

        // Convert Arrow Table to Pandas DataFrame using pyarrow
        let df = arrow_table.call_method0(py, "to_pandas")?;
        Ok(df)
    }

    fn __repr__(&self) -> String {
        format!("LogScanner(table={})", self.table_info.table_path)
    }
}
| |
| impl LogScanner { |
    /// Build a `LogScanner` from its parts with an empty partition-name cache.
    fn new(
        scanner: ScannerKind,
        admin: fcore::client::FlussAdmin,
        table_info: fcore::metadata::TableInfo,
        projected_schema: SchemaRef,
        projected_row_type: fcore::metadata::RowType,
    ) -> Self {
        Self {
            scanner,
            admin,
            table_info,
            projected_schema,
            projected_row_type,
            // Lazily populated on the first partitioned-offset query.
            partition_name_cache: std::sync::RwLock::new(None),
        }
    }
| |
| /// Get partition_id -> partition_name mapping, using cache if available |
| fn get_partition_name_map( |
| &self, |
| py: Python, |
| table_path: &fcore::metadata::TablePath, |
| ) -> PyResult<HashMap<i64, String>> { |
| // Check cache first (read lock) |
| { |
| let cache = self.partition_name_cache.read().unwrap(); |
| if let Some(map) = cache.as_ref() { |
| return Ok(map.clone()); |
| } |
| } |
| |
| // Fetch partition infos (releases GIL during async call) |
| let partition_infos: Vec<fcore::metadata::PartitionInfo> = py |
| .detach(|| { |
| TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await }) |
| }) |
| .map_err(|e| FlussError::new_err(format!("Failed to list partition infos: {e}")))?; |
| |
| // Build and cache the mapping |
| let map: HashMap<i64, String> = partition_infos |
| .into_iter() |
| .map(|info| (info.get_partition_id(), info.get_partition_name())) |
| .collect(); |
| |
| // Store in cache (write lock) |
| { |
| let mut cache = self.partition_name_cache.write().unwrap(); |
| *cache = Some(map.clone()); |
| } |
| |
| Ok(map) |
| } |
| |
| /// Query latest offsets for subscribed buckets (handles both partitioned and non-partitioned) |
| fn query_latest_offsets( |
| &self, |
| py: Python, |
| subscribed: &[(fcore::metadata::TableBucket, i64)], |
| ) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> { |
| let scanner = self.scanner.as_batch()?; |
| let is_partitioned = scanner.is_partitioned(); |
| let table_path = &self.table_info.table_path; |
| |
| if !is_partitioned { |
| // Non-partitioned: simple case - just query all bucket IDs |
| let bucket_ids: Vec<i32> = subscribed.iter().map(|(tb, _)| tb.bucket_id()).collect(); |
| |
| let offsets: HashMap<i32, i64> = py |
| .detach(|| { |
| TOKIO_RUNTIME.block_on(async { |
| self.admin |
| .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest) |
| .await |
| }) |
| }) |
| .map_err(|e| FlussError::new_err(format!("Failed to list offsets: {e}")))?; |
| |
| // Convert to TableBucket-keyed map |
| let table_id = self.table_info.table_id; |
| Ok(offsets |
| .into_iter() |
| .filter(|(_, offset)| *offset > 0) |
| .map(|(bucket_id, offset)| { |
| ( |
| fcore::metadata::TableBucket::new(table_id, bucket_id), |
| offset, |
| ) |
| }) |
| .collect()) |
| } else { |
| // Partitioned: need to query per partition |
| self.query_partitioned_offsets(py, subscribed) |
| } |
| } |
| |
| /// Query offsets for partitioned table subscriptions |
| fn query_partitioned_offsets( |
| &self, |
| py: Python, |
| subscribed: &[(fcore::metadata::TableBucket, i64)], |
| ) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> { |
| let table_path = &self.table_info.table_path; |
| |
| // Get partition_id -> partition_name mapping (cached) |
| let partition_id_to_name = self.get_partition_name_map(py, table_path)?; |
| |
| // Group subscribed buckets by partition_id |
| let mut by_partition: HashMap<i64, Vec<i32>> = HashMap::new(); |
| for (tb, _) in subscribed { |
| if let Some(partition_id) = tb.partition_id() { |
| by_partition |
| .entry(partition_id) |
| .or_default() |
| .push(tb.bucket_id()); |
| } |
| } |
| |
| // Query offsets for each partition |
| let mut result: HashMap<fcore::metadata::TableBucket, i64> = HashMap::new(); |
| let table_id = self.table_info.table_id; |
| |
| for (partition_id, bucket_ids) in by_partition { |
| let partition_name = partition_id_to_name.get(&partition_id).ok_or_else(|| { |
| FlussError::new_err(format!("Unknown partition_id: {partition_id}")) |
| })?; |
| |
| let offsets: HashMap<i32, i64> = py |
| .detach(|| { |
| TOKIO_RUNTIME.block_on(async { |
| self.admin |
| .list_partition_offsets( |
| table_path, |
| partition_name, |
| &bucket_ids, |
| OffsetSpec::Latest, |
| ) |
| .await |
| }) |
| }) |
| .map_err(|e| { |
| FlussError::new_err(format!( |
| "Failed to list offsets for partition {partition_name}: {e}" |
| )) |
| })?; |
| |
| for (bucket_id, offset) in offsets { |
| if offset > 0 { |
| let tb = fcore::metadata::TableBucket::new_with_partition( |
| table_id, |
| Some(partition_id), |
| bucket_id, |
| ); |
| result.insert(tb, offset); |
| } |
| } |
| } |
| |
| Ok(result) |
| } |
| |
| /// Poll until all buckets reach their stopping offsets |
| fn poll_until_offsets( |
| &self, |
| py: Python, |
| mut stopping_offsets: HashMap<fcore::metadata::TableBucket, i64>, |
| ) -> PyResult<Py<PyAny>> { |
| let scanner = self.scanner.as_batch()?; |
| let mut all_batches = Vec::new(); |
| |
| while !stopping_offsets.is_empty() { |
| let scan_batches = py |
| .detach(|| { |
| TOKIO_RUNTIME.block_on(async { scanner.poll(Duration::from_millis(500)).await }) |
| }) |
| .map_err(|e| FlussError::new_err(format!("Failed to poll: {e}")))?; |
| |
| if scan_batches.is_empty() { |
| continue; |
| } |
| |
| for scan_batch in scan_batches { |
| let table_bucket = scan_batch.bucket().clone(); |
| |
| // Check if this bucket is still being tracked |
| let Some(&stop_at) = stopping_offsets.get(&table_bucket) else { |
| continue; |
| }; |
| |
| let base_offset = scan_batch.base_offset(); |
| let last_offset = scan_batch.last_offset(); |
| |
| // If the batch starts at or after the stop_at offset, the bucket is exhausted |
| if base_offset >= stop_at { |
| stopping_offsets.remove(&table_bucket); |
| continue; |
| } |
| |
| let batch = if last_offset >= stop_at { |
| // Slice batch to keep only records where offset < stop_at |
| let num_to_keep = (stop_at - base_offset) as usize; |
| let b = scan_batch.into_batch(); |
| let limit = num_to_keep.min(b.num_rows()); |
| b.slice(0, limit) |
| } else { |
| scan_batch.into_batch() |
| }; |
| |
| all_batches.push(Arc::new(batch)); |
| |
| // Check if we're done with this bucket |
| if last_offset >= stop_at - 1 { |
| stopping_offsets.remove(&table_bucket); |
| } |
| } |
| } |
| |
| Utils::combine_batches_to_table(py, all_batches) |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_nanos_to_millis_and_submillis() {
        // (input nanos, expected (millis, sub-millisecond nanos)).
        // Negative inputs floor toward -inf so the nano remainder stays
        // non-negative, e.g. -1_500_000 ns = -2 ms + 500_000 ns.
        let cases = [
            (1_500_000, (1, 500_000)),             // simple positive case
            (2_000_000, (2, 0)),                   // exact millisecond boundary
            (0, (0, 0)),                           // zero
            (86_400_000_000_000, (86_400_000, 0)), // one day in nanos
            (-1_500_000, (-2, 500_000)),           // negative with remainder
            (-2_000_000, (-2, 0)),                 // negative exact boundary
            (-1, (-1, 999_999)),                   // smallest negative value
            (-500_000, (-1, 500_000)),             // negative sub-millisecond
        ];

        for (nanos, expected) in cases {
            assert_eq!(
                nanos_to_millis_and_submillis(nanos),
                expected,
                "input: {nanos}"
            );
        }
    }
}