blob: d0d6c480cf5310faca442a2c46a4129e47bbf479 [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
extern crate lance;
use tokio;
use std::ffi::{CStr, CString};
use std::slice;
use std::os::raw::c_char;
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use futures::StreamExt;
use arrow_array::{Array, RecordBatch, StructArray};
use arrow_schema::Schema as ArrowSchema;
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi, to_ffi};
use lance_core::{
datatypes::{Schema as LanceSchema},
};
use lance_core::cache::LanceCache;
use lance_file::{
v2::{
reader::{FileReader, FileReaderOptions, ReaderProjection},
writer::{FileWriter, FileWriterOptions},
},
version::LanceFileVersion,
};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_io::{
scheduler::{ScanScheduler, SchedulerConfig},
stream::RecordBatchStream,
utils::CachedFileSize,
object_store::ObjectStore,
ReadBatchParams,
};
use object_store::path::Path;
/// Copies `input_string` into a caller-provided C buffer and returns the error code `-1`.
///
/// The message is truncated to fit the buffer, and the buffer is NUL-terminated whenever it
/// has room for at least one byte. If `input_string` contains an interior NUL byte (and so
/// cannot be represented as a C string) the fixed text `Failed to convert to CString` is
/// written instead.
///
/// Parameters:
/// - input_string: The Rust string to copy into the C buffer.
/// - error_message: A mutable pointer to a C buffer where the string should be copied.
///   May be null, in which case nothing is written.
/// - error_size: Size of the C buffer in bytes, including the slot for the NUL terminator.
///   May be 0, in which case nothing is written.
///
/// Returns: always `-1`, so callers can write `return set_error_and_return(...)`.
fn set_error_and_return(input_string: &str, error_message: *mut c_char, error_size: usize) -> i32 {
    const ERROR_CODE: i32 = -1;

    // Nothing can be written into a missing or zero-length buffer. The zero-length guard also
    // keeps `error_size - 1` below from underflowing.
    if error_message.is_null() || error_size == 0 {
        return ERROR_CODE;
    }

    // `CString::new` fails only on an interior NUL byte; fall back to a fixed message then.
    let converted = CString::new(input_string);
    let payload: &[u8] = match &converted {
        Ok(c_string) => c_string.as_bytes(),
        Err(_) => "Failed to convert to CString".as_bytes(),
    };

    // Reserve the last byte of the buffer for the terminator.
    let bytes_to_copy = payload.len().min(error_size - 1);

    // SAFETY: `error_message` is non-null and the caller promises it points to at least
    // `error_size` writable bytes. We write `bytes_to_copy <= error_size - 1` payload bytes
    // followed by one terminator byte, i.e. at most `error_size` bytes in total. `payload`
    // is a Rust-owned slice, so it cannot overlap the destination in a way we control.
    unsafe {
        ptr::copy_nonoverlapping(payload.as_ptr(), error_message.cast::<u8>(), bytes_to_copy);
        *error_message.add(bytes_to_copy) = 0;
    }

    ERROR_CODE
}
use lazy_static::lazy_static;
lazy_static! {
    /// Shared multi-threaded Tokio runtime, built lazily on first use.
    ///
    /// The exported C entry points in this file call `RT.block_on(...)` to drive their
    /// async work to completion. Construction failure panics with the message below.
    pub static ref RT: tokio::runtime::Runtime = {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        // Turn on every optional runtime driver the enabled Tokio features provide.
        builder.enable_all();
        builder.build().expect("Failed to create tokio runtime")
    };
}
/// Creates a Lance `FileWriter` for a local file and hands it to the caller as a raw pointer.
///
/// Parameters:
/// - c_file_path: NUL-terminated, UTF-8 encoded path of the file to write.
/// - schema_ptr: Arrow C Data Interface schema describing the data to be written.
///   Ownership is taken as soon as the null checks pass (via `FFI_ArrowSchema::from_raw`),
///   so the schema is consumed on success *and* on every later failure path.
/// - file_writer_ptr: Out-parameter that receives the heap-allocated writer on success.
///   It is not written on failure. The writer must later be freed with `release_writer`.
/// - error_message / error_size: Caller-provided buffer that receives a description on failure.
///
/// Returns `0` on success and `-1` on failure.
async fn async_create_writer(
    c_file_path: *const c_char,
    schema_ptr: *mut FFI_ArrowSchema,
    file_writer_ptr: *mut *mut FileWriter,
    error_message: *mut c_char,
    error_size: usize,
) -> i32 {
    if c_file_path.is_null() || schema_ptr.is_null() || file_writer_ptr.is_null() {
        return set_error_and_return("Null pointer passed to function for file_path or schema_ptr or file_writer_ptr", error_message, error_size);
    }

    // Take ownership of the schema before anything can fail, so that all error paths treat
    // the caller's schema the same way instead of consuming it only on some of them.
    // SAFETY: `schema_ptr` is non-null and the caller guarantees it points to a valid,
    // initialised `FFI_ArrowSchema` that it is handing over to us.
    let c_schema = unsafe { FFI_ArrowSchema::from_raw(schema_ptr) };

    // SAFETY: `c_file_path` is non-null and the caller guarantees it is NUL-terminated and
    // stays alive for the duration of this call.
    let file_path = unsafe { CStr::from_ptr(c_file_path) };
    let path = match file_path.to_str() {
        Ok(path_str) => std::path::Path::new(path_str),
        Err(_) => return set_error_and_return("Invalid UTF-8 sequence in file path", error_message, error_size),
    };

    // Validate the schema before creating the writer, so a bad schema is reported without
    // first setting up any output for the target path.
    let arrow_schema = match ArrowSchema::try_from(&c_schema) {
        Ok(schema) => schema,
        Err(e) => return set_error_and_return(&format!("Failed to convert FFI schema: {}", e), error_message, error_size),
    };
    let lance_schema = match LanceSchema::try_from(&arrow_schema) {
        Ok(ls) => ls,
        Err(e) => return set_error_and_return(&format!("Failed to convert Arrow schema to Lance schema: {}", e), error_message, error_size),
    };

    let object_writer = match ObjectStore::create_local_writer(&path).await {
        Ok(writer) => writer,
        Err(e) => return set_error_and_return(&format!("Failed to create local writer: {}", e), error_message, error_size),
    };

    // TODO: add write options
    let writer_options = FileWriterOptions {
        format_version: Some(LanceFileVersion::V2_1),
        ..Default::default()
    };
    let lance_file_writer = match FileWriter::try_new(object_writer, lance_schema, writer_options) {
        Ok(writer) => writer,
        Err(e) => return set_error_and_return(&format!("Failed to create file writer: {}", e), error_message, error_size),
    };

    // SAFETY: `file_writer_ptr` is non-null and the caller guarantees it is valid for a
    // pointer-sized write.
    unsafe {
        *file_writer_ptr = Box::into_raw(Box::new(lance_file_writer));
    }
    0 // Success
}
/// C entry point that creates a Lance file writer.
///
/// Blocks the calling thread on the shared runtime `RT` until `async_create_writer`
/// completes and returns its status code unchanged. All arguments are forwarded as-is.
#[unsafe(no_mangle)]
pub extern "C" fn create_writer(
    file_path: *const std::os::raw::c_char,
    schema_ptr: *mut FFI_ArrowSchema,
    file_writer_ptr: *mut *mut FileWriter,
    error_message: *mut c_char,
    error_size: usize
) -> i32 {
    let task = async_create_writer(file_path, schema_ptr, file_writer_ptr, error_message, error_size);
    RT.block_on(task)
}
async fn async_write_c_arrow_array(
file_writer_ptr: *mut FileWriter,
input_array_ptr: *mut FFI_ArrowArray,
input_schema_ptr: *mut FFI_ArrowSchema,
error_message: *mut c_char,
error_size: usize,
) -> i32 {
if file_writer_ptr.is_null() || input_array_ptr.is_null() || input_schema_ptr.is_null() {
return set_error_and_return("Null pointer passed to function for file_writer_ptr or input_array_ptr or input_schema_ptr", error_message, error_size);
}
unsafe {
let arrow_array = FFI_ArrowArray::from_raw(input_array_ptr);
let arrow_schema = FFI_ArrowSchema::from_raw(input_schema_ptr);
let array_data = match from_ffi(arrow_array, &arrow_schema) {
Ok(data) => data,
Err(err) => return set_error_and_return(&format!("Failed to convert from FFI: {}", err), error_message, error_size),
};
let struct_array = StructArray::from(array_data);
let record_batch = RecordBatch::from(&struct_array);
let writer = &mut *file_writer_ptr;
match writer.write_batch(&record_batch).await {
Ok(_) => 0,
Err(err) => set_error_and_return(&format!("Failed to write batch: {}", err), error_message, error_size),
}
}
}
/// C entry point that writes one Arrow C Data Interface array through the given writer.
///
/// Blocks the calling thread on the shared runtime `RT` until `async_write_c_arrow_array`
/// completes and returns its status code unchanged. All arguments are forwarded as-is.
#[unsafe(no_mangle)]
pub extern "C" fn write_c_arrow_array(
    file_writer_ptr: *mut FileWriter,
    input_array_ptr: *mut FFI_ArrowArray,
    input_schema_ptr: *mut FFI_ArrowSchema,
    error_message: *mut c_char,
    error_size: usize,
) -> i32 {
    let task = async_write_c_arrow_array(file_writer_ptr, input_array_ptr, input_schema_ptr, error_message, error_size);
    RT.block_on(task)
}
/// C entry point that reports the writer's current position.
///
/// Awaits `tell()` on the writer using the shared runtime `RT` and, on success, stores the
/// reported value in `*tell_pos`.
///
/// Returns `0` on success. On a null `file_writer_ptr` / `tell_pos`, or when `tell()` fails,
/// the reason is copied into `error_message` and `-1` is returned.
#[unsafe(no_mangle)]
pub extern "C" fn writer_tell(file_writer_ptr: *mut FileWriter, tell_pos: *mut u64, error_message: *mut c_char, error_size: usize) -> i32 {
    let outcome: Result<u64, String> = RT.block_on(async {
        if file_writer_ptr.is_null() || tell_pos.is_null() {
            return Err(String::from("Null pointer passed to function for file_writer_ptr or tell_pos"));
        }
        // SAFETY: the pointer is non-null; the caller guarantees it refers to a live writer
        // obtained from `create_writer`.
        let writer = unsafe { &mut *file_writer_ptr };
        writer.tell().await.map_err(|e| e.to_string())
    });

    match outcome {
        Ok(position) => {
            // SAFETY: `tell_pos` was checked for null above (the error branch is taken
            // otherwise); the caller guarantees it is valid for a `u64` write.
            unsafe {
                *tell_pos = position;
            }
            0
        }
        Err(message) => set_error_and_return(&message, error_message, error_size),
    }
}
/// C entry point that finishes the file being written.
///
/// Awaits `finish()` on the writer using the shared runtime `RT`. The value `finish()`
/// yields on success is discarded. The writer itself is not freed here; `release_writer`
/// does that.
///
/// Returns `0` on success. On a null `file_writer_ptr`, or when `finish()` fails, the reason
/// is copied into `error_message` and `-1` is returned.
#[unsafe(no_mangle)]
pub extern "C" fn finish_writer(file_writer_ptr: *mut FileWriter, error_message: *mut c_char, error_size: usize) -> i32 {
    let outcome: Result<_, String> = RT.block_on(async {
        if file_writer_ptr.is_null() {
            return Err(String::from("Null pointer passed to function for file_writer_ptr"));
        }
        // SAFETY: the pointer is non-null; the caller guarantees it refers to a live writer
        // obtained from `create_writer`.
        let writer = unsafe { &mut *file_writer_ptr };
        writer.finish().await.map_err(|e| e.to_string())
    });

    match outcome {
        Ok(_rows_written) => 0,
        Err(message) => set_error_and_return(&message, error_message, error_size),
    }
}
/// C entry point that destroys a writer created by `create_writer`.
///
/// Reclaims the boxed `FileWriter` and drops it. The pointer must not be used afterwards.
///
/// Returns `0` on success, or `-1` with a message in `error_message` when
/// `file_writer_ptr` is null.
#[unsafe(no_mangle)]
pub extern "C" fn release_writer(file_writer_ptr: *mut FileWriter, error_message: *mut c_char, error_size: usize) -> i32 {
    if file_writer_ptr.is_null() {
        return set_error_and_return("Null pointer passed to function for file_writer_ptr", error_message, error_size);
    }
    // SAFETY: the pointer is non-null; the caller guarantees it was produced by
    // `Box::into_raw` in `create_writer` and has not been released already.
    let owned_writer = unsafe { Box::from_raw(file_writer_ptr) };
    drop(owned_writer);
    0
}
////////////////////////////////////////////
/// Owns a pinned, boxed record-batch stream so it can be passed across the C boundary as a
/// single opaque pointer.
pub struct LanceReaderAdapter {
    /// The stream that `next_batch` polls for record batches.
    pub stream: Pin<Box<dyn RecordBatchStream>>,
}

impl LanceReaderAdapter {
    /// Wraps an existing record-batch stream.
    pub fn new(stream: Pin<Box<dyn RecordBatchStream>>) -> Self {
        Self { stream }
    }
}
/// Opens a Lance file for reading and hands the `FileReader` to the caller as a raw pointer.
///
/// Parameters:
/// - c_file_path: NUL-terminated, UTF-8 encoded URI/path passed to `ObjectStore::from_uri`.
/// - file_reader_ptr: Out-parameter that receives the heap-allocated reader on success.
///   It is not written on failure. The reader must later be freed with `release_reader`.
/// - error_message / error_size: Caller-provided buffer that receives a description on failure.
///
/// Returns `0` on success and `-1` on failure.
async fn async_create_reader(
    c_file_path: *const c_char,
    file_reader_ptr: *mut *mut FileReader,
    error_message: *mut c_char,
    error_size: usize,
) -> i32 {
    if c_file_path.is_null() || file_reader_ptr.is_null() {
        return set_error_and_return("Null pointer passed to function for file_path or file_reader_ptr", error_message, error_size);
    }

    // Run the fallible steps in one place; each step maps its failure to the message that
    // is reported to the caller.
    let opened = async {
        // SAFETY: `c_file_path` is non-null and the caller guarantees it is NUL-terminated
        // and stays alive for the duration of this call.
        let uri = unsafe { CStr::from_ptr(c_file_path) }
            .to_str()
            .map_err(|_| String::from("Invalid UTF-8 sequence in file path"))?;

        let (obj_store, path) = ObjectStore::from_uri(uri)
            .await
            .map_err(|e| format!("Failed to create object store from uri: {}", e))?;

        let config = SchedulerConfig::max_bandwidth(&obj_store);
        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let parsed_path = Path::parse(&path)
            .map_err(|e| format!("Failed to parse path: {}", e))?;

        let file_scheduler = scan_scheduler
            .open_file(&parsed_path, &CachedFileSize::unknown())
            .await
            .map_err(|e| format!("Failed to open file for scheduler {}", e))?;

        // TODO: add more reader options
        let reader = FileReader::try_open(
            file_scheduler,
            /*base_projection*/ None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .map_err(|e| format!("Failed to create lance file reader {}", e))?;

        Ok::<FileReader, String>(reader)
    }
    .await;

    match opened {
        Ok(reader) => {
            // SAFETY: `file_reader_ptr` is non-null and the caller guarantees it is valid
            // for a pointer-sized write.
            unsafe {
                *file_reader_ptr = Box::into_raw(Box::new(reader));
            }
            0 // Success
        }
        Err(message) => set_error_and_return(&message, error_message, error_size),
    }
}
/// C entry point that opens a Lance file reader.
///
/// Blocks the calling thread on the shared runtime `RT` until `async_create_reader`
/// completes and returns its status code unchanged. All arguments are forwarded as-is.
#[unsafe(no_mangle)]
pub extern "C" fn create_reader(
    c_file_path: *const c_char,
    file_reader_ptr: *mut *mut FileReader,
    error_message: *mut c_char,
    error_size: usize,
) -> i32 {
    let task = async_create_reader(c_file_path, file_reader_ptr, error_message, error_size);
    RT.block_on(task)
}
/// C entry point that exports the reader's schema through the Arrow C Data Interface.
///
/// Converts the reader's Lance schema to an Arrow schema, then to an `FFI_ArrowSchema`,
/// and writes the result into `*output_schema_ptr` with an unaligned write (whatever was
/// stored there before is overwritten without being dropped).
///
/// Returns `0` on success, or `-1` with a message in `error_message` when a pointer is
/// null or the FFI conversion fails.
#[unsafe(no_mangle)]
pub extern "C" fn get_schema(
    file_reader_ptr: *mut FileReader,
    output_schema_ptr: *mut FFI_ArrowSchema,
    error_message: *mut c_char,
    error_size: usize,
) -> i32 {
    if file_reader_ptr.is_null() || output_schema_ptr.is_null() {
        return set_error_and_return("Null pointer passed to function for file_reader_ptr or output_schema_ptr", error_message, error_size);
    }

    // SAFETY: the pointer is non-null; the caller guarantees it refers to a live reader
    // obtained from `create_reader`.
    let reader = unsafe { &*file_reader_ptr };
    let arrow_schema = ArrowSchema::from(reader.schema().as_ref());

    match FFI_ArrowSchema::try_from(&arrow_schema) {
        Ok(exported) => {
            // SAFETY: `output_schema_ptr` is non-null and the caller guarantees it is valid
            // for writing one `FFI_ArrowSchema`; the unaligned write needs no alignment.
            unsafe {
                std::ptr::write_unaligned(output_schema_ptr, exported);
            }
            0
        }
        Err(e) => set_error_and_return(&format!("Failed to convert to FFI schema: {}", e), error_message, error_size),
    }
}
/// C entry point that reports the row count returned by the reader's `num_rows()`.
///
/// Stores the count in `*num_rows` and returns `0`. Returns `-1` with a message in
/// `error_message` when `file_reader_ptr` or `num_rows` is null.
#[unsafe(no_mangle)]
pub extern "C" fn num_rows(
    file_reader_ptr: *mut FileReader,
    num_rows: *mut u64,
    error_message: *mut c_char,
    error_size: usize,
) -> i32 {
    if file_reader_ptr.is_null() || num_rows.is_null() {
        return set_error_and_return("Null pointer passed to function for file_reader_ptr or num_rows", error_message, error_size);
    }

    // SAFETY: the pointer is non-null; the caller guarantees it refers to a live reader
    // obtained from `create_reader`.
    let row_count = unsafe { &*file_reader_ptr }.num_rows();

    // SAFETY: `num_rows` is non-null and the caller guarantees it is valid for a `u64` write.
    unsafe {
        *num_rows = row_count;
    }
    0
}
async fn async_create_stream_reader(
file_reader_ptr: *mut FileReader,
stream_reader_ptr : *mut *mut LanceReaderAdapter,
batch_size : u32,
batch_readahead : u32,
projection_column_names: *const *const c_char,
projection_column_count: usize,
read_row_ids: *const u32,
read_row_count: usize,
error_message: *mut c_char,
error_size: usize,
) -> i32 {
if file_reader_ptr.is_null() || stream_reader_ptr.is_null() {
return set_error_and_return("Null pointer passed to function for file_reader_ptr or stream_reader_ptr", error_message, error_size);
}
unsafe {
let reader = &mut *file_reader_ptr;
let projection = if projection_column_count == 0 {
ReaderProjection::from_whole_schema(reader.schema(), reader.metadata().version())
} else {
if projection_column_names.is_null() {
return set_error_and_return("Null pointer passed to function for projection_column_names, while projection_column_count > 0", error_message, error_size);
}
let projection_column_strings: &[*const c_char] =
std::slice::from_raw_parts(projection_column_names, projection_column_count);
let column_names: Vec<&str> = projection_column_strings.iter()
.filter_map(|&s| {
if s.is_null() {
None
} else {
let c_string = CStr::from_ptr(s);
c_string.to_str().ok()
}
})
.collect();
match ReaderProjection::from_column_names(reader.metadata().version(), reader.schema(), &column_names) {
Ok(p) => p,
Err(e) => return set_error_and_return(&format!("Failed to create projection {}", e), error_message, error_size),
}
};
let read_range = if read_row_count == 0 {
lance_io::ReadBatchParams::RangeFull
} else {
if read_row_ids.is_null() {
return set_error_and_return("Null pointer passed to function for read_row_ids, while read_row_count > 0", error_message, error_size);
}
let indices: &[u32] = slice::from_raw_parts(read_row_ids, read_row_count);
ReadBatchParams::from(indices)
};
let stream = match reader.read_stream_projected(
read_range,
batch_size,
batch_readahead,
projection,
FilterExpression::no_filter(),
) {
Ok(s) => s,
Err(e) => return set_error_and_return(&format!("Failed to read stream {}", e), error_message, error_size),
};
*stream_reader_ptr = Box::into_raw(Box::new(LanceReaderAdapter{stream: stream}));
0
}
}
/// C entry point that creates a record-batch stream over an open reader.
///
/// Blocks the calling thread on the shared runtime `RT` until `async_create_stream_reader`
/// completes and returns its status code unchanged. All arguments are forwarded as-is.
#[unsafe(no_mangle)]
pub extern "C" fn create_stream_reader(
    file_reader_ptr: *mut FileReader,
    stream_reader_ptr : *mut *mut LanceReaderAdapter,
    batch_size : u32,
    batch_readahead : u32,
    projection_column_names: *const *const c_char,
    projection_column_count: usize,
    read_row_ids: *const u32,
    read_row_count: usize,
    error_message: *mut c_char,
    error_size: usize,
) -> i32 {
    let task = async_create_stream_reader(
        file_reader_ptr,
        stream_reader_ptr,
        batch_size,
        batch_readahead,
        projection_column_names,
        projection_column_count,
        read_row_ids,
        read_row_count,
        error_message,
        error_size,
    );
    RT.block_on(task)
}
/// C entry point that pulls the next record batch from a stream.
///
/// Blocks on the shared runtime `RT` for the stream's next item:
/// - a batch is converted to a struct array, exported with `to_ffi`, written into
///   `*output_array_ptr` / `*output_schema_ptr` with unaligned writes, and `*is_eof` is set
///   to `false`;
/// - end of stream sets `*is_eof` to `true` and leaves the two outputs untouched;
/// - a stream or export error is reported through `error_message`.
///
/// Returns `0` for a batch or end of stream, and `-1` on a null pointer or an error.
#[unsafe(no_mangle)]
pub extern "C" fn next_batch(
    stream_reader_ptr: *mut LanceReaderAdapter,
    output_array_ptr: *mut FFI_ArrowArray,
    output_schema_ptr: *mut FFI_ArrowSchema,
    is_eof: *mut bool,
    error_message: *mut c_char,
    error_size: usize,
) -> i32 {
    if stream_reader_ptr.is_null() || output_array_ptr.is_null() || output_schema_ptr.is_null() || is_eof.is_null() {
        return set_error_and_return("Null pointer passed to function for stream_reader_ptr or output_array_ptr or output_schema_ptr or is_eof", error_message, error_size);
    }

    // SAFETY: the pointer is non-null; the caller guarantees it refers to a live adapter
    // obtained from `create_stream_reader` that is not used concurrently.
    let adapter = unsafe { &mut *stream_reader_ptr };

    match RT.block_on(adapter.stream.next()) {
        None => {
            // read eof
            // SAFETY: `is_eof` is non-null and the caller guarantees it is valid for a
            // `bool` write.
            unsafe {
                *is_eof = true;
            }
            0
        }
        Some(Err(e)) => set_error_and_return(&format!("Failed to next batch, {}", e), error_message, error_size),
        Some(Ok(batch)) => {
            let array = StructArray::from(batch);
            match to_ffi(&array.to_data()) {
                Ok((out_array, out_schema)) => {
                    // SAFETY: all three output pointers are non-null and the caller
                    // guarantees each is valid for writing its respective type.
                    unsafe {
                        std::ptr::write_unaligned(output_array_ptr, out_array);
                        std::ptr::write_unaligned(output_schema_ptr, out_schema);
                        *is_eof = false;
                    }
                    0
                }
                Err(e) => set_error_and_return(&format!("Failed to convert to array & schema, {}", e), error_message, error_size),
            }
        }
    }
}
/// C entry point that destroys a reader created by `create_reader`.
///
/// Reclaims the boxed `FileReader` and drops it. The pointer must not be used afterwards.
///
/// Returns `0` on success, or `-1` with a message in `error_message` when
/// `file_reader_ptr` is null.
#[unsafe(no_mangle)]
pub extern "C" fn release_reader(file_reader_ptr: *mut FileReader, error_message: *mut c_char, error_size: usize) -> i32 {
    if file_reader_ptr.is_null() {
        return set_error_and_return("Null pointer passed to function for file_reader_ptr", error_message, error_size);
    }
    // SAFETY: the pointer is non-null; the caller guarantees it was produced by
    // `Box::into_raw` in `create_reader` and has not been released already.
    let owned_reader = unsafe { Box::from_raw(file_reader_ptr) };
    drop(owned_reader);
    0
}
/// C entry point that destroys a stream created by `create_stream_reader`.
///
/// Reclaims the boxed `LanceReaderAdapter` and drops it, together with the stream it owns.
/// The pointer must not be used afterwards.
///
/// Returns `0` on success, or `-1` with a message in `error_message` when
/// `stream_reader_ptr` is null.
#[unsafe(no_mangle)]
pub extern "C" fn release_stream_reader(stream_reader_ptr: *mut LanceReaderAdapter, error_message: *mut c_char, error_size: usize) -> i32 {
    if stream_reader_ptr.is_null() {
        return set_error_and_return("Null pointer passed to function for stream_reader_ptr", error_message, error_size);
    }
    // SAFETY: the pointer is non-null; the caller guarantees it was produced by
    // `Box::into_raw` in `create_stream_reader` and has not been released already.
    let owned_adapter = unsafe { Box::from_raw(stream_reader_ptr) };
    drop(owned_adapter);
    0
}