blob: 291082d10464ad90c49ce747b787b0d45447f028 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::common::bit;
use crate::execution::operators::ExecutionError;
use arrow::buffer::Buffer as ArrowBuffer;
use std::{
alloc::{handle_alloc_error, Layout},
ptr::NonNull,
sync::Arc,
};
/// A byte buffer modeled on Arrow's [`MutableBuffer`], with two modes of operation selected by
/// the `owned` flag.
///
/// If `owned` is true, this behaves the same way as an Arrow [`MutableBuffer`], and the struct is
/// the unique owner of the memory it wraps. The holder of this buffer can read or write the
/// buffer, and the memory is released when the buffer is dropped.
///
/// Also note that, in `owned` mode, the buffer starts out filled with 0s, and its length is
/// always equal to its capacity. It is up to the caller to decide which part of the buffer
/// contains valid data.
///
/// If `owned` is false, this buffer is an alias of memory owned by someone else. The buffer is
/// then read-only, and dropping it does not release the aliased memory.
#[derive(Debug)]
pub struct CometBuffer {
/// Pointer to the first byte of the buffer; never null.
data: NonNull<u8>,
/// Number of bytes exposed when the buffer is read as a slice.
len: usize,
/// Size in bytes of the underlying memory region: the allocation size for an owned buffer, or
/// the value supplied by the caller for an un-owned one.
capacity: usize,
/// Whether this buffer owns the data it points to.
owned: bool,
/// The allocation instance for this buffer. It is shared with every Arrow buffer produced by
/// `to_arrow`, so its strong count tells whether such a buffer is still alive.
allocation: Arc<CometBufferAllocation>,
}
// `CometBuffer` stores a raw `NonNull<u8>`, so `Sync` and `Send` are not derived automatically
// and are asserted manually here.
// NOTE(review): nothing in this file synchronizes access to the pointed-to memory — these impls
// presumably rely on callers never mutating a buffer that is shared across threads; confirm.
unsafe impl Sync for CometBuffer {}
unsafe impl Send for CometBuffer {}
/// Alignment, in bytes, used for every allocation made for an owned buffer. The capacity of a
/// buffer wrapped through `from_ptr` must also be a multiple of this value.
const ALIGNMENT: usize = 64;
impl CometBuffer {
    /// Creates an owned buffer filled with zeros.
    ///
    /// `capacity` is rounded up against `ALIGNMENT` and clamped to at least one `ALIGNMENT`
    /// unit, so the allocator is never handed a zero-sized layout (`new(0)` therefore yields a
    /// 64-byte buffer). The length of the returned buffer equals its capacity.
    ///
    /// Allocation failure is reported through [`handle_alloc_error`].
    pub fn new(capacity: usize) -> Self {
        // A zero-sized request would violate the global allocator's contract here and again in
        // the `realloc`/`dealloc` calls that later reuse `self.capacity` as the layout size.
        let aligned_capacity =
            std::cmp::max(bit::round_upto_power_of_2(capacity, ALIGNMENT), ALIGNMENT);
        // SAFETY: `ALIGNMENT` is a non-zero power of two and `aligned_capacity` is non-zero.
        // NOTE(review): the size is not checked against `isize::MAX`; this assumes callers never
        // request capacities anywhere near that large.
        unsafe {
            let layout = Layout::from_size_align_unchecked(aligned_capacity, ALIGNMENT);
            let ptr = std::alloc::alloc_zeroed(layout);
            Self {
                data: NonNull::new(ptr).unwrap_or_else(|| handle_alloc_error(layout)),
                len: aligned_capacity,
                capacity: aligned_capacity,
                owned: true,
                allocation: Arc::new(CometBufferAllocation::new()),
            }
        }
    }

    /// Creates an un-owned, read-only buffer that aliases the memory starting at `ptr`.
    ///
    /// The aliased memory is not released when the returned buffer is dropped.
    ///
    /// NOTE(review): nothing here verifies that `ptr` stays valid for `len` bytes while the
    /// returned buffer is alive, nor that `len <= capacity`; the caller has to guarantee both.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is not a multiple of `ALIGNMENT`, or if `ptr` is null.
    pub fn from_ptr(ptr: *const u8, len: usize, capacity: usize) -> Self {
        assert_eq!(
            capacity % ALIGNMENT,
            0,
            "input buffer is not aligned to {} bytes",
            ALIGNMENT
        );
        let data = NonNull::new(ptr as *mut u8).unwrap_or_else(|| {
            panic!(
                "cannot create CometBuffer from (ptr: {:?}, len: {}, capacity: {})",
                ptr, len, capacity
            )
        });
        Self {
            data,
            len,
            capacity,
            owned: false,
            allocation: Arc::new(CometBufferAllocation::new()),
        }
    }

    /// Returns the capacity of this buffer, in bytes.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the length (i.e., number of bytes) of this buffer.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if the length of this buffer is zero.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the data stored in this buffer as a slice of `len` bytes.
    pub fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the data stored in this buffer as a mutable slice.
    ///
    /// # Panics
    ///
    /// Panics if this buffer is un-owned.
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        debug_assert!(self.owned, "cannot modify un-owned buffer");
        self
    }

    /// Copies `src` into this buffer (which must be an owned buffer), starting at byte
    /// `offset`. Bytes previously stored in that range are overwritten; the length and capacity
    /// of the buffer are not changed.
    ///
    /// # Panics
    ///
    /// Panics if `offset + src.len()` overflows or exceeds the capacity of this buffer. The
    /// check is also performed in release builds, because this is a safe function and the copy
    /// below would otherwise write outside the allocation.
    pub fn extend_from_slice(&mut self, offset: usize, src: &[u8]) {
        debug_assert!(self.owned, "cannot modify un-owned buffer");
        let fits = matches!(
            offset.checked_add(src.len()),
            Some(end) if end <= self.capacity()
        );
        assert!(
            fits,
            "buffer overflow, offset = {}, src.len = {}, capacity = {}",
            offset,
            src.len(),
            self.capacity()
        );
        // SAFETY: `offset + src.len() <= capacity` was checked above, so the destination range
        // lies inside the allocation. `src` is borrowed immutably while `self` is borrowed
        // mutably, so the two ranges cannot overlap.
        unsafe {
            let dst = self.data.as_ptr().add(offset);
            std::ptr::copy_nonoverlapping(src.as_ptr(), dst, src.len())
        }
    }

    /// Returns a raw pointer to this buffer's internal memory.
    ///
    /// For owned buffers the pointer is aligned to `ALIGNMENT` bytes.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }

    /// Returns a mutable raw pointer to this buffer's internal memory.
    ///
    /// For owned buffers the pointer is aligned to `ALIGNMENT` bytes. Must only be called on
    /// an owned buffer (checked in debug builds).
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        debug_assert!(self.owned, "cannot modify un-owned buffer");
        self.data.as_ptr()
    }

    /// Returns an immutable Arrow buffer over the first `len` bytes of this buffer.
    ///
    /// # Errors
    ///
    /// Returns an error if [`Self::check_reference`] fails, i.e. if an Arrow buffer produced by
    /// an earlier call is still alive.
    ///
    /// # Safety
    ///
    /// This function is highly unsafe since it leaks the raw pointer to the memory region that
    /// this buffer is tracking. The caller must make sure the returned [`ArrowBuffer`] never
    /// outlives this buffer; otherwise it ends up holding a dangling pointer.
    ///
    /// In the particular case of the columnar reader, we'll guarantee the above since the reader
    /// itself is closed at the very end, after the Spark task is completed (either successfully
    /// or unsuccessfully) through task completion listener.
    ///
    /// When re-using [`MutableVector`] in Comet native operators, across multiple input batches,
    /// because of the iterator-style pattern, the content of the original mutable buffer will
    /// only be updated once upstream operators fully consumed the previous output batch. For
    /// breaking operators, they are responsible for copying content out of the buffers.
    pub unsafe fn to_arrow(&self) -> Result<ArrowBuffer, ExecutionError> {
        self.check_reference()?;
        // `self.data` is already a `NonNull`, so it can be handed over as-is. The cloned `Arc`
        // is what makes `check_reference` notice that the returned buffer is still alive.
        Ok(ArrowBuffer::from_custom_allocation(
            self.data,
            self.len,
            Arc::<CometBufferAllocation>::clone(&self.allocation),
        ))
    }

    /// Checks if this buffer is exclusively owned by Comet. If not, an error is returned.
    ///
    /// We run this check when we want to update the buffer. If the buffer is also shared by
    /// other components, e.g. one DataFusion operator stores the buffer, Comet cannot safely
    /// modify the buffer. Sharing is detected through the strong count of the allocation marker.
    pub fn check_reference(&self) -> Result<(), ExecutionError> {
        if Arc::strong_count(&self.allocation) > 1 {
            return Err(ExecutionError::GeneralError(
                "Error on modifying a buffer which is not exclusively owned by Comet".to_string(),
            ));
        }
        Ok(())
    }

    /// Resets this buffer (must be an owned buffer) by filling all `len` bytes with zeros.
    pub fn reset(&mut self) {
        debug_assert!(self.owned, "cannot modify un-owned buffer");
        // SAFETY: the buffer is valid for writes of `len` bytes.
        unsafe {
            std::ptr::write_bytes(self.as_mut_ptr(), 0, self.len);
        }
    }

    /// Grows this buffer (must be an owned buffer) so that it can hold at least `new_capacity`
    /// bytes. The additional bytes are filled with 0, and the length is set to the new capacity.
    ///
    /// The resulting capacity is `new_capacity` rounded up against `ALIGNMENT`, or twice the
    /// old capacity, whichever is larger. If `new_capacity` is not greater than the current
    /// length of this buffer, this is a no-op.
    #[inline(always)]
    pub fn resize(&mut self, new_capacity: usize) {
        debug_assert!(self.owned, "cannot modify un-owned buffer");
        if new_capacity <= self.len {
            return;
        }
        let old_len = self.len;
        // SAFETY: `self.data` was allocated with a layout of `self.capacity` bytes aligned to
        // `ALIGNMENT`, which is the layout `reallocate` passes back to the allocator.
        let (ptr, grown_capacity) =
            unsafe { Self::reallocate(self.data, self.capacity, new_capacity) };
        self.data = ptr;
        self.capacity = grown_capacity;
        // `realloc` leaves the new tail uninitialized; zero it to keep the buffer zero-filled.
        // SAFETY: `grown_capacity >= new_capacity > old_len`, so the tail is inside the block.
        unsafe {
            self.data
                .as_ptr()
                .add(old_len)
                .write_bytes(0, grown_capacity - old_len)
        };
        self.len = grown_capacity;
    }

    /// Reallocates the block at `ptr` and returns the new pointer together with the capacity
    /// that was actually allocated: `new_capacity` rounded up against `ALIGNMENT`, or
    /// `old_capacity * 2`, whichever is larger.
    ///
    /// Allocation failure is reported through [`handle_alloc_error`].
    ///
    /// # Safety
    ///
    /// `ptr` must have been allocated by the global allocator with a layout of `old_capacity`
    /// bytes aligned to `ALIGNMENT`.
    unsafe fn reallocate(
        ptr: NonNull<u8>,
        old_capacity: usize,
        new_capacity: usize,
    ) -> (NonNull<u8>, usize) {
        let new_capacity = bit::round_upto_power_of_2(new_capacity, ALIGNMENT);
        let new_capacity = std::cmp::max(new_capacity, old_capacity * 2);
        let raw_ptr = std::alloc::realloc(
            ptr.as_ptr(),
            Layout::from_size_align_unchecked(old_capacity, ALIGNMENT),
            new_capacity,
        );
        let ptr = NonNull::new(raw_ptr).unwrap_or_else(|| {
            handle_alloc_error(Layout::from_size_align_unchecked(new_capacity, ALIGNMENT))
        });
        (ptr, new_capacity)
    }
}
impl Drop for CometBuffer {
    /// Releases the backing allocation when this buffer owns it. Memory that is merely aliased
    /// by an un-owned buffer is left untouched.
    fn drop(&mut self) {
        if !self.owned {
            return;
        }
        // The layout is rebuilt from the current capacity and the fixed buffer alignment.
        let layout = unsafe { Layout::from_size_align_unchecked(self.capacity, ALIGNMENT) };
        unsafe { std::alloc::dealloc(self.data.as_ptr(), layout) }
    }
}
impl PartialEq for CometBuffer {
    /// Two buffers are equal when they have the same length, the same capacity and the same
    /// bytes in their first `len` positions.
    ///
    /// The sizes are compared before anything else: two buffers may alias the same address with
    /// different lengths, in which case they must not be reported as equal. Once the sizes
    /// match, identical pointers allow the byte-wise comparison to be skipped.
    fn eq(&self, other: &CometBuffer) -> bool {
        if self.len != other.len || self.capacity != other.capacity {
            return false;
        }
        self.data == other.data || self.as_slice() == other.as_slice()
    }
}
impl std::ops::Deref for CometBuffer {
    type Target = [u8];

    /// Views the first `len` bytes of this buffer as a byte slice.
    fn deref(&self) -> &[u8] {
        let start: *const u8 = self.data.as_ptr();
        unsafe { std::slice::from_raw_parts(start, self.len) }
    }
}
impl std::ops::DerefMut for CometBuffer {
    /// Views the whole capacity of this buffer as a mutable byte slice.
    ///
    /// Panics if this buffer is un-owned, since aliased memory must not be modified.
    fn deref_mut(&mut self) -> &mut [u8] {
        assert!(self.owned, "cannot modify un-owned buffer");
        let byte_count = self.capacity;
        let start = self.as_mut_ptr();
        unsafe { std::slice::from_raw_parts_mut(start, byte_count) }
    }
}
/// Field-less marker type. A `CometBuffer` keeps one instance behind an `Arc` and hands clones
/// of that `Arc` to the Arrow buffers it produces, so the reference count reveals whether any
/// of them is still alive.
#[derive(Debug)]
struct CometBufferAllocation {}

impl CometBufferAllocation {
    /// Creates a new marker value.
    fn new() -> Self {
        CometBufferAllocation {}
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::buffer::Buffer as ArrowBuffer;

    /// Asserts that both the capacity and the length of `buffer` equal `expected`.
    fn assert_capacity_and_len(expected: usize, buffer: &CometBuffer) {
        assert_eq!(expected, buffer.capacity());
        assert_eq!(expected, buffer.len());
    }

    /// A requested capacity is rounded up to the next multiple of 64 bytes.
    #[test]
    fn test_buffer_new() {
        let buffer = CometBuffer::new(63);
        assert_capacity_and_len(64, &buffer);
        assert!(!buffer.is_empty());
    }

    /// Growing rounds the capacity up; asking for less than the current size changes nothing.
    #[test]
    fn test_resize() {
        let mut buffer = CometBuffer::new(1);
        assert_capacity_and_len(64, &buffer);

        buffer.resize(100);
        assert_capacity_and_len(128, &buffer);

        // resize with less capacity is no-op
        buffer.resize(20);
        assert_capacity_and_len(128, &buffer);
    }

    /// Bytes are written at the requested offset and can be overwritten after a reset.
    #[test]
    fn test_extend_from_slice() {
        let mut buffer = CometBuffer::new(100);

        buffer.extend_from_slice(0, b"hello");
        assert_eq!(b"hello", &buffer.as_slice()[0..5]);

        buffer.extend_from_slice(5, b" world");
        assert_eq!(b"hello world", &buffer.as_slice()[0..11]);

        buffer.reset();
        buffer.extend_from_slice(0, b"hello arrow");
        assert_eq!(b"hello arrow", &buffer.as_slice()[0..11]);
    }

    /// The Arrow view of an owned buffer exposes the same bytes and the same length.
    #[test]
    fn test_to_arrow() {
        let payload = b"aaaa bbbb cccc dddd";
        let mut buffer = CometBuffer::new(1);
        buffer.extend_from_slice(0, payload.as_slice());

        assert_eq!(64, buffer.len());
        assert_eq!(64, buffer.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", &buffer.as_slice()[0..payload.len()]);

        unsafe {
            let arrow_view: ArrowBuffer = buffer.to_arrow().unwrap();
            assert_eq!(64, arrow_view.len());
            assert_eq!(payload, &arrow_view.as_slice()[0..payload.len()]);
        }
    }

    /// An un-owned buffer aliases the Arrow buffer it was created from.
    #[test]
    fn test_unowned() {
        let source = ArrowBuffer::from(b"hello comet");
        let alias = CometBuffer::from_ptr(source.as_ptr(), source.len(), source.capacity());

        assert_eq!(11, alias.len());
        assert_eq!(64, alias.capacity());
        assert_eq!(b"hello comet", &alias.as_slice()[0..11]);

        unsafe {
            let round_trip = alias.to_arrow().unwrap();
            assert_eq!(source, round_trip);
        }
    }
}