// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::TOKIO_RUNTIME;
use crate::*;
use fluss::client::EARLIEST_OFFSET;
use fluss::rpc::message::OffsetSpec;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
/// Represents a Fluss table for data operations
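///
/// Illustrative Python usage (a sketch; how the `table` handle is obtained
/// from a connection is an assumption, not part of this module):
///
/// ```python
/// info = table.get_table_info()
/// print(table.get_table_path(), table.has_primary_key())
/// ```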
#[pyclass]
pub struct FlussTable {
connection: Arc<fcore::client::FlussConnection>,
metadata: Arc<fcore::client::Metadata>,
table_info: fcore::metadata::TableInfo,
table_path: fcore::metadata::TablePath,
has_primary_key: bool,
}
#[pymethods]
impl FlussTable {
/// Create a new append writer for the table
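///
/// Returns an awaitable that resolves to an [`AppendWriter`]. A minimal
/// sketch of the Python side (the `table` handle is assumed to already
/// exist):
///
/// ```python
/// writer = await table.new_append_writer()
/// ```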
fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let conn = self.connection.clone();
let metadata = self.metadata.clone();
let table_info = self.table_info.clone();
future_into_py(py, async move {
let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info);
let table_append = fluss_table
.new_append()
.map_err(|e| FlussError::new_err(e.to_string()))?;
let rust_writer = table_append.create_writer();
let py_writer = AppendWriter::from_core(rust_writer);
Python::attach(|py| Py::new(py, py_writer))
})
}
/// Create a new log scanner for the table
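///
/// Returns an awaitable that resolves to a [`LogScanner`]. Illustrative
/// Python usage (assumes an existing `table` handle):
///
/// ```python
/// scanner = await table.new_log_scanner()
/// scanner.subscribe(None, None)
/// df = scanner.to_pandas()
/// ```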
fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let conn = self.connection.clone();
let metadata = self.metadata.clone();
let table_info = self.table_info.clone();
future_into_py(py, async move {
let fluss_table =
fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone());
let table_scan = fluss_table.new_scan();
let rust_scanner = table_scan.create_log_scanner();
let admin = conn
.get_admin()
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
Python::attach(|py| Py::new(py, py_scanner))
})
}
/// Get table information
pub fn get_table_info(&self) -> TableInfo {
TableInfo::from_core(self.table_info.clone())
}
/// Get table path
pub fn get_table_path(&self) -> TablePath {
TablePath::from_core(self.table_path.clone())
}
/// Check if table has primary key
pub fn has_primary_key(&self) -> bool {
self.has_primary_key
}
fn __repr__(&self) -> String {
format!(
"FlussTable(path={}.{})",
self.table_path.database(),
self.table_path.table()
)
}
}
impl FlussTable {
/// Create a FlussTable
pub fn new_table(
connection: Arc<fcore::client::FlussConnection>,
metadata: Arc<fcore::client::Metadata>,
table_info: fcore::metadata::TableInfo,
table_path: fcore::metadata::TablePath,
has_primary_key: bool,
) -> Self {
Self {
connection,
metadata,
table_info,
table_path,
has_primary_key,
}
}
}
/// Writer for appending data to a Fluss table
#[pyclass]
pub struct AppendWriter {
inner: fcore::client::AppendWriter,
}
#[pymethods]
impl AppendWriter {
/// Write Arrow table data
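///
/// Expects a `pyarrow.Table`; it is split into record batches and written
/// row by row. A minimal sketch (column names and values are assumptions):
///
/// ```python
/// import pyarrow as pa
///
/// tbl = pa.table({"id": [1, 2], "name": ["a", "b"]})
/// writer.write_arrow(tbl)
/// writer.flush()
/// ```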
pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> PyResult<()> {
// Convert Arrow Table to batches and write each batch
let batches = table.call_method0(py, "to_batches")?;
let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
for batch in batch_list {
self.write_arrow_batch(py, batch)?;
}
Ok(())
}
/// Write Arrow batch data
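///
/// Same conversion as `write_arrow`, but for a single `pyarrow.RecordBatch`
/// (illustrative; column names are assumptions):
///
/// ```python
/// import pyarrow as pa
///
/// batch = pa.RecordBatch.from_pydict({"id": [1], "name": ["a"]})
/// writer.write_arrow_batch(batch)
/// ```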
pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
// Extract number of rows and columns from the Arrow batch
let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
let num_columns: usize = batch.getattr(py, "num_columns")?.extract(py)?;
// Process each row in the batch
for row_idx in 0..num_rows {
let mut generic_row = fcore::row::GenericRow::new();
// Extract values for each column in this row
for col_idx in 0..num_columns {
let column = batch.call_method1(py, "column", (col_idx,))?;
let value = column.call_method1(py, "__getitem__", (row_idx,))?;
// Convert the Python value to a Datum and add to the row
let datum = self.convert_python_value_to_datum(py, value)?;
generic_row.set_field(col_idx, datum);
}
// Append this row using the async append method
TOKIO_RUNTIME.block_on(async {
self.inner
.append(generic_row)
.await
.map_err(|e| FlussError::new_err(e.to_string()))
})?;
}
Ok(())
}
/// Write Pandas DataFrame data
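///
/// The DataFrame is converted to a `pyarrow.Table` via `Table.from_pandas`
/// and then written with `write_arrow`. Illustrative usage (column names
/// are assumptions):
///
/// ```python
/// import pandas as pd
///
/// df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
/// writer.write_pandas(df)
/// writer.flush()
/// ```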
pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
// Import pyarrow module
let pyarrow = py.import("pyarrow")?;
// Get the Table class from pyarrow module
let table_class = pyarrow.getattr("Table")?;
// Call Table.from_pandas(df) - from_pandas is a class method
let pa_table = table_class.call_method1("from_pandas", (df,))?;
// Then call write_arrow with the converted table
self.write_arrow(py, pa_table.into())
}
/// Flush any pending data
pub fn flush(&mut self) -> PyResult<()> {
TOKIO_RUNTIME.block_on(async {
self.inner
.flush()
.await
.map_err(|e| FlussError::new_err(e.to_string()))
})
}
fn __repr__(&self) -> String {
"AppendWriter()".to_string()
}
}
impl AppendWriter {
/// Create an AppendWriter from a core append writer
pub fn from_core(append: fcore::client::AppendWriter) -> Self {
Self { inner: append }
}
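/// Convert a single Python value (typically a `pyarrow` scalar) into a core
/// `Datum`. Arrow `StringScalar`s are unwrapped via `as_py`; other values
/// are tried in order as bool, i32, i64, f32, f64, str (leaked to obtain a
/// `'static` lifetime), and bytes.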
fn convert_python_value_to_datum(
&self,
py: Python,
value: Py<PyAny>,
) -> PyResult<fcore::row::Datum<'static>> {
use fcore::row::{Blob, Datum, F32, F64};
// Check for None (null)
if value.is_none(py) {
return Ok(Datum::Null);
}
// Try to extract different types
if let Ok(type_name) = value.bind(py).get_type().name() {
if type_name == "StringScalar" {
if let Ok(py_value) = value.call_method0(py, "as_py") {
if let Ok(str_val) = py_value.extract::<String>(py) {
let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
return Ok(Datum::String(leaked_str));
}
}
}
}
if let Ok(bool_val) = value.extract::<bool>(py) {
return Ok(Datum::Bool(bool_val));
}
if let Ok(int_val) = value.extract::<i32>(py) {
return Ok(Datum::Int32(int_val));
}
if let Ok(int_val) = value.extract::<i64>(py) {
return Ok(Datum::Int64(int_val));
}
if let Ok(float_val) = value.extract::<f32>(py) {
return Ok(Datum::Float32(F32::from(float_val)));
}
if let Ok(float_val) = value.extract::<f64>(py) {
return Ok(Datum::Float64(F64::from(float_val)));
}
if let Ok(str_val) = value.extract::<String>(py) {
// Convert String to &'static str by leaking memory
// This is a simplified approach - in production, you might want better lifetime management
let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
return Ok(Datum::String(leaked_str));
}
if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
let blob = Blob::from(bytes_val);
return Ok(Datum::Blob(blob));
}
// If we can't convert, return an error
let type_name = value.bind(py).get_type().name()?;
Err(FlussError::new_err(format!(
"Cannot convert Python value to Datum: {type_name:?}"
)))
}
}
/// Scanner for reading log data from a Fluss table
#[pyclass]
pub struct LogScanner {
inner: fcore::client::LogScanner,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
#[allow(dead_code)]
start_timestamp: Option<i64>,
#[allow(dead_code)]
end_timestamp: Option<i64>,
}
#[pymethods]
impl LogScanner {
/// Subscribe to log data with timestamp range
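///
/// Only subscribing every bucket from the earliest offset is supported at
/// the moment, so both timestamps must be `None`. Illustrative Python
/// usage:
///
/// ```python
/// scanner.subscribe(None, None)
/// ```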
fn subscribe(
&mut self,
_start_timestamp: Option<i64>,
_end_timestamp: Option<i64>,
) -> PyResult<()> {
if _start_timestamp.is_some() {
return Err(FlussError::new_err(
"Specifying start_timestamp is not yet supported. Please use None.".to_string(),
));
}
if _end_timestamp.is_some() {
return Err(FlussError::new_err(
"Specifying end_timestamp is not yet supported. Please use None.".to_string(),
));
}
let num_buckets = self.table_info.get_num_buckets();
for bucket_id in 0..num_buckets {
let start_offset = EARLIEST_OFFSET;
TOKIO_RUNTIME.block_on(async {
self.inner
.subscribe(bucket_id, start_offset)
.await
.map_err(|e| FlussError::new_err(e.to_string()))
})?;
}
Ok(())
}
/// Convert all data to Arrow Table
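///
/// Polls until the latest offset of every bucket (as of the call) has been
/// reached and returns the collected records as a `pyarrow.Table`.
/// Illustrative Python usage (assumes `subscribe` was called first):
///
/// ```python
/// tbl = scanner.to_arrow()
/// print(tbl.num_rows)
/// ```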
fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
use std::collections::HashMap;
use std::time::Duration;
let mut all_batches = Vec::new();
let num_buckets = self.table_info.get_num_buckets();
let bucket_ids: Vec<i32> = (0..num_buckets).collect();
// todo: after supporting list_offsets with timestamp, we can use start_timestamp and end_timestamp here
let mut stopping_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
.block_on(async {
self.admin
.list_offsets(
&self.table_info.table_path,
bucket_ids.as_slice(),
OffsetSpec::Latest,
)
.await
})
.map_err(|e| FlussError::new_err(e.to_string()))?;
if !stopping_offsets.is_empty() {
loop {
let batch_result = TOKIO_RUNTIME
.block_on(async { self.inner.poll(Duration::from_millis(500)).await });
match batch_result {
Ok(scan_records) => {
let mut result_records: Vec<fcore::record::ScanRecord> = vec![];
for (bucket, records) in scan_records.into_records_by_buckets() {
let stopping_offset = stopping_offsets.get(&bucket.bucket_id());
if stopping_offset.is_none() {
// skip records for this bucket: we have already reached its end offset
continue;
}
if let Some(last_record) = records.last() {
let offset = last_record.offset();
result_records.extend(records);
if offset >= stopping_offset.unwrap() - 1 {
stopping_offsets.remove(&bucket.bucket_id());
}
}
}
if !result_records.is_empty() {
let arrow_batch = Utils::convert_scan_records_to_arrow(result_records);
all_batches.extend(arrow_batch);
}
// we have reached the end offsets of all buckets
if stopping_offsets.is_empty() {
break;
}
}
Err(e) => return Err(FlussError::new_err(e.to_string())),
}
}
}
Utils::combine_batches_to_table(py, all_batches)
}
/// Convert all data to Pandas DataFrame
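///
/// Convenience wrapper that calls `to_arrow` and then converts the result
/// with `pyarrow.Table.to_pandas` (illustrative):
///
/// ```python
/// df = scanner.to_pandas()
/// print(df.head())
/// ```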
fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
let arrow_table = self.to_arrow(py)?;
// Convert Arrow Table to Pandas DataFrame using pyarrow
let df = arrow_table.call_method0(py, "to_pandas")?;
Ok(df)
}
fn __repr__(&self) -> String {
format!("LogScanner(table={})", self.table_info.table_path)
}
}
impl LogScanner {
/// Create a LogScanner from a core LogScanner
pub fn from_core(
inner_scanner: fcore::client::LogScanner,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
) -> Self {
Self {
inner: inner_scanner,
admin,
table_info,
start_timestamp: None,
end_timestamp: None,
}
}
}