blob: f8efe677f4e168d19872385d4b42e7a8b834e670 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::ffi;
use anyhow::{Result, anyhow};
use arrow::array::Array;
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use fluss as fcore;
use std::borrow::Cow;
use std::str::FromStr;
// Integer tags identifying data types across the FFI boundary.
// `ffi_data_type_to_core` / `core_data_type_to_ffi` below translate between
// these tags and `fcore::metadata::DataType`.
// NOTE(review): presumably mirrored verbatim on the non-Rust side of the
// bridge — keep the numeric values stable; confirm against the peer bindings.
pub const DATA_TYPE_BOOLEAN: i32 = 1;
pub const DATA_TYPE_TINYINT: i32 = 2;
pub const DATA_TYPE_SMALLINT: i32 = 3;
pub const DATA_TYPE_INT: i32 = 4;
pub const DATA_TYPE_BIGINT: i32 = 5;
pub const DATA_TYPE_FLOAT: i32 = 6;
pub const DATA_TYPE_DOUBLE: i32 = 7;
pub const DATA_TYPE_STRING: i32 = 8;
pub const DATA_TYPE_BYTES: i32 = 9;
pub const DATA_TYPE_DATE: i32 = 10;
pub const DATA_TYPE_TIME: i32 = 11;
pub const DATA_TYPE_TIMESTAMP: i32 = 12;
pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
pub const DATA_TYPE_DECIMAL: i32 = 14;
pub const DATA_TYPE_CHAR: i32 = 15;
pub const DATA_TYPE_BINARY: i32 = 16;
// DATUM_TYPE_* constants removed — no longer needed with opaque types.
fn ffi_data_type_to_core(dt: i32, precision: u32, scale: u32) -> Result<fcore::metadata::DataType> {
match dt {
DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp_with_precision(
precision,
)),
DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz_with_precision(
precision,
)),
DATA_TYPE_DECIMAL => {
let dt = fcore::metadata::DecimalType::new(precision, scale)?;
Ok(fcore::metadata::DataType::Decimal(dt))
}
DATA_TYPE_CHAR => Ok(fcore::metadata::DataTypes::char(precision)),
DATA_TYPE_BINARY => Ok(fcore::metadata::DataTypes::binary(precision as usize)),
_ => Err(anyhow!("Unknown data type: {dt}")),
}
}
pub fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
match dt {
fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
fcore::metadata::DataType::Decimal(_) => DATA_TYPE_DECIMAL,
fcore::metadata::DataType::Char(_) => DATA_TYPE_CHAR,
fcore::metadata::DataType::Binary(_) => DATA_TYPE_BINARY,
_ => 0,
}
}
/// Build a core `TableDescriptor` from its FFI counterpart.
///
/// # Errors
/// Fails when a column carries a negative precision/scale, when a data-type
/// tag is unknown, or when the core schema/descriptor builders reject the
/// input.
pub fn ffi_descriptor_to_core(
    descriptor: &ffi::FfiTableDescriptor,
) -> Result<fcore::metadata::TableDescriptor> {
    let mut schema = fcore::metadata::Schema::builder();
    for col in &descriptor.schema.columns {
        if col.precision < 0 || col.scale < 0 {
            return Err(anyhow!(
                "Column '{}': precision and scale must be non-negative",
                col.name
            ));
        }
        let dt = ffi_data_type_to_core(col.data_type, col.precision as u32, col.scale as u32)?;
        schema = schema.column(&col.name, dt);
        // Attach the comment to the column just added, if any.
        if !col.comment.is_empty() {
            schema = schema.with_comment(&col.comment);
        }
    }
    if !descriptor.schema.primary_keys.is_empty() {
        schema = schema.primary_key(descriptor.schema.primary_keys.clone());
    }
    let schema = schema.build()?;
    // A non-positive bucket count means "let the server decide".
    let bucket_count = (descriptor.bucket_count > 0).then_some(descriptor.bucket_count);
    let mut builder = fcore::metadata::TableDescriptor::builder()
        .schema(schema)
        .partitioned_by(descriptor.partition_keys.clone())
        .distributed_by(bucket_count, descriptor.bucket_keys.clone());
    for prop in &descriptor.properties {
        builder = builder.property(&prop.key, &prop.value);
    }
    if !descriptor.custom_properties.is_empty() {
        let custom: std::collections::HashMap<String, String> = descriptor
            .custom_properties
            .iter()
            .map(|kv| (kv.key.clone(), kv.value.clone()))
            .collect();
        builder = builder.custom_properties(custom);
    }
    if !descriptor.comment.is_empty() {
        builder = builder.comment(&descriptor.comment);
    }
    Ok(builder.build()?)
}
/// Translate a core `TableInfo` into its FFI representation.
pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTableInfo {
    use fcore::metadata::DataType as D;
    let schema = info.get_schema();
    let mut columns = Vec::with_capacity(schema.columns().len());
    for col in schema.columns().iter() {
        // Precision/scale only exist for parameterized types; CHAR/BINARY
        // report their length through the `precision` slot.
        let (precision, scale) = match col.data_type() {
            D::Decimal(dt) => (dt.precision() as i32, dt.scale() as i32),
            D::Timestamp(dt) => (dt.precision() as i32, 0),
            D::TimestampLTz(dt) => (dt.precision() as i32, 0),
            D::Char(dt) => (dt.length() as i32, 0),
            D::Binary(dt) => (dt.length() as i32, 0),
            _ => (0, 0),
        };
        columns.push(ffi::FfiColumn {
            name: col.name().to_string(),
            data_type: core_data_type_to_ffi(col.data_type()),
            comment: col.comment().unwrap_or("").to_string(),
            precision,
            scale,
        });
    }
    let schema_primary_keys = match schema.primary_key() {
        Some(pk) => pk.column_names().to_vec(),
        None => Vec::new(),
    };
    let properties: Vec<ffi::HashMapValue> = info
        .get_properties()
        .iter()
        .map(|(k, v)| ffi::HashMapValue {
            key: k.clone(),
            value: v.clone(),
        })
        .collect();
    let custom_properties: Vec<ffi::HashMapValue> = info
        .get_custom_properties()
        .iter()
        .map(|(k, v)| ffi::HashMapValue {
            key: k.clone(),
            value: v.clone(),
        })
        .collect();
    ffi::FfiTableInfo {
        table_id: info.get_table_id(),
        schema_id: info.get_schema_id(),
        table_path: ffi::FfiTablePath {
            database_name: info.get_table_path().database().to_string(),
            table_name: info.get_table_path().table().to_string(),
        },
        created_time: info.get_created_time(),
        modified_time: info.get_modified_time(),
        primary_keys: info.get_primary_keys().clone(),
        bucket_keys: info.get_bucket_keys().to_vec(),
        partition_keys: info.get_partition_keys().to_vec(),
        num_buckets: info.get_num_buckets(),
        has_primary_key: info.has_primary_key(),
        is_partitioned: info.is_partitioned(),
        properties,
        custom_properties,
        comment: info.get_comment().unwrap_or("").to_string(),
        schema: ffi::FfiSchema {
            columns,
            primary_keys: schema_primary_keys,
        },
    }
}
/// An all-zero / all-empty `FfiTableInfo`, used where a placeholder value is
/// needed (e.g. error paths that must still return a struct by value).
pub fn empty_table_info() -> ffi::FfiTableInfo {
    let schema = ffi::FfiSchema {
        columns: Vec::new(),
        primary_keys: Vec::new(),
    };
    ffi::FfiTableInfo {
        table_id: 0,
        schema_id: 0,
        table_path: ffi::FfiTablePath {
            database_name: String::new(),
            table_name: String::new(),
        },
        created_time: 0,
        modified_time: 0,
        primary_keys: Vec::new(),
        bucket_keys: Vec::new(),
        partition_keys: Vec::new(),
        num_buckets: 0,
        has_primary_key: false,
        is_partitioned: false,
        properties: Vec::new(),
        custom_properties: Vec::new(),
        comment: String::new(),
        schema,
    }
}
/// Convert FFI database descriptor to core. Returns None if descriptor is effectively empty
/// (no comment and no properties), so create_database can pass Option::None to core.
pub fn ffi_database_descriptor_to_core(
    d: &ffi::FfiDatabaseDescriptor,
) -> Option<fcore::metadata::DatabaseDescriptor> {
    let has_comment = !d.comment.is_empty();
    let has_properties = !d.properties.is_empty();
    if !has_comment && !has_properties {
        return None;
    }
    let mut builder = fcore::metadata::DatabaseDescriptor::builder();
    if has_comment {
        builder = builder.comment(&d.comment);
    }
    if has_properties {
        let props = d
            .properties
            .iter()
            .map(|kv| (kv.key.clone(), kv.value.clone()))
            .collect::<std::collections::HashMap<String, String>>();
        builder = builder.custom_properties(props);
    }
    Some(builder.build())
}
/// Convert core DatabaseInfo to FFI.
pub fn core_database_info_to_ffi(info: &fcore::metadata::DatabaseInfo) -> ffi::FfiDatabaseInfo {
    let descriptor = info.database_descriptor();
    let mut properties = Vec::new();
    for (k, v) in descriptor.custom_properties().iter() {
        properties.push(ffi::HashMapValue {
            key: k.clone(),
            value: v.clone(),
        });
    }
    ffi::FfiDatabaseInfo {
        database_name: info.database_name().to_string(),
        comment: descriptor.comment().unwrap_or("").to_string(),
        properties,
        created_time: info.created_time(),
        modified_time: info.modified_time(),
    }
}
/// Resolve types in a GenericRow using schema metadata.
/// Narrows Int32 → Int8/Int16, parses decimal strings, etc.
/// Used by both AppendWriter and UpsertWriter.
///
/// Returns an owned (`'static`) row: every borrowed string/blob is copied so
/// the result does not alias `row`. Datums whose index has no schema column
/// (or when `schema` is `None`) pass through unchanged.
///
/// # Errors
/// - an `Int32` value overflows a TinyInt/SmallInt column;
/// - a string destined for a Decimal column is not a valid decimal, or is
///   rejected by `Decimal::from_big_decimal` for the column's precision/scale.
pub fn resolve_row_types(
    row: &fcore::row::GenericRow<'_>,
    schema: Option<&fcore::metadata::Schema>,
) -> Result<fcore::row::GenericRow<'static>> {
    use fcore::row::Datum;
    let mut out = fcore::row::GenericRow::new(row.values.len());
    for (idx, datum) in row.values.iter().enumerate() {
        let resolved = match datum {
            Datum::Null => Datum::Null,
            Datum::Bool(v) => Datum::Bool(*v),
            // The FFI side sends small integers as Int32; narrow to the
            // schema's actual integer width, failing loudly on overflow.
            Datum::Int32(v) => match schema
                .and_then(|s| s.columns().get(idx))
                .map(|c| c.data_type())
            {
                Some(fcore::metadata::DataType::TinyInt(_)) => Datum::Int8(
                    i8::try_from(*v).map_err(|_| anyhow!("Column {idx}: {v} overflows TinyInt"))?,
                ),
                Some(fcore::metadata::DataType::SmallInt(_)) => Datum::Int16(
                    i16::try_from(*v)
                        .map_err(|_| anyhow!("Column {idx}: {v} overflows SmallInt"))?,
                ),
                _ => Datum::Int32(*v),
            },
            Datum::Int64(v) => Datum::Int64(*v),
            Datum::Float32(v) => Datum::Float32(*v),
            Datum::Float64(v) => Datum::Float64(*v),
            Datum::Int8(v) => Datum::Int8(*v),
            Datum::Int16(v) => Datum::Int16(*v),
            Datum::String(cow) => {
                // Check if the schema column is Decimal — if so, parse the string as decimal
                match schema
                    .and_then(|s| s.columns().get(idx))
                    .map(|c| c.data_type())
                {
                    Some(fcore::metadata::DataType::Decimal(dt)) => {
                        let (precision, scale) = (dt.precision(), dt.scale());
                        let bd = bigdecimal::BigDecimal::from_str(cow.as_ref()).map_err(|e| {
                            anyhow!("Column {idx}: invalid decimal string '{}': {e}", cow)
                        })?;
                        let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale)
                            .map_err(|e| anyhow!("Column {idx}: {e}"))?;
                        Datum::Decimal(decimal)
                    }
                    // Otherwise copy the string so the result owns its data.
                    _ => Datum::String(Cow::Owned(cow.to_string())),
                }
            }
            Datum::Blob(cow) => Datum::Blob(Cow::Owned(cow.to_vec())),
            Datum::Decimal(d) => Datum::Decimal(d.clone()),
            Datum::Date(d) => Datum::Date(*d),
            Datum::Time(t) => Datum::Time(*t),
            Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
            Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
        };
        out.set_field(idx, resolved);
    }
    Ok(out)
}
/// Convert a CompactedRow (lookup result) to an owned GenericRow<'static>.
/// One copy for strings/bytes (Cow::Owned), but no second copy into FfiDatum.
///
/// Columns are decoded positionally using the table schema: each column's
/// `DataType` selects the matching typed accessor on `InternalRow`.
///
/// # Errors
/// Propagates any accessor error from `row`, and fails on a column whose
/// data type has no accessor mapping here (the catch-all arm).
pub fn compacted_row_to_owned(
    row: &dyn fcore::row::InternalRow,
    table_info: &fcore::metadata::TableInfo,
) -> Result<fcore::row::GenericRow<'static>> {
    use fcore::row::Datum;
    let schema = table_info.get_schema();
    let columns = schema.columns();
    let mut out = fcore::row::GenericRow::new(columns.len());
    for (i, col) in columns.iter().enumerate() {
        // Null check first: typed accessors are only consulted for non-null cells.
        if row.is_null_at(i)? {
            out.set_field(i, Datum::Null);
            continue;
        }
        let datum = match col.data_type() {
            fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)?),
            fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)?),
            fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)?),
            fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)?),
            fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)?),
            fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i)?.into()),
            fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i)?.into()),
            // String/bytes are copied into owned buffers so the result
            // outlives the source row.
            fcore::metadata::DataType::String(_) => {
                Datum::String(Cow::Owned(row.get_string(i)?.to_string()))
            }
            fcore::metadata::DataType::Bytes(_) => {
                Datum::Blob(Cow::Owned(row.get_bytes(i)?.to_vec()))
            }
            fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)?),
            fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)?),
            // Timestamp/decimal/char/binary accessors need the type parameters
            // (precision, scale, length) to decode the compact encoding.
            fcore::metadata::DataType::Timestamp(dt) => {
                Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision())?)
            }
            fcore::metadata::DataType::TimestampLTz(dt) => {
                Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision())?)
            }
            fcore::metadata::DataType::Decimal(dt) => {
                let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize)?;
                Datum::Decimal(decimal)
            }
            fcore::metadata::DataType::Char(dt) => Datum::String(Cow::Owned(
                row.get_char(i, dt.length() as usize)?.to_string(),
            )),
            // NOTE(review): `get_char` takes its length as usize but
            // `get_binary` is passed `dt.length()` uncast — presumably its
            // signature differs; confirm against the InternalRow trait.
            fcore::metadata::DataType::Binary(dt) => {
                Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
            }
            other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
        };
        out.set_field(i, datum);
    }
    Ok(out)
}
/// Flatten a core `LakeSnapshot` (bucket → offset map) into the FFI struct.
/// A missing partition id is encoded as -1.
pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) -> ffi::FfiLakeSnapshot {
    let mut bucket_offsets = Vec::with_capacity(snapshot.table_buckets_offset.len());
    for (bucket, offset) in snapshot.table_buckets_offset.iter() {
        bucket_offsets.push(ffi::FfiBucketOffset {
            table_id: bucket.table_id(),
            partition_id: bucket.partition_id().unwrap_or(-1),
            bucket_id: bucket.bucket_id(),
            offset: *offset,
        });
    }
    ffi::FfiLakeSnapshot {
        snapshot_id: snapshot.snapshot_id,
        bucket_offsets,
    }
}
/// Export scan batches through the Arrow C Data Interface.
///
/// Each batch's `RecordBatch` is wrapped in a `StructArray` (a `RecordBatch`
/// has no direct C-data form) and exported as raw `FFI_ArrowArray` /
/// `FFI_ArrowSchema` pointers, encoded as `usize`. `Box::into_raw` transfers
/// ownership of both allocations to the receiver — presumably the non-Rust
/// side reclaims them via the C Data Interface release callbacks; if it never
/// does, they leak (confirm against the consumer).
///
/// # Errors
/// Returns the schema-conversion error message as a `String`.
pub fn core_scan_batches_to_ffi(
    batches: &[fcore::record::ScanBatch],
) -> Result<ffi::FfiArrowRecordBatches, String> {
    let mut ffi_batches = Vec::with_capacity(batches.len());
    for batch in batches {
        let record_batch = batch.batch();
        // Convert the schema first: it is the only fallible step, so failing
        // here avoids paying for the batch clone and array export below.
        let ffi_schema = Box::new(
            FFI_ArrowSchema::try_from(record_batch.schema().as_ref()).map_err(|e| e.to_string())?,
        );
        let struct_array = arrow::array::StructArray::from(record_batch.clone());
        let ffi_array = Box::new(FFI_ArrowArray::new(&struct_array.into_data()));
        // Leak both boxes on purpose: the raw pointers now own the data.
        ffi_batches.push(ffi::FfiArrowRecordBatch {
            array_ptr: Box::into_raw(ffi_array) as usize,
            schema_ptr: Box::into_raw(ffi_schema) as usize,
            table_id: batch.bucket().table_id(),
            partition_id: batch.bucket().partition_id().unwrap_or(-1),
            bucket_id: batch.bucket().bucket_id(),
            base_offset: batch.base_offset(),
        });
    }
    Ok(ffi::FfiArrowRecordBatches {
        batches: ffi_batches,
    })
}