| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Contains declarations to bind to the [C Stream Interface](https://arrow.apache.org/docs/format/CStreamInterface.html). |
| //! |
| //! This module has two main interfaces: |
| //! One interface maps C ABI to native Rust types, i.e. convert c-pointers, c_char, to native rust. |
| //! This is handled by [FFI_ArrowArrayStream]. |
| //! |
| //! The second interface is used to import `FFI_ArrowArrayStream` as Rust implementation `RecordBatch` reader. |
| //! This is handled by `ArrowArrayStreamReader`. |
| //! |
| //! ```ignore |
| //! # use std::fs::File; |
| //! # use std::sync::Arc; |
| //! # use arrow::error::Result; |
| //! # use arrow::ffi_stream::{export_reader_into_raw, ArrowArrayStreamReader, FFI_ArrowArrayStream}; |
| //! # use arrow::ipc::reader::FileReader; |
| //! # use arrow::record_batch::RecordBatchReader; |
| //! # fn main() -> Result<()> { |
| //! // create an record batch reader natively |
| //! let file = File::open("arrow_file").unwrap(); |
| //! let reader = Box::new(FileReader::try_new(file).unwrap()); |
| //! |
| //! // export it |
| //! let mut stream = FFI_ArrowArrayStream::empty(); |
| //! unsafe { export_reader_into_raw(reader, &mut stream) }; |
| //! |
| //! // consumed and used by something else... |
| //! |
| //! // import it |
| //! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(&mut stream).unwrap() }; |
| //! let imported_schema = stream_reader.schema(); |
| //! |
| //! let mut produced_batches = vec![]; |
| //! for batch in stream_reader { |
| //! produced_batches.push(batch.unwrap()); |
| //! } |
| //! Ok(()) |
| //! } |
| //! ``` |
| |
| use arrow_schema::DataType; |
| use std::ffi::CStr; |
| use std::ptr::addr_of; |
| use std::{ |
| ffi::CString, |
| os::raw::{c_char, c_int, c_void}, |
| sync::Arc, |
| }; |
| |
| use arrow_data::ffi::FFI_ArrowArray; |
| use arrow_schema::{ArrowError, Schema, SchemaRef, ffi::FFI_ArrowSchema}; |
| |
| use crate::array::Array; |
| use crate::array::StructArray; |
| use crate::ffi::from_ffi_and_data_type; |
| use crate::record_batch::{RecordBatch, RecordBatchReader}; |
| |
| type Result<T> = std::result::Result<T, ArrowError>; |
| |
| const ENOMEM: i32 = 12; |
| const EIO: i32 = 5; |
| const EINVAL: i32 = 22; |
| const ENOSYS: i32 = 78; |
| |
| /// ABI-compatible struct for `ArrayStream` from C Stream Interface |
| /// See <https://arrow.apache.org/docs/format/CStreamInterface.html#structure-definitions> |
| /// This was created by bindgen |
| #[repr(C)] |
| #[derive(Debug)] |
| #[allow(non_camel_case_types)] |
| pub struct FFI_ArrowArrayStream { |
| /// C function to get schema from the stream |
| pub get_schema: |
| Option<unsafe extern "C" fn(arg1: *mut Self, out: *mut FFI_ArrowSchema) -> c_int>, |
| /// C function to get next array from the stream |
| pub get_next: Option<unsafe extern "C" fn(arg1: *mut Self, out: *mut FFI_ArrowArray) -> c_int>, |
| /// C function to get the error from last operation on the stream |
| pub get_last_error: Option<unsafe extern "C" fn(arg1: *mut Self) -> *const c_char>, |
| /// C function to release the stream |
| pub release: Option<unsafe extern "C" fn(arg1: *mut Self)>, |
| /// Private data used by the stream |
| pub private_data: *mut c_void, |
| } |
| |
| unsafe impl Send for FFI_ArrowArrayStream {} |
| |
| // callback used to drop [FFI_ArrowArrayStream] when it is exported. |
| unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) { |
| if stream.is_null() { |
| return; |
| } |
| let stream = unsafe { &mut *stream }; |
| |
| stream.get_schema = None; |
| stream.get_next = None; |
| stream.get_last_error = None; |
| |
| let private_data = unsafe { Box::from_raw(stream.private_data as *mut StreamPrivateData) }; |
| drop(private_data); |
| |
| stream.release = None; |
| } |
| |
| struct StreamPrivateData { |
| batch_reader: Box<dyn RecordBatchReader + Send>, |
| last_error: Option<CString>, |
| } |
| |
| // The callback used to get array schema |
| unsafe extern "C" fn get_schema( |
| stream: *mut FFI_ArrowArrayStream, |
| schema: *mut FFI_ArrowSchema, |
| ) -> c_int { |
| ExportedArrayStream { stream }.get_schema(schema) |
| } |
| |
| // The callback used to get next array |
| unsafe extern "C" fn get_next( |
| stream: *mut FFI_ArrowArrayStream, |
| array: *mut FFI_ArrowArray, |
| ) -> c_int { |
| ExportedArrayStream { stream }.get_next(array) |
| } |
| |
| // The callback used to get the error from last operation on the `FFI_ArrowArrayStream` |
| unsafe extern "C" fn get_last_error(stream: *mut FFI_ArrowArrayStream) -> *const c_char { |
| let mut ffi_stream = ExportedArrayStream { stream }; |
| // The consumer should not take ownership of this string, we should return |
| // a const pointer to it. |
| match ffi_stream.get_last_error() { |
| Some(err_string) => err_string.as_ptr(), |
| None => std::ptr::null(), |
| } |
| } |
| |
| impl Drop for FFI_ArrowArrayStream { |
| fn drop(&mut self) { |
| match self.release { |
| None => (), |
| Some(release) => unsafe { release(self) }, |
| }; |
| } |
| } |
| |
| impl FFI_ArrowArrayStream { |
| /// Creates a new [`FFI_ArrowArrayStream`]. |
| pub fn new(batch_reader: Box<dyn RecordBatchReader + Send>) -> Self { |
| let private_data = Box::new(StreamPrivateData { |
| batch_reader, |
| last_error: None, |
| }); |
| |
| Self { |
| get_schema: Some(get_schema), |
| get_next: Some(get_next), |
| get_last_error: Some(get_last_error), |
| release: Some(release_stream), |
| private_data: Box::into_raw(private_data) as *mut c_void, |
| } |
| } |
| |
| /// Takes ownership of the pointed to [`FFI_ArrowArrayStream`] |
| /// |
| /// This acts to [move] the data out of `raw_stream`, setting the release callback to NULL |
| /// |
| /// # Safety |
| /// |
| /// * `raw_stream` must be [valid] for reads and writes |
| /// * `raw_stream` must be properly aligned |
| /// * `raw_stream` must point to a properly initialized value of [`FFI_ArrowArrayStream`] |
| /// |
| /// [move]: https://arrow.apache.org/docs/format/CDataInterface.html#moving-an-array |
| /// [valid]: https://doc.rust-lang.org/std/ptr/index.html#safety |
| pub unsafe fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Self { |
| unsafe { std::ptr::replace(raw_stream, Self::empty()) } |
| } |
| |
| /// Creates a new empty [FFI_ArrowArrayStream]. Used to import from the C Stream Interface. |
| pub fn empty() -> Self { |
| Self { |
| get_schema: None, |
| get_next: None, |
| get_last_error: None, |
| release: None, |
| private_data: std::ptr::null_mut(), |
| } |
| } |
| } |
| |
| struct ExportedArrayStream { |
| stream: *mut FFI_ArrowArrayStream, |
| } |
| |
| impl ExportedArrayStream { |
| fn get_private_data(&mut self) -> &mut StreamPrivateData { |
| unsafe { &mut *((*self.stream).private_data as *mut StreamPrivateData) } |
| } |
| |
| pub fn get_schema(&mut self, out: *mut FFI_ArrowSchema) -> i32 { |
| let private_data = self.get_private_data(); |
| let reader = &private_data.batch_reader; |
| |
| let schema = FFI_ArrowSchema::try_from(reader.schema().as_ref()); |
| |
| match schema { |
| Ok(schema) => { |
| unsafe { std::ptr::copy(addr_of!(schema), out, 1) }; |
| std::mem::forget(schema); |
| 0 |
| } |
| Err(ref err) => { |
| private_data.last_error = Some( |
| CString::new(err.to_string()).expect("Error string has a null byte in it."), |
| ); |
| get_error_code(err) |
| } |
| } |
| } |
| |
| pub fn get_next(&mut self, out: *mut FFI_ArrowArray) -> i32 { |
| let private_data = self.get_private_data(); |
| let reader = &mut private_data.batch_reader; |
| |
| match reader.next() { |
| None => { |
| // Marks ArrowArray released to indicate reaching the end of stream. |
| unsafe { std::ptr::write(out, FFI_ArrowArray::empty()) } |
| 0 |
| } |
| Some(next_batch) => { |
| if let Ok(batch) = next_batch { |
| let struct_array = StructArray::from(batch); |
| let array = FFI_ArrowArray::new(&struct_array.to_data()); |
| |
| unsafe { std::ptr::write_unaligned(out, array) }; |
| 0 |
| } else { |
| let err = &next_batch.unwrap_err(); |
| private_data.last_error = Some( |
| CString::new(err.to_string()).expect("Error string has a null byte in it."), |
| ); |
| get_error_code(err) |
| } |
| } |
| } |
| } |
| |
| pub fn get_last_error(&mut self) -> Option<&CString> { |
| self.get_private_data().last_error.as_ref() |
| } |
| } |
| |
| fn get_error_code(err: &ArrowError) -> i32 { |
| match err { |
| ArrowError::NotYetImplemented(_) => ENOSYS, |
| ArrowError::MemoryError(_) => ENOMEM, |
| ArrowError::IoError(_, _) => EIO, |
| _ => EINVAL, |
| } |
| } |
| |
| /// A `RecordBatchReader` which imports Arrays from `FFI_ArrowArrayStream`. |
| /// |
| /// Struct used to fetch `RecordBatch` from the C Stream Interface. |
| /// Its main responsibility is to expose `RecordBatchReader` functionality |
| /// that requires [FFI_ArrowArrayStream]. |
| #[derive(Debug)] |
| pub struct ArrowArrayStreamReader { |
| stream: FFI_ArrowArrayStream, |
| schema: SchemaRef, |
| } |
| |
| /// Gets schema from a raw pointer of `FFI_ArrowArrayStream`. This is used when constructing |
| /// `ArrowArrayStreamReader` to cache schema. |
| fn get_stream_schema(stream_ptr: *mut FFI_ArrowArrayStream) -> Result<SchemaRef> { |
| let mut schema = FFI_ArrowSchema::empty(); |
| |
| let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, &mut schema) }; |
| |
| if ret_code == 0 { |
| let schema = Schema::try_from(&schema)?; |
| Ok(Arc::new(schema)) |
| } else { |
| Err(ArrowError::CDataInterface(format!( |
| "Cannot get schema from input stream. Error code: {ret_code:?}" |
| ))) |
| } |
| } |
| |
| impl ArrowArrayStreamReader { |
| /// Creates a new `ArrowArrayStreamReader` from a `FFI_ArrowArrayStream`. |
| /// This is used to import from the C Stream Interface. |
| #[allow(dead_code)] |
| pub fn try_new(mut stream: FFI_ArrowArrayStream) -> Result<Self> { |
| if stream.release.is_none() { |
| return Err(ArrowError::CDataInterface( |
| "input stream is already released".to_string(), |
| )); |
| } |
| |
| let schema = get_stream_schema(&mut stream)?; |
| |
| Ok(Self { stream, schema }) |
| } |
| |
| /// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`. |
| /// |
| /// Assumes that the pointer represents valid C Stream Interfaces. |
| /// This function copies the content from the raw pointer and cleans up it to prevent |
| /// double-dropping. The caller is responsible for freeing up the memory allocated for |
| /// the pointer. |
| /// |
| /// # Safety |
| /// |
| /// See [`FFI_ArrowArrayStream::from_raw`] |
| pub unsafe fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Result<Self> { |
| Self::try_new(unsafe { FFI_ArrowArrayStream::from_raw(raw_stream) }) |
| } |
| |
| /// Get the last error from `ArrowArrayStreamReader` |
| fn get_stream_last_error(&mut self) -> Option<String> { |
| let get_last_error = self.stream.get_last_error?; |
| |
| let error_str = unsafe { get_last_error(&mut self.stream) }; |
| if error_str.is_null() { |
| return None; |
| } |
| |
| let error_str = unsafe { CStr::from_ptr(error_str) }; |
| Some(error_str.to_string_lossy().to_string()) |
| } |
| } |
| |
| impl Iterator for ArrowArrayStreamReader { |
| type Item = Result<RecordBatch>; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| let mut array = FFI_ArrowArray::empty(); |
| |
| let ret_code = unsafe { self.stream.get_next.unwrap()(&mut self.stream, &mut array) }; |
| |
| if ret_code == 0 { |
| // The end of stream has been reached |
| if array.is_released() { |
| return None; |
| } |
| |
| let result = unsafe { |
| from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone())) |
| }; |
| Some(result.and_then(|data| { |
| RecordBatch::try_new(self.schema.clone(), StructArray::from(data).into_parts().1) |
| })) |
| } else { |
| let last_error = self.get_stream_last_error(); |
| let err = ArrowError::CDataInterface(last_error.unwrap()); |
| Some(Err(err)) |
| } |
| } |
| } |
| |
| impl RecordBatchReader for ArrowArrayStreamReader { |
| fn schema(&self) -> SchemaRef { |
| self.schema.clone() |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use std::collections::HashMap; |
| |
| use arrow_schema::Field; |
| |
| use crate::array::Int32Array; |
| use crate::ffi::from_ffi; |
| |
| struct TestRecordBatchReader { |
| schema: SchemaRef, |
| iter: Box<dyn Iterator<Item = Result<RecordBatch>> + Send>, |
| } |
| |
| impl TestRecordBatchReader { |
| pub fn new( |
| schema: SchemaRef, |
| iter: Box<dyn Iterator<Item = Result<RecordBatch>> + Send>, |
| ) -> Box<TestRecordBatchReader> { |
| Box::new(TestRecordBatchReader { schema, iter }) |
| } |
| } |
| |
| impl Iterator for TestRecordBatchReader { |
| type Item = Result<RecordBatch>; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| self.iter.next() |
| } |
| } |
| |
| impl RecordBatchReader for TestRecordBatchReader { |
| fn schema(&self) -> SchemaRef { |
| self.schema.clone() |
| } |
| } |
| |
| fn _test_round_trip_export(arrays: Vec<Arc<dyn Array>>) -> Result<()> { |
| let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]); |
| let schema = Arc::new(Schema::new_with_metadata( |
| vec![ |
| Field::new("a", arrays[0].data_type().clone(), true) |
| .with_metadata(metadata.clone()), |
| Field::new("b", arrays[1].data_type().clone(), true) |
| .with_metadata(metadata.clone()), |
| Field::new("c", arrays[2].data_type().clone(), true) |
| .with_metadata(metadata.clone()), |
| ], |
| metadata, |
| )); |
| let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap(); |
| let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _; |
| |
| let reader = TestRecordBatchReader::new(schema.clone(), iter); |
| |
| // Export a `RecordBatchReader` through `FFI_ArrowArrayStream` |
| let mut ffi_stream = FFI_ArrowArrayStream::new(reader); |
| |
| // Get schema from `FFI_ArrowArrayStream` |
| let mut ffi_schema = FFI_ArrowSchema::empty(); |
| let ret_code = unsafe { get_schema(&mut ffi_stream, &mut ffi_schema) }; |
| assert_eq!(ret_code, 0); |
| |
| let exported_schema = Schema::try_from(&ffi_schema).unwrap(); |
| assert_eq!(&exported_schema, schema.as_ref()); |
| |
| // Get array from `FFI_ArrowArrayStream` |
| let mut produced_batches = vec![]; |
| loop { |
| let mut ffi_array = FFI_ArrowArray::empty(); |
| let ret_code = unsafe { get_next(&mut ffi_stream, &mut ffi_array) }; |
| assert_eq!(ret_code, 0); |
| |
| // The end of stream has been reached |
| if ffi_array.is_released() { |
| break; |
| } |
| |
| let array = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap(); |
| |
| let record_batch = RecordBatch::try_new( |
| SchemaRef::from(exported_schema.clone()), |
| StructArray::from(array).into_parts().1, |
| ) |
| .unwrap(); |
| produced_batches.push(record_batch); |
| } |
| |
| assert_eq!(produced_batches, vec![batch.clone(), batch]); |
| |
| Ok(()) |
| } |
| |
| fn _test_round_trip_import(arrays: Vec<Arc<dyn Array>>) -> Result<()> { |
| let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]); |
| let schema = Arc::new(Schema::new_with_metadata( |
| vec![ |
| Field::new("a", arrays[0].data_type().clone(), true) |
| .with_metadata(metadata.clone()), |
| Field::new("b", arrays[1].data_type().clone(), true) |
| .with_metadata(metadata.clone()), |
| Field::new("c", arrays[2].data_type().clone(), true) |
| .with_metadata(metadata.clone()), |
| ], |
| metadata, |
| )); |
| let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap(); |
| let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _; |
| |
| let reader = TestRecordBatchReader::new(schema.clone(), iter); |
| |
| // Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader` |
| let stream = FFI_ArrowArrayStream::new(reader); |
| let stream_reader = ArrowArrayStreamReader::try_new(stream).unwrap(); |
| |
| let imported_schema = stream_reader.schema(); |
| assert_eq!(imported_schema, schema); |
| |
| let mut produced_batches = vec![]; |
| for batch in stream_reader { |
| produced_batches.push(batch.unwrap()); |
| } |
| |
| assert_eq!(produced_batches, vec![batch.clone(), batch]); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_stream_round_trip_export() -> Result<()> { |
| let array = Int32Array::from(vec![Some(2), None, Some(1), None]); |
| let array: Arc<dyn Array> = Arc::new(array); |
| |
| _test_round_trip_export(vec![array.clone(), array.clone(), array]) |
| } |
| |
| #[test] |
| fn test_stream_round_trip_import() -> Result<()> { |
| let array = Int32Array::from(vec![Some(2), None, Some(1), None]); |
| let array: Arc<dyn Array> = Arc::new(array); |
| |
| _test_round_trip_import(vec![array.clone(), array.clone(), array]) |
| } |
| |
| #[test] |
| fn test_error_import() -> Result<()> { |
| let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); |
| |
| let iter = Box::new(vec![Err(ArrowError::MemoryError("".to_string()))].into_iter()); |
| |
| let reader = TestRecordBatchReader::new(schema.clone(), iter); |
| |
| // Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader` |
| let stream = FFI_ArrowArrayStream::new(reader); |
| let stream_reader = ArrowArrayStreamReader::try_new(stream).unwrap(); |
| |
| let imported_schema = stream_reader.schema(); |
| assert_eq!(imported_schema, schema); |
| |
| let mut produced_batches = vec![]; |
| for batch in stream_reader { |
| produced_batches.push(batch); |
| } |
| |
| // The results should outlive the lifetime of the stream itself. |
| assert_eq!(produced_batches.len(), 1); |
| assert!(produced_batches[0].is_err()); |
| |
| Ok(()) |
| } |
| } |