| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| mod types; |
| |
| use std::sync::{Arc, LazyLock}; |
| use std::time::Duration; |
| |
| use fluss as fcore; |
| use fluss::PartitionId; |
| |
/// Shared multi-threaded Tokio runtime used to drive the async core client
/// from the synchronous FFI entry points (every bridge function calls
/// `RUNTIME.block_on(...)`). Lazily built on first use.
static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
});
| |
/// CXX bridge shared between this Rust client and its C++ wrapper.
///
/// The `Ffi*` structs are plain data mirrored on both sides of the boundary
/// (field names and order are part of the generated ABI — do not reorder).
/// The `extern "Rust"` section declares opaque Rust types that C++ holds by
/// pointer, plus the functions it may call on them.
#[cxx::bridge(namespace = "fluss::ffi")]
mod ffi {
    /// One key/value entry of a string map (table/database properties).
    struct HashMapValue {
        key: String,
        value: String,
    }

    /// Client configuration passed in from C++; converted into the core
    /// `Config` by `new_connection`.
    struct FfiConfig {
        bootstrap_servers: String,
        writer_request_max_size: i32,
        writer_acks: String,
        writer_retries: i32,
        writer_batch_size: i32,
        writer_bucket_no_key_assigner: String,
        scanner_remote_log_prefetch_num: usize,
        remote_file_download_thread_num: usize,
        scanner_remote_log_read_concurrency: usize,
        scanner_log_max_poll_records: usize,
        scanner_log_fetch_max_bytes: i32,
        scanner_log_fetch_min_bytes: i32,
        scanner_log_fetch_wait_max_time_ms: i32,
        scanner_log_fetch_max_bytes_for_bucket: i32,
        writer_batch_timeout_ms: i64,
        connect_timeout_ms: u64,
        security_protocol: String,
        security_sasl_mechanism: String,
        security_sasl_username: String,
        security_sasl_password: String,
    }

    /// Call status: `error_code == 0` means success; otherwise
    /// `error_message` describes the failure (see `CLIENT_ERROR_CODE`).
    struct FfiResult {
        error_code: i32,
        error_message: String,
    }

    /// Fully-qualified table identifier (database + table name).
    struct FfiTablePath {
        database_name: String,
        table_name: String,
    }

    /// One schema column. `data_type` is an integer tag decoded in `types`;
    /// NOTE(review): `precision`/`scale` are presumably only meaningful for
    /// decimal/timestamp-like types — confirm against `types` conversions.
    struct FfiColumn {
        name: String,
        data_type: i32,
        comment: String,
        precision: i32,
        scale: i32,
    }

    /// Table schema: ordered columns plus primary-key column names.
    struct FfiSchema {
        columns: Vec<FfiColumn>,
        primary_keys: Vec<String>,
    }

    /// Input descriptor for `create_table`.
    struct FfiTableDescriptor {
        schema: FfiSchema,
        partition_keys: Vec<String>,
        bucket_count: i32,
        bucket_keys: Vec<String>,
        properties: Vec<HashMapValue>,
        custom_properties: Vec<HashMapValue>,
        comment: String,
    }

    /// Table metadata returned by `get_table_info` / `get_table_info_from_table`.
    struct FfiTableInfo {
        table_id: i64,
        schema_id: i32,
        table_path: FfiTablePath,
        created_time: i64,
        modified_time: i64,
        primary_keys: Vec<String>,
        bucket_keys: Vec<String>,
        partition_keys: Vec<String>,
        num_buckets: i32,
        has_primary_key: bool,
        is_partitioned: bool,
        properties: Vec<HashMapValue>,
        custom_properties: Vec<HashMapValue>,
        comment: String,
        schema: FfiSchema,
    }

    /// `get_table_info` status + payload (payload is empty on error).
    struct FfiTableInfoResult {
        result: FfiResult,
        table_info: FfiTableInfo,
    }

    // NOTE: FfiDatum, FfiGenericRow, FfiScanRecord, FfiScanRecords, FfiScanRecordsResult
    // have been replaced by opaque types below (ScanResultInner, GenericRowInner, LookupResultInner).

    /// One Arrow batch handed across the boundary by raw address.
    /// NOTE(review): `array_ptr`/`schema_ptr` appear to be Arrow C Data
    /// Interface structures (freed via `free_arrow_ffi_structures`) — confirm.
    struct FfiArrowRecordBatch {
        array_ptr: usize,
        schema_ptr: usize,
        table_id: i64,
        partition_id: i64,
        bucket_id: i32,
        base_offset: i64,
    }

    struct FfiArrowRecordBatches {
        batches: Vec<FfiArrowRecordBatch>,
    }

    /// `poll_record_batch` status + batches.
    struct FfiArrowRecordBatchesResult {
        result: FfiResult,
        arrow_batches: FfiArrowRecordBatches,
    }

    /// Lake snapshot id plus per-bucket offsets.
    struct FfiLakeSnapshot {
        snapshot_id: i64,
        bucket_offsets: Vec<FfiBucketOffset>,
    }

    struct FfiBucketOffset {
        table_id: i64,
        partition_id: i64,
        bucket_id: i32,
        offset: i64,
    }

    /// Offset query: `offset_type` 0 = earliest, 1 = latest,
    /// 2 = by `timestamp` (see `do_list_offsets`).
    struct FfiOffsetQuery {
        offset_type: i32,
        timestamp: i64,
    }

    /// Per-bucket metadata of a scan result (`sv_bucket_infos`).
    struct FfiBucketInfo {
        table_id: i64,
        bucket_id: i32,
        has_partition_id: bool,
        partition_id: i64,
        record_count: usize,
    }

    /// Subscription request for one bucket of a non-partitioned table.
    struct FfiBucketSubscription {
        bucket_id: i32,
        offset: i64,
    }

    /// Subscription request for one bucket of one partition.
    struct FfiPartitionBucketSubscription {
        partition_id: i64,
        bucket_id: i32,
        offset: i64,
    }

    /// (bucket, offset) pair returned by the list-offsets APIs.
    struct FfiBucketOffsetPair {
        bucket_id: i32,
        offset: i64,
    }

    struct FfiListOffsetsResult {
        result: FfiResult,
        bucket_offsets: Vec<FfiBucketOffsetPair>,
    }

    // NOTE: FfiLookupResult replaced by opaque LookupResultInner below.

    struct FfiLakeSnapshotResult {
        result: FfiResult,
        lake_snapshot: FfiLakeSnapshot,
    }

    /// One key/value of a partition spec (e.g. partition column -> value).
    struct FfiPartitionKeyValue {
        key: String,
        value: String,
    }

    struct FfiPartitionInfo {
        partition_id: i64,
        partition_name: String,
    }

    struct FfiListPartitionInfosResult {
        result: FfiResult,
        partition_infos: Vec<FfiPartitionInfo>,
    }

    /// Input descriptor for `create_database`.
    struct FfiDatabaseDescriptor {
        comment: String,
        properties: Vec<HashMapValue>,
    }

    /// Database metadata returned by `get_database_info`.
    struct FfiDatabaseInfo {
        database_name: String,
        comment: String,
        properties: Vec<HashMapValue>,
        created_time: i64,
        modified_time: i64,
    }

    struct FfiDatabaseInfoResult {
        result: FfiResult,
        database_info: FfiDatabaseInfo,
    }

    struct FfiListDatabasesResult {
        result: FfiResult,
        database_names: Vec<String>,
    }

    struct FfiListTablesResult {
        result: FfiResult,
        table_names: Vec<String>,
    }

    /// Boolean answer (e.g. exists-checks); `value` is false on error.
    struct FfiBoolResult {
        result: FfiResult,
        value: bool,
    }

    struct FfiServerNode {
        node_id: i32,
        host: String,
        port: u32,
        server_type: String,
        uid: String,
    }

    struct FfiServerNodesResult {
        result: FfiResult,
        server_nodes: Vec<FfiServerNode>,
    }

    /// Address of a heap-allocated opaque object (`Box::into_raw`), or 0 on
    /// error. The C++ side must release it via the matching `delete_*` fn.
    struct FfiPtrResult {
        result: FfiResult,
        ptr: usize,
    }

    extern "Rust" {
        type Connection;
        type Admin;
        type Table;
        type AppendWriter;
        type WriteResult;
        type LogScanner;
        type UpsertWriter;
        type Lookuper;

        // Opaque types for optimized FFI
        type ScanResultInner;
        type GenericRowInner;
        type LookupResultInner;

        // Connection
        fn new_connection(config: &FfiConfig) -> FfiPtrResult;
        unsafe fn delete_connection(conn: *mut Connection);
        fn get_admin(self: &Connection) -> FfiPtrResult;
        fn get_table(self: &Connection, table_path: &FfiTablePath) -> FfiPtrResult;

        // Admin
        unsafe fn delete_admin(admin: *mut Admin);
        fn create_table(
            self: &Admin,
            table_path: &FfiTablePath,
            descriptor: &FfiTableDescriptor,
            ignore_if_exists: bool,
        ) -> FfiResult;
        fn drop_table(
            self: &Admin,
            table_path: &FfiTablePath,
            ignore_if_not_exists: bool,
        ) -> FfiResult;
        fn get_table_info(self: &Admin, table_path: &FfiTablePath) -> FfiTableInfoResult;
        fn get_latest_lake_snapshot(
            self: &Admin,
            table_path: &FfiTablePath,
        ) -> FfiLakeSnapshotResult;
        fn list_offsets(
            self: &Admin,
            table_path: &FfiTablePath,
            bucket_ids: Vec<i32>,
            offset_query: &FfiOffsetQuery,
        ) -> FfiListOffsetsResult;
        fn list_partition_offsets(
            self: &Admin,
            table_path: &FfiTablePath,
            partition_name: String,
            bucket_ids: Vec<i32>,
            offset_query: &FfiOffsetQuery,
        ) -> FfiListOffsetsResult;
        fn list_partition_infos(
            self: &Admin,
            table_path: &FfiTablePath,
        ) -> FfiListPartitionInfosResult;
        fn list_partition_infos_with_spec(
            self: &Admin,
            table_path: &FfiTablePath,
            partition_spec: Vec<FfiPartitionKeyValue>,
        ) -> FfiListPartitionInfosResult;
        fn create_partition(
            self: &Admin,
            table_path: &FfiTablePath,
            partition_spec: Vec<FfiPartitionKeyValue>,
            ignore_if_exists: bool,
        ) -> FfiResult;
        fn drop_partition(
            self: &Admin,
            table_path: &FfiTablePath,
            partition_spec: Vec<FfiPartitionKeyValue>,
            ignore_if_not_exists: bool,
        ) -> FfiResult;
        fn create_database(
            self: &Admin,
            database_name: &str,
            descriptor: &FfiDatabaseDescriptor,
            ignore_if_exists: bool,
        ) -> FfiResult;
        fn drop_database(
            self: &Admin,
            database_name: &str,
            ignore_if_not_exists: bool,
            cascade: bool,
        ) -> FfiResult;
        fn list_databases(self: &Admin) -> FfiListDatabasesResult;
        fn database_exists(self: &Admin, database_name: &str) -> FfiBoolResult;
        fn get_database_info(self: &Admin, database_name: &str) -> FfiDatabaseInfoResult;
        fn list_tables(self: &Admin, database_name: &str) -> FfiListTablesResult;
        fn table_exists(self: &Admin, table_path: &FfiTablePath) -> FfiBoolResult;
        fn get_server_nodes(self: &Admin) -> FfiServerNodesResult;

        // Table
        unsafe fn delete_table(table: *mut Table);
        fn new_append_writer(self: &Table) -> FfiPtrResult;
        fn create_scanner(self: &Table, column_indices: Vec<usize>, batch: bool) -> FfiPtrResult;
        fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
        fn get_table_path(self: &Table) -> FfiTablePath;
        fn has_primary_key(self: &Table) -> bool;
        fn create_upsert_writer(self: &Table, column_indices: Vec<usize>) -> FfiPtrResult;
        fn new_lookuper(self: &Table) -> FfiPtrResult;

        // GenericRowInner — opaque row for writes
        fn new_generic_row(field_count: usize) -> Box<GenericRowInner>;
        fn gr_reset(self: &mut GenericRowInner);
        fn gr_set_null(self: &mut GenericRowInner, idx: usize);
        fn gr_set_bool(self: &mut GenericRowInner, idx: usize, val: bool);
        fn gr_set_i32(self: &mut GenericRowInner, idx: usize, val: i32);
        fn gr_set_i64(self: &mut GenericRowInner, idx: usize, val: i64);
        fn gr_set_f32(self: &mut GenericRowInner, idx: usize, val: f32);
        fn gr_set_f64(self: &mut GenericRowInner, idx: usize, val: f64);
        fn gr_set_str(self: &mut GenericRowInner, idx: usize, val: &str);
        fn gr_set_bytes(self: &mut GenericRowInner, idx: usize, val: &[u8]);
        fn gr_set_date(self: &mut GenericRowInner, idx: usize, days: i32);
        fn gr_set_time(self: &mut GenericRowInner, idx: usize, millis: i32);
        fn gr_set_ts_ntz(self: &mut GenericRowInner, idx: usize, millis: i64, nanos: i32);
        fn gr_set_ts_ltz(self: &mut GenericRowInner, idx: usize, millis: i64, nanos: i32);
        fn gr_set_decimal_str(self: &mut GenericRowInner, idx: usize, val: &str);

        // AppendWriter
        unsafe fn delete_append_writer(writer: *mut AppendWriter);
        fn append(self: &mut AppendWriter, row: &GenericRowInner) -> FfiPtrResult;
        fn append_arrow_batch(
            self: &mut AppendWriter,
            array_ptr: usize,
            schema_ptr: usize,
        ) -> FfiPtrResult;
        fn flush(self: &mut AppendWriter) -> FfiResult;

        // WriteResult
        unsafe fn delete_write_result(wr: *mut WriteResult);
        fn wait(self: &mut WriteResult) -> FfiResult;

        // UpsertWriter
        unsafe fn delete_upsert_writer(writer: *mut UpsertWriter);
        fn upsert(self: &mut UpsertWriter, row: &GenericRowInner) -> FfiPtrResult;
        fn delete_row(self: &mut UpsertWriter, row: &GenericRowInner) -> FfiPtrResult;
        fn upsert_flush(self: &mut UpsertWriter) -> FfiResult;

        // Lookuper
        unsafe fn delete_lookuper(lookuper: *mut Lookuper);
        fn lookup(self: &mut Lookuper, pk_row: &GenericRowInner) -> Box<LookupResultInner>;

        // LookupResultInner accessors
        fn lv_has_error(self: &LookupResultInner) -> bool;
        fn lv_error_code(self: &LookupResultInner) -> i32;
        fn lv_error_message(self: &LookupResultInner) -> &str;
        fn lv_found(self: &LookupResultInner) -> bool;
        fn lv_field_count(self: &LookupResultInner) -> usize;
        fn lv_column_name(self: &LookupResultInner, field: usize) -> Result<&str>;
        fn lv_column_type(self: &LookupResultInner, field: usize) -> Result<i32>;
        fn lv_is_null(self: &LookupResultInner, field: usize) -> Result<bool>;
        fn lv_get_bool(self: &LookupResultInner, field: usize) -> Result<bool>;
        fn lv_get_i32(self: &LookupResultInner, field: usize) -> Result<i32>;
        fn lv_get_i64(self: &LookupResultInner, field: usize) -> Result<i64>;
        fn lv_get_f32(self: &LookupResultInner, field: usize) -> Result<f32>;
        fn lv_get_f64(self: &LookupResultInner, field: usize) -> Result<f64>;
        fn lv_get_str(self: &LookupResultInner, field: usize) -> Result<&str>;
        fn lv_get_bytes(self: &LookupResultInner, field: usize) -> Result<&[u8]>;
        fn lv_get_date_days(self: &LookupResultInner, field: usize) -> Result<i32>;
        fn lv_get_time_millis(self: &LookupResultInner, field: usize) -> Result<i32>;
        fn lv_get_ts_millis(self: &LookupResultInner, field: usize) -> Result<i64>;
        fn lv_get_ts_nanos(self: &LookupResultInner, field: usize) -> Result<i32>;
        fn lv_is_ts_ltz(self: &LookupResultInner, field: usize) -> Result<bool>;
        fn lv_get_decimal_str(self: &LookupResultInner, field: usize) -> Result<String>;

        // LogScanner
        unsafe fn delete_log_scanner(scanner: *mut LogScanner);
        fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> FfiResult;
        fn subscribe_buckets(
            self: &LogScanner,
            subscriptions: Vec<FfiBucketSubscription>,
        ) -> FfiResult;
        fn subscribe_partition(
            self: &LogScanner,
            partition_id: i64,
            bucket_id: i32,
            start_offset: i64,
        ) -> FfiResult;
        fn subscribe_partition_buckets(
            self: &LogScanner,
            subscriptions: Vec<FfiPartitionBucketSubscription>,
        ) -> FfiResult;
        fn unsubscribe(self: &LogScanner, bucket_id: i32) -> FfiResult;
        fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32)
        -> FfiResult;
        fn poll(self: &LogScanner, timeout_ms: i64) -> Box<ScanResultInner>;
        fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
        fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);

        // ScanResultInner accessors (bucket/rec/field indexed; field
        // accessors return Err on out-of-range or type mismatch)
        fn sv_has_error(self: &ScanResultInner) -> bool;
        fn sv_error_code(self: &ScanResultInner) -> i32;
        fn sv_error_message(self: &ScanResultInner) -> &str;
        fn sv_record_count(self: &ScanResultInner) -> usize;
        fn sv_column_count(self: &ScanResultInner) -> usize;
        fn sv_column_name(self: &ScanResultInner, field: usize) -> Result<&str>;
        fn sv_column_type(self: &ScanResultInner, field: usize) -> Result<i32>;
        fn sv_offset(self: &ScanResultInner, bucket: usize, rec: usize) -> i64;
        fn sv_timestamp(self: &ScanResultInner, bucket: usize, rec: usize) -> i64;
        fn sv_change_type(self: &ScanResultInner, bucket: usize, rec: usize) -> i32;
        fn sv_field_count(self: &ScanResultInner) -> usize;
        fn sv_is_null(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<bool>;
        fn sv_get_bool(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<bool>;
        fn sv_get_i32(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<i32>;
        fn sv_get_i64(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<i64>;
        fn sv_get_f32(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<f32>;
        fn sv_get_f64(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<f64>;
        fn sv_get_str(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<&str>;
        fn sv_get_bytes(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<&[u8]>;
        fn sv_get_date_days(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<i32>;
        fn sv_get_time_millis(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<i32>;
        fn sv_get_ts_millis(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<i64>;
        fn sv_get_ts_nanos(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<i32>;
        fn sv_is_ts_ltz(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<bool>;
        fn sv_get_decimal_str(
            self: &ScanResultInner,
            bucket: usize,
            rec: usize,
            field: usize,
        ) -> Result<String>;

        fn sv_bucket_infos(self: &ScanResultInner) -> &Vec<FfiBucketInfo>;
    }
}
| |
/// Opaque connection handle exposed to C++ by raw pointer.
/// The `Arc` lets other handles (e.g. `Table`) share the same core connection.
pub struct Connection {
    inner: Arc<fcore::client::FlussConnection>,
}
| |
/// Opaque handle for administrative operations (DDL and metadata queries).
pub struct Admin {
    inner: fcore::client::FlussAdmin,
}
| |
/// Opaque table handle. Caches table metadata at construction time so the
/// accessor methods (`get_table_info_from_table`, `get_table_path`,
/// `has_primary_key`) can answer without a server round trip.
pub struct Table {
    // Shared core connection this handle was created from.
    connection: Arc<fcore::client::FlussConnection>,
    metadata: Arc<fcore::client::Metadata>,
    table_info: fcore::metadata::TableInfo,
    table_path: fcore::metadata::TablePath,
    // Cached result of `has_primary_key()` at handle-creation time.
    has_pk: bool,
}
| |
/// Opaque append-only writer handle, paired with the target table's metadata.
pub struct AppendWriter {
    inner: fcore::client::AppendWriter,
    table_info: fcore::metadata::TableInfo,
}
| |
/// Opaque pending-write acknowledgement.
/// NOTE(review): the `Option` presumably allows `wait` to consume the inner
/// future exactly once — confirm against the `wait` implementation.
pub struct WriteResult {
    inner: Option<fcore::client::WriteResultFuture>,
}
| |
/// The two scanner flavors selected by `create_scanner`'s `batch` flag:
/// record-oriented vs Arrow-record-batch-oriented.
enum ScannerKind {
    Record(fcore::client::LogScanner),
    Batch(fcore::client::RecordBatchLogScanner),
}
| |
/// Opaque log scanner handle wrapping either scanner flavor.
pub struct LogScanner {
    scanner: ScannerKind,
    /// Fluss columns matching the projected Arrow fields (1:1 by index).
    /// For non-projected scanners this is the full table schema columns.
    projected_columns: Vec<fcore::metadata::Column>,
}
| |
/// Opaque upsert/delete writer handle, paired with the target table's metadata.
pub struct UpsertWriter {
    inner: fcore::client::UpsertWriter,
    table_info: fcore::metadata::TableInfo,
}
| |
/// Opaque primary-key lookup handle, paired with the target table's metadata.
pub struct Lookuper {
    inner: fcore::client::Lookuper,
    table_info: fcore::metadata::TableInfo,
}
| |
/// Error code for client-side errors that did not originate from the server API protocol.
/// Must be non-zero so that CPP `Result::Ok()` (which checks `error_code == 0`) correctly
/// detects client-side errors as failures. The value -2 is outside the server API error
/// code range (-1 .. 57+), so it will never collide with current or future API codes.
/// Used by `client_err` / `err_from_core_error` below.
const CLIENT_ERROR_CODE: i32 = -2;
| |
| fn ok_result() -> ffi::FfiResult { |
| ffi::FfiResult { |
| error_code: 0, |
| error_message: String::new(), |
| } |
| } |
| |
| fn err_result(code: i32, msg: String) -> ffi::FfiResult { |
| ffi::FfiResult { |
| error_code: code, |
| error_message: msg, |
| } |
| } |
| |
| /// Create a client-side error result (not from server API). |
| fn client_err(msg: String) -> ffi::FfiResult { |
| err_result(CLIENT_ERROR_CODE, msg) |
| } |
| |
| /// Convert a core Error to FfiResult. |
| /// `FlussAPIError` variants carry the server protocol error code directly. |
| /// All other error kinds are client-side and use CLIENT_ERROR_CODE. |
| fn err_from_core_error(e: &fcore::error::Error) -> ffi::FfiResult { |
| use fcore::error::Error; |
| match e { |
| Error::FlussAPIError { api_error } => err_result(api_error.code, api_error.message.clone()), |
| _ => client_err(e.to_string()), |
| } |
| } |
| |
| fn ok_ptr(ptr: usize) -> ffi::FfiPtrResult { |
| ffi::FfiPtrResult { |
| result: ok_result(), |
| ptr, |
| } |
| } |
| |
| fn client_err_ptr(msg: String) -> ffi::FfiPtrResult { |
| ffi::FfiPtrResult { |
| result: client_err(msg), |
| ptr: 0usize, |
| } |
| } |
| |
| fn err_ptr_from_core(e: &fcore::error::Error) -> ffi::FfiPtrResult { |
| ffi::FfiPtrResult { |
| result: err_from_core_error(e), |
| ptr: 0usize, |
| } |
| } |
| |
| // Connection implementation |
| fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult { |
| let assigner_type = match config |
| .writer_bucket_no_key_assigner |
| .parse::<fluss::config::NoKeyAssigner>() |
| { |
| Ok(v) => v, |
| Err(e) => return client_err_ptr(format!("Invalid bucket assigner type: {e}")), |
| }; |
| let config_core = fluss::config::Config { |
| bootstrap_servers: config.bootstrap_servers.to_string(), |
| writer_request_max_size: config.writer_request_max_size, |
| writer_acks: config.writer_acks.to_string(), |
| writer_retries: config.writer_retries, |
| writer_batch_size: config.writer_batch_size, |
| writer_batch_timeout_ms: config.writer_batch_timeout_ms, |
| writer_bucket_no_key_assigner: assigner_type, |
| scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, |
| remote_file_download_thread_num: config.remote_file_download_thread_num, |
| scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency, |
| scanner_log_max_poll_records: config.scanner_log_max_poll_records, |
| scanner_log_fetch_max_bytes: config.scanner_log_fetch_max_bytes, |
| scanner_log_fetch_min_bytes: config.scanner_log_fetch_min_bytes, |
| scanner_log_fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms, |
| scanner_log_fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket, |
| connect_timeout_ms: config.connect_timeout_ms, |
| security_protocol: config.security_protocol.to_string(), |
| security_sasl_mechanism: config.security_sasl_mechanism.to_string(), |
| security_sasl_username: config.security_sasl_username.to_string(), |
| security_sasl_password: config.security_sasl_password.to_string(), |
| }; |
| |
| let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config_core).await }); |
| |
| match conn { |
| Ok(c) => { |
| let ptr = Box::into_raw(Box::new(Connection { inner: Arc::new(c) })); |
| ok_ptr(ptr as usize) |
| } |
| Err(e) => err_ptr_from_core(&e), |
| } |
| } |
| |
| unsafe fn delete_connection(conn: *mut Connection) { |
| if !conn.is_null() { |
| unsafe { |
| drop(Box::from_raw(conn)); |
| } |
| } |
| } |
| |
| impl Connection { |
| fn get_admin(&self) -> ffi::FfiPtrResult { |
| let admin_result = RUNTIME.block_on(async { self.inner.get_admin().await }); |
| |
| match admin_result { |
| Ok(admin) => { |
| let ptr = Box::into_raw(Box::new(Admin { inner: admin })); |
| ok_ptr(ptr as usize) |
| } |
| Err(e) => err_ptr_from_core(&e), |
| } |
| } |
| |
| fn get_table(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiPtrResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| |
| let table_result = RUNTIME.block_on(async { self.inner.get_table(&path).await }); |
| |
| match table_result { |
| Ok(t) => { |
| let ptr = Box::into_raw(Box::new(Table { |
| connection: self.inner.clone(), |
| metadata: t.metadata().clone(), |
| table_info: t.get_table_info().clone(), |
| table_path: t.table_path().clone(), |
| has_pk: t.has_primary_key(), |
| })); |
| ok_ptr(ptr as usize) |
| } |
| Err(e) => err_ptr_from_core(&e), |
| } |
| } |
| } |
| |
| // Admin implementation |
| unsafe fn delete_admin(admin: *mut Admin) { |
| if !admin.is_null() { |
| unsafe { |
| drop(Box::from_raw(admin)); |
| } |
| } |
| } |
| |
| impl Admin { |
| fn create_table( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| descriptor: &ffi::FfiTableDescriptor, |
| ignore_if_exists: bool, |
| ) -> ffi::FfiResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| |
| let core_descriptor = match types::ffi_descriptor_to_core(descriptor) { |
| Ok(d) => d, |
| Err(e) => return client_err(e.to_string()), |
| }; |
| |
| let result = RUNTIME.block_on(async { |
| self.inner |
| .create_table(&path, &core_descriptor, ignore_if_exists) |
| .await |
| }); |
| |
| match result { |
| Ok(_) => ok_result(), |
| Err(e) => err_from_core_error(&e), |
| } |
| } |
| |
| fn drop_table( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| ignore_if_not_exists: bool, |
| ) -> ffi::FfiResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| |
| let result = |
| RUNTIME.block_on(async { self.inner.drop_table(&path, ignore_if_not_exists).await }); |
| |
| match result { |
| Ok(_) => ok_result(), |
| Err(e) => err_from_core_error(&e), |
| } |
| } |
| |
| fn get_table_info(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiTableInfoResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| |
| let result = RUNTIME.block_on(async { self.inner.get_table_info(&path).await }); |
| |
| match result { |
| Ok(info) => ffi::FfiTableInfoResult { |
| result: ok_result(), |
| table_info: types::core_table_info_to_ffi(&info), |
| }, |
| Err(e) => ffi::FfiTableInfoResult { |
| result: err_from_core_error(&e), |
| table_info: types::empty_table_info(), |
| }, |
| } |
| } |
| |
| fn get_latest_lake_snapshot( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| ) -> ffi::FfiLakeSnapshotResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| |
| let result = RUNTIME.block_on(async { self.inner.get_latest_lake_snapshot(&path).await }); |
| |
| match result { |
| Ok(snapshot) => ffi::FfiLakeSnapshotResult { |
| result: ok_result(), |
| lake_snapshot: types::core_lake_snapshot_to_ffi(&snapshot), |
| }, |
| Err(e) => ffi::FfiLakeSnapshotResult { |
| result: err_from_core_error(&e), |
| lake_snapshot: ffi::FfiLakeSnapshot { |
| snapshot_id: -1, |
| bucket_offsets: vec![], |
| }, |
| }, |
| } |
| } |
| |
| // Helper function for common list offsets functionality |
| fn do_list_offsets( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| partition_name: Option<&str>, |
| bucket_ids: Vec<i32>, |
| offset_query: &ffi::FfiOffsetQuery, |
| ) -> ffi::FfiListOffsetsResult { |
| use fcore::rpc::message::OffsetSpec; |
| |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| |
| let offset_spec = match offset_query.offset_type { |
| 0 => OffsetSpec::Earliest, |
| 1 => OffsetSpec::Latest, |
| 2 => OffsetSpec::Timestamp(offset_query.timestamp), |
| _ => { |
| return ffi::FfiListOffsetsResult { |
| result: client_err(format!( |
| "Invalid offset_type: {}", |
| offset_query.offset_type |
| )), |
| bucket_offsets: vec![], |
| }; |
| } |
| }; |
| |
| let result = RUNTIME.block_on(async { |
| if let Some(part_name) = partition_name { |
| self.inner |
| .list_partition_offsets(&path, part_name, &bucket_ids, offset_spec) |
| .await |
| } else { |
| self.inner |
| .list_offsets(&path, &bucket_ids, offset_spec) |
| .await |
| } |
| }); |
| |
| match result { |
| Ok(offsets) => { |
| let bucket_offsets: Vec<ffi::FfiBucketOffsetPair> = offsets |
| .into_iter() |
| .map(|(bucket_id, offset)| ffi::FfiBucketOffsetPair { bucket_id, offset }) |
| .collect(); |
| ffi::FfiListOffsetsResult { |
| result: ok_result(), |
| bucket_offsets, |
| } |
| } |
| Err(e) => ffi::FfiListOffsetsResult { |
| result: err_from_core_error(&e), |
| bucket_offsets: vec![], |
| }, |
| } |
| } |
| |
| fn list_offsets( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| bucket_ids: Vec<i32>, |
| offset_query: &ffi::FfiOffsetQuery, |
| ) -> ffi::FfiListOffsetsResult { |
| self.do_list_offsets(table_path, None, bucket_ids, offset_query) |
| } |
| |
| fn list_partition_offsets( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| partition_name: String, |
| bucket_ids: Vec<i32>, |
| offset_query: &ffi::FfiOffsetQuery, |
| ) -> ffi::FfiListOffsetsResult { |
| self.do_list_offsets(table_path, Some(&partition_name), bucket_ids, offset_query) |
| } |
| |
| fn list_partition_infos( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| ) -> ffi::FfiListPartitionInfosResult { |
| self.do_list_partition_infos(table_path, None) |
| } |
| |
| fn list_partition_infos_with_spec( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| partition_spec: Vec<ffi::FfiPartitionKeyValue>, |
| ) -> ffi::FfiListPartitionInfosResult { |
| let spec_map: std::collections::HashMap<String, String> = partition_spec |
| .into_iter() |
| .map(|kv| (kv.key, kv.value)) |
| .collect(); |
| let spec = fcore::metadata::PartitionSpec::new(spec_map); |
| self.do_list_partition_infos(table_path, Some(&spec)) |
| } |
| fn create_partition( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| partition_spec: Vec<ffi::FfiPartitionKeyValue>, |
| ignore_if_exists: bool, |
| ) -> ffi::FfiResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| let spec_map: std::collections::HashMap<String, String> = partition_spec |
| .into_iter() |
| .map(|kv| (kv.key, kv.value)) |
| .collect(); |
| let partition_spec = fcore::metadata::PartitionSpec::new(spec_map); |
| |
| let result = RUNTIME.block_on(async { |
| self.inner |
| .create_partition(&path, &partition_spec, ignore_if_exists) |
| .await |
| }); |
| |
| match result { |
| Ok(_) => ok_result(), |
| Err(e) => err_from_core_error(&e), |
| } |
| } |
| |
| fn drop_partition( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| partition_spec: Vec<ffi::FfiPartitionKeyValue>, |
| ignore_if_not_exists: bool, |
| ) -> ffi::FfiResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| let spec_map: std::collections::HashMap<String, String> = partition_spec |
| .into_iter() |
| .map(|kv| (kv.key, kv.value)) |
| .collect(); |
| let partition_spec = fcore::metadata::PartitionSpec::new(spec_map); |
| |
| let result = RUNTIME.block_on(async { |
| self.inner |
| .drop_partition(&path, &partition_spec, ignore_if_not_exists) |
| .await |
| }); |
| |
| match result { |
| Ok(_) => ok_result(), |
| Err(e) => err_from_core_error(&e), |
| } |
| } |
| |
| fn create_database( |
| &self, |
| database_name: &str, |
| descriptor: &ffi::FfiDatabaseDescriptor, |
| ignore_if_exists: bool, |
| ) -> ffi::FfiResult { |
| let descriptor_opt = types::ffi_database_descriptor_to_core(descriptor); |
| |
| let result = RUNTIME.block_on(async { |
| self.inner |
| .create_database(database_name, descriptor_opt.as_ref(), ignore_if_exists) |
| .await |
| }); |
| |
| match result { |
| Ok(_) => ok_result(), |
| Err(e) => err_from_core_error(&e), |
| } |
| } |
| |
| fn drop_database( |
| &self, |
| database_name: &str, |
| ignore_if_not_exists: bool, |
| cascade: bool, |
| ) -> ffi::FfiResult { |
| let result = RUNTIME.block_on(async { |
| self.inner |
| .drop_database(database_name, ignore_if_not_exists, cascade) |
| .await |
| }); |
| |
| match result { |
| Ok(_) => ok_result(), |
| Err(e) => err_from_core_error(&e), |
| } |
| } |
| |
| fn list_databases(&self) -> ffi::FfiListDatabasesResult { |
| let result = RUNTIME.block_on(async { self.inner.list_databases().await }); |
| |
| match result { |
| Ok(names) => ffi::FfiListDatabasesResult { |
| result: ok_result(), |
| database_names: names, |
| }, |
| Err(e) => ffi::FfiListDatabasesResult { |
| result: err_from_core_error(&e), |
| database_names: vec![], |
| }, |
| } |
| } |
| |
| fn database_exists(&self, database_name: &str) -> ffi::FfiBoolResult { |
| let result = RUNTIME.block_on(async { self.inner.database_exists(database_name).await }); |
| |
| match result { |
| Ok(exists) => ffi::FfiBoolResult { |
| result: ok_result(), |
| value: exists, |
| }, |
| Err(e) => ffi::FfiBoolResult { |
| result: err_from_core_error(&e), |
| value: false, |
| }, |
| } |
| } |
| |
| fn get_database_info(&self, database_name: &str) -> ffi::FfiDatabaseInfoResult { |
| let result = RUNTIME.block_on(async { self.inner.get_database_info(database_name).await }); |
| |
| match result { |
| Ok(info) => ffi::FfiDatabaseInfoResult { |
| result: ok_result(), |
| database_info: types::core_database_info_to_ffi(&info), |
| }, |
| Err(e) => ffi::FfiDatabaseInfoResult { |
| result: err_from_core_error(&e), |
| database_info: ffi::FfiDatabaseInfo { |
| database_name: String::new(), |
| comment: String::new(), |
| properties: vec![], |
| created_time: 0, |
| modified_time: 0, |
| }, |
| }, |
| } |
| } |
| |
| fn list_tables(&self, database_name: &str) -> ffi::FfiListTablesResult { |
| let result = RUNTIME.block_on(async { self.inner.list_tables(database_name).await }); |
| |
| match result { |
| Ok(names) => ffi::FfiListTablesResult { |
| result: ok_result(), |
| table_names: names, |
| }, |
| Err(e) => ffi::FfiListTablesResult { |
| result: err_from_core_error(&e), |
| table_names: vec![], |
| }, |
| } |
| } |
| |
| fn table_exists(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiBoolResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| |
| let result = RUNTIME.block_on(async { self.inner.table_exists(&path).await }); |
| |
| match result { |
| Ok(exists) => ffi::FfiBoolResult { |
| result: ok_result(), |
| value: exists, |
| }, |
| Err(e) => ffi::FfiBoolResult { |
| result: err_from_core_error(&e), |
| value: false, |
| }, |
| } |
| } |
| |
| fn do_list_partition_infos( |
| &self, |
| table_path: &ffi::FfiTablePath, |
| partial_partition_spec: Option<&fcore::metadata::PartitionSpec>, |
| ) -> ffi::FfiListPartitionInfosResult { |
| let path = fcore::metadata::TablePath::new( |
| table_path.database_name.clone(), |
| table_path.table_name.clone(), |
| ); |
| let result = RUNTIME.block_on(async { |
| self.inner |
| .list_partition_infos_with_spec(&path, partial_partition_spec) |
| .await |
| }); |
| match result { |
| Ok(infos) => { |
| let partition_infos: Vec<ffi::FfiPartitionInfo> = infos |
| .into_iter() |
| .map(|info| ffi::FfiPartitionInfo { |
| partition_id: info.get_partition_id(), |
| partition_name: info.get_partition_name(), |
| }) |
| .collect(); |
| ffi::FfiListPartitionInfosResult { |
| result: ok_result(), |
| partition_infos, |
| } |
| } |
| Err(e) => ffi::FfiListPartitionInfosResult { |
| result: err_from_core_error(&e), |
| partition_infos: vec![], |
| }, |
| } |
| } |
| |
| fn get_server_nodes(&self) -> ffi::FfiServerNodesResult { |
| let result = RUNTIME.block_on(async { self.inner.get_server_nodes().await }); |
| |
| match result { |
| Ok(nodes) => { |
| let server_nodes: Vec<ffi::FfiServerNode> = nodes |
| .into_iter() |
| .map(|node| ffi::FfiServerNode { |
| node_id: node.id(), |
| host: node.host().to_string(), |
| port: node.port(), |
| server_type: node.server_type().to_string(), |
| uid: node.uid().to_string(), |
| }) |
| .collect(); |
| ffi::FfiServerNodesResult { |
| result: ok_result(), |
| server_nodes, |
| } |
| } |
| Err(e) => ffi::FfiServerNodesResult { |
| result: err_from_core_error(&e), |
| server_nodes: vec![], |
| }, |
| } |
| } |
| } |
| |
| // Table implementation |
| unsafe fn delete_table(table: *mut Table) { |
| if !table.is_null() { |
| unsafe { |
| drop(Box::from_raw(table)); |
| } |
| } |
| } |
| |
impl Table {
    /// Build a borrowed core `FlussTable` handle from the cached
    /// connection, metadata, and table info.
    fn fluss_table(&self) -> fcore::client::FlussTable<'_> {
        fcore::client::FlussTable::new(
            &self.connection,
            self.metadata.clone(),
            self.table_info.clone(),
        )
    }

    /// Map projection indices to schema columns, rejecting any index
    /// outside the schema with a descriptive error message.
    fn resolve_projected_columns(
        &self,
        indices: &[usize],
    ) -> Result<Vec<fcore::metadata::Column>, String> {
        let all_columns = self.table_info.get_schema().columns();
        indices
            .iter()
            .map(|&i| {
                all_columns.get(i).cloned().ok_or_else(|| {
                    format!(
                        "Invalid column index {i}: schema has {} columns",
                        all_columns.len()
                    )
                })
            })
            .collect()
    }

    /// Create an append writer and hand its ownership to C++ as a raw
    /// pointer (released later through `delete_append_writer`).
    fn new_append_writer(&self) -> ffi::FfiPtrResult {
        // Enter the runtime so anything the writer constructs that needs a
        // tokio context is bound to the shared runtime.
        let _enter = RUNTIME.enter();

        let table_append = match self.fluss_table().new_append() {
            Ok(a) => a,
            Err(e) => return err_ptr_from_core(&e),
        };

        let writer = match table_append.create_writer() {
            Ok(w) => w,
            Err(e) => return err_ptr_from_core(&e),
        };

        // Leak the wrapper; C++ owns the pointer from here on.
        let ptr = Box::into_raw(Box::new(AppendWriter {
            inner: writer,
            table_info: self.table_info.clone(),
        }));
        ok_ptr(ptr as usize)
    }

    /// Create a log scanner (record- or batch-oriented depending on
    /// `batch`), optionally projected to `column_indices`.
    /// Returns a leaked `LogScanner` pointer for C++ to own.
    fn create_scanner(&self, column_indices: Vec<usize>, batch: bool) -> ffi::FfiPtrResult {
        RUNTIME.block_on(async {
            let fluss_table = self.fluss_table();
            let scan = fluss_table.new_scan();

            // Empty indices means "all columns": keep the full schema and
            // the unprojected scan.
            let (projected_columns, scan) = if column_indices.is_empty() {
                (self.table_info.get_schema().columns().to_vec(), scan)
            } else {
                let cols = match self.resolve_projected_columns(&column_indices) {
                    Ok(c) => c,
                    Err(e) => return client_err_ptr(e),
                };
                let scan = match scan.project(&column_indices) {
                    Ok(s) => s,
                    Err(e) => return err_ptr_from_core(&e),
                };
                (cols, scan)
            };

            let scanner = if batch {
                match scan.create_record_batch_log_scanner() {
                    Ok(s) => ScannerKind::Batch(s),
                    Err(e) => return err_ptr_from_core(&e),
                }
            } else {
                match scan.create_log_scanner() {
                    Ok(s) => ScannerKind::Record(s),
                    Err(e) => return err_ptr_from_core(&e),
                }
            };

            // Leak the scanner wrapper; freed via delete_log_scanner.
            let ptr = Box::into_raw(Box::new(LogScanner {
                scanner,
                projected_columns,
            }));
            ok_ptr(ptr as usize)
        })
    }

    /// Snapshot the cached table info into its FFI mirror.
    fn get_table_info_from_table(&self) -> ffi::FfiTableInfo {
        types::core_table_info_to_ffi(&self.table_info)
    }

    /// Return the (database, table) path as an FFI struct.
    fn get_table_path(&self) -> ffi::FfiTablePath {
        ffi::FfiTablePath {
            database_name: self.table_path.database().to_string(),
            table_name: self.table_path.table().to_string(),
        }
    }

    /// Whether the table was opened with a primary key (cached flag).
    fn has_primary_key(&self) -> bool {
        self.has_pk
    }

    /// Create an upsert writer, optionally restricted to a partial-update
    /// column set. Returns a leaked `UpsertWriter` pointer for C++.
    fn create_upsert_writer(&self, column_indices: Vec<usize>) -> ffi::FfiPtrResult {
        let _enter = RUNTIME.enter();

        let table_upsert = match self.fluss_table().new_upsert() {
            Ok(u) => u,
            Err(e) => return err_ptr_from_core(&e),
        };

        // Empty indices means full-row upsert; otherwise configure a
        // partial update over just those columns.
        let table_upsert = if column_indices.is_empty() {
            table_upsert
        } else {
            match table_upsert.partial_update(Some(column_indices)) {
                Ok(u) => u,
                Err(e) => return err_ptr_from_core(&e),
            }
        };

        let writer = match table_upsert.create_writer() {
            Ok(w) => w,
            Err(e) => return err_ptr_from_core(&e),
        };

        let ptr = Box::into_raw(Box::new(UpsertWriter {
            inner: writer,
            table_info: self.table_info.clone(),
        }));
        ok_ptr(ptr as usize)
    }

    /// Create a primary-key lookuper. Returns a leaked `Lookuper` pointer
    /// (released via delete_lookuper).
    fn new_lookuper(&self) -> ffi::FfiPtrResult {
        let _enter = RUNTIME.enter();

        let table_lookup = match self.fluss_table().new_lookup() {
            Ok(l) => l,
            Err(e) => return err_ptr_from_core(&e),
        };

        let lookuper = match table_lookup.create_lookuper() {
            Ok(l) => l,
            Err(e) => return err_ptr_from_core(&e),
        };

        let ptr = Box::into_raw(Box::new(Lookuper {
            inner: lookuper,
            table_info: self.table_info.clone(),
        }));
        ok_ptr(ptr as usize)
    }
}
| |
| // AppendWriter implementation |
| unsafe fn delete_append_writer(writer: *mut AppendWriter) { |
| if !writer.is_null() { |
| unsafe { |
| drop(Box::from_raw(writer)); |
| } |
| } |
| } |
| |
impl AppendWriter {
    /// Queue a single row for append. The row is type-resolved against the
    /// table schema first; the returned pointer is a leaked `WriteResult`
    /// whose `wait()` blocks until the server acknowledges.
    fn append(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
        let schema = self.table_info.get_schema();
        let generic_row = match types::resolve_row_types(&row.row, Some(schema)) {
            Ok(r) => r,
            Err(e) => return client_err_ptr(e.to_string()),
        };

        // append() itself only enqueues; the future resolves on server ack.
        let result_future = match self.inner.append(&generic_row) {
            Ok(f) => f,
            Err(e) => return err_ptr_from_core(&e),
        };

        let ptr = Box::into_raw(Box::new(WriteResult {
            inner: Some(result_future),
        }));
        ok_ptr(ptr as usize)
    }

    /// Import an Arrow record batch exported by C++ (C Data Interface) and
    /// queue it for append. Takes ownership of both FFI structs.
    fn append_arrow_batch(&mut self, array_ptr: usize, schema_ptr: usize) -> ffi::FfiPtrResult {
        use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};

        // Safety: C++ allocates these via `new ArrowArray/ArrowSchema` after a
        // successful `ExportRecordBatch`, so both pointers are valid heap
        // allocations that we take ownership of here.
        let ffi_array = unsafe { *Box::from_raw(array_ptr as *mut FFI_ArrowArray) };
        let ffi_schema = unsafe { Box::from_raw(schema_ptr as *mut FFI_ArrowSchema) };

        // Safety: `from_ffi` requires that the array and schema conform to the
        // Arrow C Data Interface, which is guaranteed by C++'s ExportRecordBatch.
        let array_data = match unsafe { arrow::ffi::from_ffi(ffi_array, &ffi_schema) } {
            Ok(d) => d,
            Err(e) => return client_err_ptr(format!("Failed to import Arrow batch: {e}")),
        };
        // ffi_array is consumed by from_ffi; ffi_schema is dropped here (Box goes out of scope)

        // Reconstruct RecordBatch from the imported StructArray data
        let struct_array = arrow::array::StructArray::from(array_data);
        let batch = arrow::record_batch::RecordBatch::from(struct_array);

        let result_future = match self.inner.append_arrow_batch(batch) {
            Ok(f) => f,
            Err(e) => return err_ptr_from_core(&e),
        };

        // Leak the pending write; C++ frees it via delete_write_result.
        let ptr = Box::into_raw(Box::new(WriteResult {
            inner: Some(result_future),
        }));
        ok_ptr(ptr as usize)
    }

    /// Block until every queued append has been flushed to the server.
    fn flush(&mut self) -> ffi::FfiResult {
        let result = RUNTIME.block_on(async { self.inner.flush().await });

        match result {
            Ok(_) => ok_result(),
            Err(e) => err_from_core_error(&e),
        }
    }
}
| |
| unsafe fn delete_write_result(wr: *mut WriteResult) { |
| if !wr.is_null() { |
| unsafe { |
| drop(Box::from_raw(wr)); |
| } |
| } |
| } |
| |
| impl WriteResult { |
| fn wait(&mut self) -> ffi::FfiResult { |
| if let Some(future) = self.inner.take() { |
| let result = RUNTIME.block_on(future); |
| match result { |
| Ok(_) => ok_result(), |
| Err(e) => err_from_core_error(&e), |
| } |
| } else { |
| client_err("WriteResult already consumed".to_string()) |
| } |
| } |
| } |
| |
| // UpsertWriter implementation |
| unsafe fn delete_upsert_writer(writer: *mut UpsertWriter) { |
| if !writer.is_null() { |
| unsafe { |
| drop(Box::from_raw(writer)); |
| } |
| } |
| } |
| |
impl UpsertWriter {
    /// Pad row with Null to full schema width.
    /// This allows callers to only set the fields they care about.
    fn pad_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) -> fcore::row::GenericRow<'a> {
        let num_columns = self.table_info.get_schema().columns().len();
        if row.values.len() < num_columns {
            row.values.resize(num_columns, fcore::row::Datum::Null);
        }
        row
    }

    /// Queue an upsert of `row`. The row is type-resolved against the
    /// schema, Null-padded to full width, then handed to the core writer.
    /// Returns a leaked `WriteResult` pointer for C++ to wait on.
    fn upsert(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
        let schema = self.table_info.get_schema();
        let generic_row = match types::resolve_row_types(&row.row, Some(schema)) {
            Ok(r) => r,
            Err(e) => return client_err_ptr(e.to_string()),
        };
        let generic_row = self.pad_row(generic_row);

        // upsert() enqueues; the future resolves on server acknowledgement.
        let result_future = match self.inner.upsert(&generic_row) {
            Ok(f) => f,
            Err(e) => return err_ptr_from_core(&e),
        };

        let ptr = Box::into_raw(Box::new(WriteResult {
            inner: Some(result_future),
        }));
        ok_ptr(ptr as usize)
    }

    /// Queue a delete keyed by `row` (same resolve + pad pipeline as
    /// upsert). Returns a leaked `WriteResult` pointer.
    fn delete_row(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
        let schema = self.table_info.get_schema();
        let generic_row = match types::resolve_row_types(&row.row, Some(schema)) {
            Ok(r) => r,
            Err(e) => return client_err_ptr(e.to_string()),
        };
        let generic_row = self.pad_row(generic_row);

        let result_future = match self.inner.delete(&generic_row) {
            Ok(f) => f,
            Err(e) => return err_ptr_from_core(&e),
        };

        let ptr = Box::into_raw(Box::new(WriteResult {
            inner: Some(result_future),
        }));
        ok_ptr(ptr as usize)
    }

    /// Block until every queued upsert/delete has been flushed.
    fn upsert_flush(&mut self) -> ffi::FfiResult {
        let result = RUNTIME.block_on(async { self.inner.flush().await });

        match result {
            Ok(_) => ok_result(),
            Err(e) => err_from_core_error(&e),
        }
    }
}
| |
| // Lookuper implementation |
| unsafe fn delete_lookuper(lookuper: *mut Lookuper) { |
| if !lookuper.is_null() { |
| unsafe { |
| drop(Box::from_raw(lookuper)); |
| } |
| } |
| } |
| |
impl Lookuper {
    /// Build a dense PK-only row from a (possibly sparse) input row.
    /// The user may set PK values at their full schema positions (e.g. [0, 2])
    /// via name-based Set(). We compact them into [0, 1, …] to match
    /// the lookup_row_type the core KeyEncoder expects.
    fn dense_pk_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) -> fcore::row::GenericRow<'a> {
        let pk_indices = self.table_info.get_schema().primary_key_indexes();
        let mut dense = fcore::row::GenericRow::new(pk_indices.len());
        for (dense_idx, &schema_idx) in pk_indices.iter().enumerate() {
            if schema_idx < row.values.len() {
                // Move (not clone) the datum out, leaving Null behind.
                dense.values[dense_idx] =
                    std::mem::replace(&mut row.values[schema_idx], fcore::row::Datum::Null);
            }
        }
        dense
    }

    /// Look up a single row by primary key. Always returns a boxed result
    /// object: on error it carries the code/message; on success either the
    /// found row (converted to owned data) or found=false.
    fn lookup(&mut self, pk_row: &GenericRowInner) -> Box<LookupResultInner> {
        // Type-resolve the key against the schema, then compact it to the
        // dense PK layout the core expects.
        let schema = self.table_info.get_schema();
        let generic_row = match types::resolve_row_types(&pk_row.row, Some(schema)) {
            Ok(r) => self.dense_pk_row(r),
            Err(e) => {
                return Box::new(LookupResultInner::from_error(
                    CLIENT_ERROR_CODE,
                    e.to_string(),
                ));
            }
        };

        let lookup_result = match RUNTIME.block_on(self.inner.lookup(&generic_row)) {
            Ok(r) => r,
            Err(e) => {
                let ffi_err = err_from_core_error(&e);
                return Box::new(LookupResultInner::from_error(
                    ffi_err.error_code,
                    ffi_err.error_message,
                ));
            }
        };

        // Full schema columns accompany the row so C++ can decode fields.
        let columns = self.table_info.get_schema().columns().to_vec();
        match lookup_result.get_single_row() {
            Ok(Some(row)) => match types::compacted_row_to_owned(&row, &self.table_info) {
                Ok(owned_row) => Box::new(LookupResultInner {
                    error: None,
                    found: true,
                    row: Some(owned_row),
                    columns,
                }),
                Err(e) => Box::new(LookupResultInner::from_error(
                    CLIENT_ERROR_CODE,
                    e.to_string(),
                )),
            },
            // No matching row: success with found=false.
            Ok(None) => Box::new(LookupResultInner {
                error: None,
                found: false,
                row: None,
                columns,
            }),
            Err(e) => {
                let ffi_err = err_from_core_error(&e);
                Box::new(LookupResultInner::from_error(
                    ffi_err.error_code,
                    ffi_err.error_message,
                ))
            }
        }
    }
}
| |
| // LogScanner implementation |
| unsafe fn delete_log_scanner(scanner: *mut LogScanner) { |
| if !scanner.is_null() { |
| unsafe { |
| drop(Box::from_raw(scanner)); |
| } |
| } |
| } |
| |
// Helper function to free the Arrow FFI structures separately (for use after ImportRecordBatch)
// A pointer of 0 means "nothing to free" for that structure, so C++ may
// call this unconditionally.
pub extern "C" fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize) {
    use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
    if array_ptr != 0 {
        // SAFETY: presumed to be a pointer produced by Box::into_raw on this
        // side of the bridge — NOTE(review): confirm C++ never passes a
        // pointer it did not receive from us, and never passes it twice.
        let _array = unsafe { Box::from_raw(array_ptr as *mut FFI_ArrowArray) };
    }
    if schema_ptr != 0 {
        // SAFETY: same ownership contract as `array_ptr` above.
        let _schema = unsafe { Box::from_raw(schema_ptr as *mut FFI_ArrowSchema) };
    }
}
| |
/// Dispatch a method call to whichever scanner variant is active.
/// Both LogScanner and RecordBatchLogScanner share the same subscribe/unsubscribe interface.
/// Blocks on the shared runtime and flattens the core Result into an
/// `ffi::FfiResult` (Ok -> ok_result, Err -> err_from_core_error).
macro_rules! dispatch_scanner {
    ($self:expr, $method:ident($($arg:expr),*)) => {
        match RUNTIME.block_on(async {
            match &$self.scanner {
                ScannerKind::Record(s) => s.$method($($arg),*).await,
                ScannerKind::Batch(s) => s.$method($($arg),*).await,
            }
        }) {
            Ok(_) => ok_result(),
            Err(e) => err_from_core_error(&e),
        }
    };
}
| |
impl LogScanner {
    /// Subscribe one bucket of a non-partitioned table from `start_offset`.
    fn subscribe(&self, bucket_id: i32, start_offset: i64) -> ffi::FfiResult {
        dispatch_scanner!(self, subscribe(bucket_id, start_offset))
    }

    /// Subscribe several buckets at once, each with its own start offset.
    fn subscribe_buckets(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) -> ffi::FfiResult {
        use std::collections::HashMap;
        let bucket_offsets: HashMap<i32, i64> = subscriptions
            .into_iter()
            .map(|s| (s.bucket_id, s.offset))
            .collect();
        dispatch_scanner!(self, subscribe_buckets(&bucket_offsets))
    }

    /// Subscribe one bucket of a partition from `start_offset`.
    fn subscribe_partition(
        &self,
        partition_id: PartitionId,
        bucket_id: i32,
        start_offset: i64,
    ) -> ffi::FfiResult {
        dispatch_scanner!(
            self,
            subscribe_partition(partition_id, bucket_id, start_offset)
        )
    }

    /// Subscribe several (partition, bucket) pairs, each with its own
    /// start offset.
    fn subscribe_partition_buckets(
        &self,
        subscriptions: Vec<ffi::FfiPartitionBucketSubscription>,
    ) -> ffi::FfiResult {
        use std::collections::HashMap;
        let offsets: HashMap<(PartitionId, i32), i64> = subscriptions
            .into_iter()
            .map(|s| ((s.partition_id, s.bucket_id), s.offset))
            .collect();
        dispatch_scanner!(self, subscribe_partition_buckets(&offsets))
    }

    /// Stop consuming one bucket of a non-partitioned table.
    fn unsubscribe(&self, bucket_id: i32) -> ffi::FfiResult {
        dispatch_scanner!(self, unsubscribe(bucket_id))
    }

    /// Stop consuming one bucket of a partition.
    fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
        dispatch_scanner!(self, unsubscribe_partition(partition_id, bucket_id))
    }

    /// Poll records (record-mode scanner only). Results are grouped by
    /// bucket and summarized in `bucket_infos`; negative timeouts are
    /// clamped to zero.
    fn poll(&self, timeout_ms: i64) -> Box<ScanResultInner> {
        let ScannerKind::Record(ref inner) = self.scanner else {
            return Box::new(ScanResultInner::from_error(
                CLIENT_ERROR_CODE,
                "Record-based scanner not available".to_string(),
            ));
        };

        let timeout = Duration::from_millis(timeout_ms.max(0) as u64);
        let result = RUNTIME.block_on(async { inner.poll(timeout).await });

        match result {
            Ok(records) => {
                let columns = self.projected_columns.clone();
                let mut total_count = 0usize;
                let mut buckets = Vec::new();
                let mut bucket_infos = Vec::new();
                for (table_bucket, bucket_records) in records.into_records_by_buckets() {
                    let count = bucket_records.len();
                    total_count += count;
                    // has_partition_id + partition_id encode Option<u64>
                    // for the FFI layer (no Option across the bridge).
                    bucket_infos.push(ffi::FfiBucketInfo {
                        table_id: table_bucket.table_id(),
                        bucket_id: table_bucket.bucket_id(),
                        has_partition_id: table_bucket.partition_id().is_some(),
                        partition_id: table_bucket.partition_id().unwrap_or(0),
                        record_count: count,
                    });
                    buckets.push((table_bucket, bucket_records));
                }
                Box::new(ScanResultInner {
                    error: None,
                    buckets,
                    columns,
                    bucket_infos,
                    total_count,
                })
            }
            Err(e) => {
                let ffi_err = err_from_core_error(&e);
                Box::new(ScanResultInner::from_error(
                    ffi_err.error_code,
                    ffi_err.error_message,
                ))
            }
        }
    }

    /// Poll Arrow record batches (batch-mode scanner only). Batches are
    /// serialized to the FFI representation; errors produce an empty list
    /// with the error carried in `result`.
    fn poll_record_batch(&self, timeout_ms: i64) -> ffi::FfiArrowRecordBatchesResult {
        let ScannerKind::Batch(ref inner_batch) = self.scanner else {
            return ffi::FfiArrowRecordBatchesResult {
                result: client_err("Batch-based scanner not available".to_string()),
                arrow_batches: ffi::FfiArrowRecordBatches { batches: vec![] },
            };
        };

        let timeout = Duration::from_millis(timeout_ms.max(0) as u64);
        let result = RUNTIME.block_on(async { inner_batch.poll(timeout).await });

        match result {
            Ok(batches) => match types::core_scan_batches_to_ffi(&batches) {
                Ok(arrow_batches) => ffi::FfiArrowRecordBatchesResult {
                    result: ok_result(),
                    arrow_batches,
                },
                Err(e) => ffi::FfiArrowRecordBatchesResult {
                    result: client_err(e),
                    arrow_batches: ffi::FfiArrowRecordBatches { batches: vec![] },
                },
            },
            Err(e) => ffi::FfiArrowRecordBatchesResult {
                result: err_from_core_error(&e),
                arrow_batches: ffi::FfiArrowRecordBatches { batches: vec![] },
            },
        }
    }
}
| |
| // ============================================================================ |
| // Opaque types: GenericRowInner (write path) |
| // ============================================================================ |
| |
/// Mutable row buffer that C++ fills via the `gr_set_*` methods before a
/// write. Values are stored as owned Datums (`Cow::Owned` for strings and
/// bytes), hence the `'static` lifetime.
pub struct GenericRowInner {
    // Grown on demand by ensure_size(); unset slots stay Datum::Null.
    row: fcore::row::GenericRow<'static>,
}
| |
| fn new_generic_row(field_count: usize) -> Box<GenericRowInner> { |
| Box::new(GenericRowInner { |
| row: fcore::row::GenericRow::new(field_count), |
| }) |
| } |
| |
impl GenericRowInner {
    /// Clear all fields back to Null, keeping the current width.
    fn gr_reset(&mut self) {
        let len = self.row.values.len();
        self.row = fcore::row::GenericRow::new(len);
    }

    /// Explicitly set field `idx` to Null (growing the row if needed).
    fn gr_set_null(&mut self, idx: usize) {
        self.ensure_size(idx);
        self.row.set_field(idx, fcore::row::Datum::Null);
    }

    /// Set a boolean field.
    fn gr_set_bool(&mut self, idx: usize, val: bool) {
        self.ensure_size(idx);
        self.row.set_field(idx, fcore::row::Datum::Bool(val));
    }

    /// Set a 32-bit integer field.
    fn gr_set_i32(&mut self, idx: usize, val: i32) {
        self.ensure_size(idx);
        self.row.set_field(idx, fcore::row::Datum::Int32(val));
    }

    /// Set a 64-bit integer field.
    fn gr_set_i64(&mut self, idx: usize, val: i64) {
        self.ensure_size(idx);
        self.row.set_field(idx, fcore::row::Datum::Int64(val));
    }

    /// Set a 32-bit float field.
    fn gr_set_f32(&mut self, idx: usize, val: f32) {
        self.ensure_size(idx);
        self.row
            .set_field(idx, fcore::row::Datum::Float32(val.into()));
    }

    /// Set a 64-bit float field.
    fn gr_set_f64(&mut self, idx: usize, val: f64) {
        self.ensure_size(idx);
        self.row
            .set_field(idx, fcore::row::Datum::Float64(val.into()));
    }

    /// Set a string field (copied into an owned Cow).
    fn gr_set_str(&mut self, idx: usize, val: &str) {
        self.ensure_size(idx);
        self.row.set_field(
            idx,
            fcore::row::Datum::String(std::borrow::Cow::Owned(val.to_string())),
        );
    }

    /// Set a binary field (copied into an owned Cow).
    fn gr_set_bytes(&mut self, idx: usize, val: &[u8]) {
        self.ensure_size(idx);
        self.row.set_field(
            idx,
            fcore::row::Datum::Blob(std::borrow::Cow::Owned(val.to_vec())),
        );
    }

    /// Set a date field from days (epoch-based per core `Date`).
    fn gr_set_date(&mut self, idx: usize, days: i32) {
        self.ensure_size(idx);
        self.row
            .set_field(idx, fcore::row::Datum::Date(fcore::row::Date::new(days)));
    }

    /// Set a time field from milliseconds (per core `Time`).
    fn gr_set_time(&mut self, idx: usize, millis: i32) {
        self.ensure_size(idx);
        self.row
            .set_field(idx, fcore::row::Datum::Time(fcore::row::Time::new(millis)));
    }

    /// Set a timestamp-without-timezone field from millis + sub-milli nanos.
    fn gr_set_ts_ntz(&mut self, idx: usize, millis: i64, nanos: i32) {
        self.ensure_size(idx);
        // Use from_millis_nanos, falling back to millis-only on error
        let ts = fcore::row::TimestampNtz::from_millis_nanos(millis, nanos)
            .unwrap_or_else(|_| fcore::row::TimestampNtz::new(millis));
        self.row.set_field(idx, fcore::row::Datum::TimestampNtz(ts));
    }

    /// Set a timestamp-with-local-timezone field from millis + nanos.
    fn gr_set_ts_ltz(&mut self, idx: usize, millis: i64, nanos: i32) {
        self.ensure_size(idx);
        let ts = fcore::row::TimestampLtz::from_millis_nanos(millis, nanos)
            .unwrap_or_else(|_| fcore::row::TimestampLtz::new(millis));
        self.row.set_field(idx, fcore::row::Datum::TimestampLtz(ts));
    }

    /// Set a decimal field from its string form.
    fn gr_set_decimal_str(&mut self, idx: usize, val: &str) {
        self.ensure_size(idx);
        // Store as string; resolve_row_types() will parse and validate against schema
        self.row.set_field(
            idx,
            fcore::row::Datum::String(std::borrow::Cow::Owned(val.to_string())),
        );
    }

    /// Grow the row with Nulls so that index `idx` is addressable.
    fn ensure_size(&mut self, idx: usize) {
        if self.row.values.len() <= idx {
            self.row.values.resize(idx + 1, fcore::row::Datum::Null);
        }
    }
}
| |
| // ============================================================================ |
| // Shared row-reading helpers (used by both ScanResultInner and LookupResultInner) |
| // ============================================================================ |
| |
/// Schema-checked field readers shared by the scan and lookup result
/// objects. Every getter validates bounds, nullability, and column type
/// before touching the row, returning a String error for the FFI layer.
mod row_reader {
    use fcore::row::InternalRow;
    use fluss as fcore;

    use crate::types;

    /// Get column at `field`, or error if out of bounds.
    fn get_column(
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<&fcore::metadata::Column, String> {
        columns.get(field).ok_or_else(|| {
            format!(
                "field index {field} out of range ({} columns)",
                columns.len()
            )
        })
    }

    /// Validate bounds, null, and type compatibility in a single pass.
    /// Returns the data type on success for callers that need to dispatch on it.
    fn validate<'a>(
        row: &dyn InternalRow,
        columns: &'a [fcore::metadata::Column],
        field: usize,
        getter: &str,
        allowed: impl FnOnce(&fcore::metadata::DataType) -> bool,
    ) -> Result<&'a fcore::metadata::DataType, String> {
        let col = get_column(columns, field)?;
        if row.is_null_at(field).map_err(|e| e.to_string())? {
            return Err(format!("field {field} is null"));
        }
        let dt = col.data_type();
        if !allowed(dt) {
            return Err(format!(
                "{getter}: column {field} has incompatible type {dt}"
            ));
        }
        Ok(dt)
    }

    /// FFI type tag for the column at `field`.
    pub fn column_type(columns: &[fcore::metadata::Column], field: usize) -> Result<i32, String> {
        Ok(types::core_data_type_to_ffi(
            get_column(columns, field)?.data_type(),
        ))
    }

    /// Name of the column at `field`.
    pub fn column_name(columns: &[fcore::metadata::Column], field: usize) -> Result<&str, String> {
        Ok(get_column(columns, field)?.name())
    }

    /// Whether the value at `field` is null (bounds-checked).
    pub fn is_null(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<bool, String> {
        get_column(columns, field)?;
        row.is_null_at(field).map_err(|e| e.to_string())
    }

    /// Read a Boolean column.
    pub fn get_bool(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<bool, String> {
        validate(row, columns, field, "get_bool", |dt| {
            matches!(dt, fcore::metadata::DataType::Boolean(_))
        })?;
        row.get_boolean(field).map_err(|e| e.to_string())
    }

    /// Read a TinyInt/SmallInt/Int column, widening to i32.
    pub fn get_i32(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<i32, String> {
        let dt = validate(row, columns, field, "get_i32", |dt| {
            matches!(
                dt,
                fcore::metadata::DataType::TinyInt(_)
                    | fcore::metadata::DataType::SmallInt(_)
                    | fcore::metadata::DataType::Int(_)
            )
        })?;
        // Dispatch on the validated type to use the right-width accessor.
        match dt {
            fcore::metadata::DataType::TinyInt(_) => row
                .get_byte(field)
                .map(|v| v as i32)
                .map_err(|e| e.to_string()),
            fcore::metadata::DataType::SmallInt(_) => row
                .get_short(field)
                .map(|v| v as i32)
                .map_err(|e| e.to_string()),
            _ => row.get_int(field).map_err(|e| e.to_string()),
        }
    }

    /// Read a BigInt column.
    pub fn get_i64(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<i64, String> {
        validate(row, columns, field, "get_i64", |dt| {
            matches!(dt, fcore::metadata::DataType::BigInt(_))
        })?;
        row.get_long(field).map_err(|e| e.to_string())
    }

    /// Read a Float column.
    pub fn get_f32(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<f32, String> {
        validate(row, columns, field, "get_f32", |dt| {
            matches!(dt, fcore::metadata::DataType::Float(_))
        })?;
        row.get_float(field).map_err(|e| e.to_string())
    }

    /// Read a Double column.
    pub fn get_f64(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<f64, String> {
        validate(row, columns, field, "get_f64", |dt| {
            matches!(dt, fcore::metadata::DataType::Double(_))
        })?;
        row.get_double(field).map_err(|e| e.to_string())
    }

    /// Read a Char/String column as a borrowed &str.
    pub fn get_str<'a>(
        row: &'a dyn InternalRow,
        columns: &'a [fcore::metadata::Column],
        field: usize,
    ) -> Result<&'a str, String> {
        let dt = validate(row, columns, field, "get_str", |dt| {
            matches!(
                dt,
                fcore::metadata::DataType::Char(_) | fcore::metadata::DataType::String(_)
            )
        })?;
        // Char needs the declared length to slice the fixed-width value.
        match dt {
            fcore::metadata::DataType::Char(ct) => row
                .get_char(field, ct.length() as usize)
                .map_err(|e| e.to_string()),
            _ => row.get_string(field).map_err(|e| e.to_string()),
        }
    }

    /// Read a Binary/Bytes column as a borrowed slice.
    pub fn get_bytes<'a>(
        row: &'a dyn InternalRow,
        columns: &'a [fcore::metadata::Column],
        field: usize,
    ) -> Result<&'a [u8], String> {
        let dt = validate(row, columns, field, "get_bytes", |dt| {
            matches!(
                dt,
                fcore::metadata::DataType::Binary(_) | fcore::metadata::DataType::Bytes(_)
            )
        })?;
        // Binary needs the declared length; Bytes is variable-width.
        match dt {
            fcore::metadata::DataType::Binary(bt) => row
                .get_binary(field, bt.length())
                .map_err(|e| e.to_string()),
            _ => row.get_bytes(field).map_err(|e| e.to_string()),
        }
    }

    /// Read a Date column as its inner day count.
    pub fn get_date_days(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<i32, String> {
        validate(row, columns, field, "get_date_days", |dt| {
            matches!(dt, fcore::metadata::DataType::Date(_))
        })?;
        row.get_date(field)
            .map(|d| d.get_inner())
            .map_err(|e| e.to_string())
    }

    /// Read a Time column as its inner millisecond count.
    pub fn get_time_millis(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<i32, String> {
        validate(row, columns, field, "get_time_millis", |dt| {
            matches!(dt, fcore::metadata::DataType::Time(_))
        })?;
        row.get_time(field)
            .map(|t| t.get_inner())
            .map_err(|e| e.to_string())
    }

    /// Read the millisecond part of a Timestamp/TimestampLTz column.
    pub fn get_ts_millis(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<i64, String> {
        let dt = validate(row, columns, field, "get_ts_millis", |dt| {
            matches!(
                dt,
                fcore::metadata::DataType::Timestamp(_)
                    | fcore::metadata::DataType::TimestampLTz(_)
            )
        })?;
        match dt {
            fcore::metadata::DataType::TimestampLTz(ts) => row
                .get_timestamp_ltz(field, ts.precision())
                .map(|v| v.get_epoch_millisecond())
                .map_err(|e| e.to_string()),
            fcore::metadata::DataType::Timestamp(ts) => row
                .get_timestamp_ntz(field, ts.precision())
                .map(|v| v.get_millisecond())
                .map_err(|e| e.to_string()),
            // Unreachable after validate(); kept as a defensive error.
            dt => Err(format!("get_ts_millis: unexpected type {dt}")),
        }
    }

    /// Read the sub-millisecond nanos of a Timestamp/TimestampLTz column.
    pub fn get_ts_nanos(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<i32, String> {
        let dt = validate(row, columns, field, "get_ts_nanos", |dt| {
            matches!(
                dt,
                fcore::metadata::DataType::Timestamp(_)
                    | fcore::metadata::DataType::TimestampLTz(_)
            )
        })?;
        match dt {
            fcore::metadata::DataType::TimestampLTz(ts) => row
                .get_timestamp_ltz(field, ts.precision())
                .map(|v| v.get_nano_of_millisecond())
                .map_err(|e| e.to_string()),
            fcore::metadata::DataType::Timestamp(ts) => row
                .get_timestamp_ntz(field, ts.precision())
                .map(|v| v.get_nano_of_millisecond())
                .map_err(|e| e.to_string()),
            // Unreachable after validate(); kept as a defensive error.
            dt => Err(format!("get_ts_nanos: unexpected type {dt}")),
        }
    }

    /// Whether the column at `field` is a local-timezone timestamp.
    pub fn is_ts_ltz(columns: &[fcore::metadata::Column], field: usize) -> Result<bool, String> {
        Ok(matches!(
            get_column(columns, field)?.data_type(),
            fcore::metadata::DataType::TimestampLTz(_)
        ))
    }

    /// Read a Decimal column, rendered as its big-decimal string form.
    pub fn get_decimal_str(
        row: &dyn InternalRow,
        columns: &[fcore::metadata::Column],
        field: usize,
    ) -> Result<String, String> {
        let dt = validate(row, columns, field, "get_decimal_str", |dt| {
            matches!(dt, fcore::metadata::DataType::Decimal(_))
        })?;
        match dt {
            fcore::metadata::DataType::Decimal(dd) => {
                let decimal = row
                    .get_decimal(field, dd.precision() as usize, dd.scale() as usize)
                    .map_err(|e| e.to_string())?;
                Ok(decimal.to_big_decimal().to_string())
            }
            // Unreachable after validate(); kept as a defensive error.
            dt => Err(format!("get_decimal_str: unexpected type {dt}")),
        }
    }
}
| |
| // ============================================================================ |
| // Opaque types: ScanResultInner (scan read path) |
| // ============================================================================ |
| |
/// Owned result of a record-mode poll(), read by C++ through the `sv_*`
/// accessor methods.
pub struct ScanResultInner {
    // (code, message) when the poll failed; None on success.
    error: Option<(i32, String)>,
    // Records grouped per table bucket, in poll order.
    buckets: Vec<(fcore::metadata::TableBucket, Vec<fcore::record::ScanRecord>)>,
    // Projected columns describing each record's fields.
    columns: Vec<fcore::metadata::Column>,
    // Per-bucket summaries mirrored to C++ (ids and record counts).
    bucket_infos: Vec<ffi::FfiBucketInfo>,
    // Total record count across all buckets.
    total_count: usize,
}
| |
| impl ScanResultInner { |
| fn from_error(code: i32, msg: String) -> Self { |
| Self { |
| error: Some((code, msg)), |
| buckets: Vec::new(), |
| columns: Vec::new(), |
| bucket_infos: Vec::new(), |
| total_count: 0, |
| } |
| } |
| |
    // Fetch the record at (bucket index, record index).
    // NOTE(review): indexes directly and will panic on out-of-range input —
    // the C++ side is expected to validate bounds first (see the comment
    // above the sv_* field accessors).
    fn resolve(&self, bucket: usize, rec: usize) -> &fcore::record::ScanRecord {
        &self.buckets[bucket].1[rec]
    }
| |
    // True when the poll that produced this result failed.
    fn sv_has_error(&self) -> bool {
        self.error.is_some()
    }

    // Error code, or 0 when the poll succeeded.
    fn sv_error_code(&self) -> i32 {
        self.error.as_ref().map_or(0, |e| e.0)
    }

    // Error message, or "" when the poll succeeded.
    fn sv_error_message(&self) -> &str {
        self.error.as_ref().map_or("", |e| e.1.as_str())
    }

    // Total number of records across every bucket.
    fn sv_record_count(&self) -> usize {
        self.total_count
    }

    // Number of projected columns per record.
    fn sv_column_count(&self) -> usize {
        self.columns.len()
    }
    // Name of the column at `field` (error when out of range).
    fn sv_column_name(&self, field: usize) -> Result<&str, String> {
        row_reader::column_name(&self.columns, field)
    }
    // FFI type tag of the column at `field`.
    fn sv_column_type(&self, field: usize) -> Result<i32, String> {
        row_reader::column_type(&self.columns, field)
    }

    // Log offset of one record (indices validated by C++; see resolve()).
    fn sv_offset(&self, bucket: usize, rec: usize) -> i64 {
        self.resolve(bucket, rec).offset()
    }
    // Timestamp of one record.
    fn sv_timestamp(&self, bucket: usize, rec: usize) -> i64 {
        self.resolve(bucket, rec).timestamp()
    }
    // Change type of one record as its byte value, widened for the bridge.
    fn sv_change_type(&self, bucket: usize, rec: usize) -> i32 {
        self.resolve(bucket, rec).change_type().to_byte_value() as i32
    }
    // Same as sv_column_count; kept as a separate record-level entry point.
    fn sv_field_count(&self) -> usize {
        self.columns.len()
    }
| |
| // Field accessors — C++ validates bounds in BucketRecords/RecordAt, validate() checks field. |
| fn sv_is_null(&self, bucket: usize, rec: usize, field: usize) -> Result<bool, String> { |
| row_reader::is_null(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_bool(&self, bucket: usize, rec: usize, field: usize) -> Result<bool, String> { |
| row_reader::get_bool(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_i32(&self, bucket: usize, rec: usize, field: usize) -> Result<i32, String> { |
| row_reader::get_i32(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_i64(&self, bucket: usize, rec: usize, field: usize) -> Result<i64, String> { |
| row_reader::get_i64(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_f32(&self, bucket: usize, rec: usize, field: usize) -> Result<f32, String> { |
| row_reader::get_f32(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_f64(&self, bucket: usize, rec: usize, field: usize) -> Result<f64, String> { |
| row_reader::get_f64(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_str(&self, bucket: usize, rec: usize, field: usize) -> Result<&str, String> { |
| row_reader::get_str(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_bytes(&self, bucket: usize, rec: usize, field: usize) -> Result<&[u8], String> { |
| row_reader::get_bytes(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_date_days(&self, bucket: usize, rec: usize, field: usize) -> Result<i32, String> { |
| row_reader::get_date_days(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_time_millis(&self, bucket: usize, rec: usize, field: usize) -> Result<i32, String> { |
| row_reader::get_time_millis(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_ts_millis(&self, bucket: usize, rec: usize, field: usize) -> Result<i64, String> { |
| row_reader::get_ts_millis(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_get_ts_nanos(&self, bucket: usize, rec: usize, field: usize) -> Result<i32, String> { |
| row_reader::get_ts_nanos(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| fn sv_is_ts_ltz(&self, _bucket: usize, _rec: usize, field: usize) -> Result<bool, String> { |
| row_reader::is_ts_ltz(&self.columns, field) |
| } |
| fn sv_get_decimal_str( |
| &self, |
| bucket: usize, |
| rec: usize, |
| field: usize, |
| ) -> Result<String, String> { |
| row_reader::get_decimal_str(self.resolve(bucket, rec).row(), &self.columns, field) |
| } |
| |
| fn sv_bucket_infos(&self) -> &Vec<ffi::FfiBucketInfo> { |
| &self.bucket_infos |
| } |
| } |
| |
| // ============================================================================ |
| // Opaque types: LookupResultInner (lookup read path) |
| // ============================================================================ |
| |
/// Result of a key lookup, exposed to C++ as an opaque type via the cxx bridge.
/// Accessors below use the `lv_` prefix (lookup-view) to disambiguate bridge fns.
pub struct LookupResultInner {
    // (code, message) when the lookup failed; None on success.
    error: Option<(i32, String)>,
    // Whether the lookup matched a row; false on error.
    found: bool,
    // The looked-up row, if any; None on error and, presumably, when not
    // found — lv_row() treats both the same way.
    row: Option<fcore::row::GenericRow<'static>>,
    // Column metadata used to decode the row's fields (one entry per field).
    columns: Vec<fcore::metadata::Column>,
}
| |
| impl LookupResultInner { |
| fn from_error(code: i32, msg: String) -> Self { |
| Self { |
| error: Some((code, msg)), |
| found: false, |
| row: None, |
| columns: Vec::new(), |
| } |
| } |
| |
| fn lv_has_error(&self) -> bool { |
| self.error.is_some() |
| } |
| |
| fn lv_error_code(&self) -> i32 { |
| self.error.as_ref().map_or(0, |e| e.0) |
| } |
| |
| fn lv_error_message(&self) -> &str { |
| self.error.as_ref().map_or("", |e| e.1.as_str()) |
| } |
| |
| fn lv_found(&self) -> bool { |
| self.found |
| } |
| |
| fn lv_field_count(&self) -> usize { |
| self.columns.len() |
| } |
| |
| fn lv_column_type(&self, field: usize) -> Result<i32, String> { |
| row_reader::column_type(&self.columns, field) |
| } |
| |
| fn lv_column_name(&self, field: usize) -> Result<&str, String> { |
| row_reader::column_name(&self.columns, field) |
| } |
| |
| fn lv_row(&self) -> Result<&fcore::row::GenericRow<'static>, String> { |
| self.row |
| .as_ref() |
| .ok_or_else(|| "no row available (not found or error)".to_string()) |
| } |
| |
| // Field accessors — delegate to shared row_reader helpers. |
| fn lv_is_null(&self, field: usize) -> Result<bool, String> { |
| let r = self.lv_row()?; |
| row_reader::is_null(r, &self.columns, field) |
| } |
| fn lv_get_bool(&self, field: usize) -> Result<bool, String> { |
| let r = self.lv_row()?; |
| row_reader::get_bool(r, &self.columns, field) |
| } |
| fn lv_get_i32(&self, field: usize) -> Result<i32, String> { |
| let r = self.lv_row()?; |
| row_reader::get_i32(r, &self.columns, field) |
| } |
| fn lv_get_i64(&self, field: usize) -> Result<i64, String> { |
| let r = self.lv_row()?; |
| row_reader::get_i64(r, &self.columns, field) |
| } |
| fn lv_get_f32(&self, field: usize) -> Result<f32, String> { |
| let r = self.lv_row()?; |
| row_reader::get_f32(r, &self.columns, field) |
| } |
| fn lv_get_f64(&self, field: usize) -> Result<f64, String> { |
| let r = self.lv_row()?; |
| row_reader::get_f64(r, &self.columns, field) |
| } |
| fn lv_get_str(&self, field: usize) -> Result<&str, String> { |
| let r = self.lv_row()?; |
| row_reader::get_str(r, &self.columns, field) |
| } |
| fn lv_get_bytes(&self, field: usize) -> Result<&[u8], String> { |
| let r = self.lv_row()?; |
| row_reader::get_bytes(r, &self.columns, field) |
| } |
| fn lv_get_date_days(&self, field: usize) -> Result<i32, String> { |
| let r = self.lv_row()?; |
| row_reader::get_date_days(r, &self.columns, field) |
| } |
| fn lv_get_time_millis(&self, field: usize) -> Result<i32, String> { |
| let r = self.lv_row()?; |
| row_reader::get_time_millis(r, &self.columns, field) |
| } |
| fn lv_get_ts_millis(&self, field: usize) -> Result<i64, String> { |
| let r = self.lv_row()?; |
| row_reader::get_ts_millis(r, &self.columns, field) |
| } |
| fn lv_get_ts_nanos(&self, field: usize) -> Result<i32, String> { |
| let r = self.lv_row()?; |
| row_reader::get_ts_nanos(r, &self.columns, field) |
| } |
| fn lv_is_ts_ltz(&self, field: usize) -> Result<bool, String> { |
| row_reader::is_ts_ltz(&self.columns, field) |
| } |
| fn lv_get_decimal_str(&self, field: usize) -> Result<String, String> { |
| let r = self.lv_row()?; |
| row_reader::get_decimal_str(r, &self.columns, field) |
| } |
| } |