blob: 9e6f2a56e770fd25ab8e05feff156378e40d3422 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
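//! Utilities for importing/exporting Arrow arrays through the Arrow C Data Interface used by
//! Spark, conversions between Arrow errors and execution/expression errors, and decoding of
//! Java `BigInteger` byte arrays.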
use crate::errors::ExpressionError;
use crate::execution::operators::ExecutionError;
use arrow::{
array::ArrayData,
error::ArrowError,
ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
};
impl From<ArrowError> for ExecutionError {
fn from(error: ArrowError) -> ExecutionError {
ExecutionError::ArrowError(error.to_string())
}
}
impl From<ArrowError> for ExpressionError {
fn from(error: ArrowError) -> ExpressionError {
ExpressionError::ArrowError(error.to_string())
}
}
impl From<ExpressionError> for ArrowError {
fn from(error: ExpressionError) -> ArrowError {
ArrowError::ComputeError(error.to_string())
}
}
/// Conversion between native Arrow values and the Arrow C Data Interface structs exchanged
/// with Spark, whose memory addresses are passed around as plain `i64` values.
pub trait SparkArrowConvert {
    /// Builds a value from Arrow C Data Interface structs handed over by Spark.
    ///
    /// `addresses` is the pair `(array_address, schema_address)`: the raw addresses of an
    /// `FFI_ArrowArray` and an `FFI_ArrowSchema` respectively.
    fn from_spark(addresses: (i64, i64)) -> Result<Self, ExecutionError>
    where
        Self: Sized;

    /// Moves this value into the Arrow C Data Interface structs located at the `array`
    /// (`FFI_ArrowArray`) and `schema` (`FFI_ArrowSchema`) addresses.
    fn move_to_spark(&self, array: i64, schema: i64) -> Result<(), ExecutionError>;
}
impl SparkArrowConvert for ArrayData {
    /// Imports an [`ArrayData`] from the Arrow C Data Interface structs located at
    /// `addresses = (array_address, schema_address)`.
    ///
    /// Both structs are moved out of the given memory with `ptr::replace`, which leaves
    /// `FFI_ArrowArray::empty()` / `FFI_ArrowSchema::empty()` placeholders behind, so the
    /// exported data is taken over exactly once. The imported buffers are then realigned.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::ArrowError` if either address is null, or if `from_ffi`
    /// fails to import the data.
    fn from_spark(addresses: (i64, i64)) -> Result<Self, ExecutionError> {
        let (array_ptr, schema_ptr) = addresses;
        let array_ptr = array_ptr as *mut FFI_ArrowArray;
        let schema_ptr = schema_ptr as *mut FFI_ArrowSchema;
        if array_ptr.is_null() || schema_ptr.is_null() {
            return Err(ExecutionError::ArrowError(
                "At least one of passed pointers is null".to_string(),
            ));
        }
        // SAFETY: both pointers are non-null (checked above); the caller is trusted to pass
        // addresses of valid, initialized C Data Interface structs. `ptr::replace` moves each
        // struct out and leaves an empty placeholder, so ownership is transferred only once.
        let mut array_data = unsafe {
            let ffi_array = std::ptr::replace(array_ptr, FFI_ArrowArray::empty());
            let ffi_schema = std::ptr::replace(schema_ptr, FFI_ArrowSchema::empty());
            from_ffi(ffi_array, &ffi_schema)?
        };
        // Buffers coming from Java may not meet Rust's alignment expectations for their
        // element types; realign them before the data is used.
        array_data.align_buffers();
        Ok(array_data)
    }

    /// Exports this [`ArrayData`] into the Arrow C Data Interface structs located at the
    /// `array` and `schema` addresses.
    ///
    /// The destination memory is overwritten without dropping whatever it held before. Writes
    /// are unaligned, so destinations that do not meet the natural alignment of
    /// `FFI_ArrowArray` / `FFI_ArrowSchema` are supported.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::ArrowError` if either address is null, or if the data type
    /// cannot be converted to an `FFI_ArrowSchema`. In both cases nothing is written.
    fn move_to_spark(&self, array: i64, schema: i64) -> Result<(), ExecutionError> {
        let array_ptr = array as *mut FFI_ArrowArray;
        let schema_ptr = schema as *mut FFI_ArrowSchema;
        // Writing through a null pointer is UB; reject it the same way `from_spark` does.
        if array_ptr.is_null() || schema_ptr.is_null() {
            return Err(ExecutionError::ArrowError(
                "At least one of passed pointers is null".to_string(),
            ));
        }
        // Do the fallible conversion first: if it fails we must not have already placed an
        // exported array in the destination memory, where nobody would release it.
        let ffi_schema = FFI_ArrowSchema::try_from(self.data_type())?;
        let ffi_array = FFI_ArrowArray::new(self);
        // SAFETY: both pointers are non-null (checked above) and the caller is trusted to pass
        // addresses of writable memory large enough for the respective struct.
        // `write_unaligned` has no alignment requirement, so it is valid for aligned and
        // unaligned destinations alike; ownership of both structs moves into that memory.
        unsafe {
            std::ptr::write_unaligned(array_ptr, ffi_array);
            std::ptr::write_unaligned(schema_ptr, ffi_schema);
        }
        Ok(())
    }
}
/// Converts a big-endian two's-complement byte slice, as produced by Java's
/// `BigInteger.toByteArray()`, into an `i128`.
///
/// The value is sign-extended from the most significant bit of `slice[0]`, so slices shorter
/// than 16 bytes decode to the number they represent. If `slice` is longer than 16 bytes, only
/// its last (least significant) 16 bytes are used, i.e. the value is truncated to 128 bits.
/// An empty slice decodes to `0` instead of panicking.
pub fn bytes_to_i128(slice: &[u8]) -> i128 {
    // The leading byte carries the sign. Pre-filling with 0xFF for negatives turns the
    // high-order bytes not covered by `slice` into the sign extension.
    let negative = matches!(slice.first(), Some(&b) if b & 0x80 != 0);
    let fill: u8 = if negative { 0xFF } else { 0x00 };
    let mut be_bytes = [fill; 16];
    // Copy the least significant (trailing) bytes into the low end of the buffer.
    let len = slice.len().min(16);
    be_bytes[16 - len..].copy_from_slice(&slice[slice.len() - len..]);
    i128::from_be_bytes(be_bytes)
}