blob: f4aeae9c5ac0332f8e01faa3e875d40fdcb94c49 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::Debug;
use std::iter::FromIterator;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{convert::AsRef, usize};
use crate::util::bit_chunk_iterator::BitChunks;
use crate::{
bytes::{Bytes, Deallocation},
datatypes::ArrowNativeType,
ffi,
};
use super::ops::bitwise_unary_op_helper;
use super::MutableBuffer;
/// Buffer represents a contiguous memory region that can be shared with other buffers and across
/// thread boundaries.
#[derive(Clone, PartialEq, Debug)]
pub struct Buffer {
    /// the internal byte buffer. Shared (via `Arc`) so that `clone` and `slice`
    /// are cheap and never copy the underlying memory.
    /// The offset into the buffer, in bytes. `slice` advances this rather than
    /// data: Arc<Bytes>,
    /// The offset into the buffer.
    // Offset is in bytes; `slice()` advances it so multiple `Buffer`s can view
    // disjoint suffixes of the same `Bytes` allocation.
    offset: usize,
}
impl Buffer {
    /// Auxiliary method to create a new Buffer that wraps an existing [Bytes]
    /// allocation, starting at offset zero.
    #[inline]
    pub fn from_bytes(bytes: Bytes) -> Self {
        Buffer {
            data: Arc::new(bytes),
            offset: 0,
        }
    }

    /// Initializes a [Buffer] from a slice of items, copying the bytes into a
    /// newly allocated (aligned) memory region.
    pub fn from_slice_ref<U: ArrowNativeType, T: AsRef<[U]>>(items: &T) -> Self {
        let slice = items.as_ref();
        let capacity = slice.len() * std::mem::size_of::<U>();
        let mut buffer = MutableBuffer::with_capacity(capacity);
        buffer.extend_from_slice(slice);
        buffer.into()
    }

    /// Creates a buffer from an existing memory region (must already be byte-aligned), this
    /// `Buffer` will free this piece of memory when dropped.
    ///
    /// # Arguments
    ///
    /// * `ptr` - Pointer to raw parts
    /// * `len` - Length of raw parts in **bytes**
    /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
    ///
    /// # Safety
    ///
    /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
    /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
    pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
        assert!(len <= capacity);
        Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity))
    }

    /// Creates a buffer from an existing memory region (must already be byte-aligned), this
    /// `Buffer` **does not** free this piece of memory when dropped.
    ///
    /// # Arguments
    ///
    /// * `ptr` - Pointer to raw parts
    /// * `len` - Length of raw parts in **bytes**
    /// * `data` - An [ffi::FFI_ArrowArray] with the data
    ///
    /// # Safety
    ///
    /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
    /// bytes and that the foreign deallocator frees the region.
    pub unsafe fn from_unowned(
        ptr: NonNull<u8>,
        len: usize,
        data: Arc<ffi::FFI_ArrowArray>,
    ) -> Self {
        Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data))
    }

    /// Auxiliary method to create a new Buffer from a raw pointer, length and a
    /// deallocation strategy (native allocator vs. foreign/FFI owner).
    unsafe fn build_with_arguments(
        ptr: NonNull<u8>,
        len: usize,
        deallocation: Deallocation,
    ) -> Self {
        let bytes = Bytes::new(ptr, len, deallocation);
        Buffer {
            data: Arc::new(bytes),
            offset: 0,
        }
    }

    /// Returns the number of bytes in the buffer (i.e. bytes remaining past
    /// this buffer's `offset` into the shared allocation).
    pub fn len(&self) -> usize {
        self.data.len() - self.offset
    }

    /// Returns the capacity of this buffer.
    /// For externally owned buffers, this returns zero
    pub fn capacity(&self) -> usize {
        self.data.capacity()
    }

    /// Returns whether the buffer is empty.
    pub fn is_empty(&self) -> bool {
        // Delegate to `len` so the offset arithmetic lives in one place.
        self.len() == 0
    }

    /// Returns the byte slice stored in this buffer
    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.offset..]
    }

    /// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
    /// Doing so allows the same memory region to be shared between buffers.
    /// # Panics
    /// Panics iff `offset` is larger than `len`.
    pub fn slice(&self, offset: usize) -> Self {
        assert!(
            offset <= self.len(),
            "the offset of the new Buffer cannot exceed the existing length"
        );
        Self {
            data: self.data.clone(),
            offset: self.offset + offset,
        }
    }

    /// Returns a pointer to the start of this buffer.
    ///
    /// Note that this should be used cautiously, and the returned pointer should not be
    /// stored anywhere, to avoid dangling pointers.
    pub fn as_ptr(&self) -> *const u8 {
        // SAFETY: `offset` only ever grows through `slice`, which asserts it
        // stays within `len`, so the advanced pointer remains in bounds.
        unsafe { self.data.ptr().as_ptr().add(self.offset) }
    }

    /// View buffer as typed slice.
    ///
    /// # Safety
    ///
    /// `ArrowNativeType` is public so that it can be used as a trait bound for other public
    /// components, such as the `ToByteSlice` trait. However, this means that it can be
    /// implemented by user defined types, which it is not intended for.
    ///
    /// Also `typed_data::<bool>` is unsafe as `0x00` and `0x01` are the only valid values for
    /// `bool` in Rust. However, `bool` arrays in Arrow are bit-packed which breaks this condition.
    pub unsafe fn typed_data<T: ArrowNativeType + num::Num>(&self) -> &[T] {
        // JUSTIFICATION
        //  Benefit
        //      Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them.
        //  Soundness
        //      * The pointer is non-null by construction
        //      * alignment asserted below (empty prefix/suffix from `align_to`).
        let (prefix, offsets, suffix) = self.as_slice().align_to::<T>();
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// Returns a slice of this buffer starting at a certain bit offset.
    /// If both `offset` and `len` are byte-aligned (multiples of 8) the returned buffer
    /// is a shallow clone, otherwise a new buffer is allocated and filled with a copy
    /// of the bits in the range.
    pub fn bit_slice(&self, offset: usize, len: usize) -> Self {
        if offset % 8 == 0 && len % 8 == 0 {
            return self.slice(offset / 8);
        }
        // Identity op: copies the requested bit range into a fresh, aligned buffer.
        bitwise_unary_op_helper(self, offset, len, |a| a)
    }

    /// Returns a `BitChunks` instance which can be used to iterate over this buffers bits
    /// in larger chunks and starting at arbitrary bit offsets.
    /// Note that both `offset` and `length` are measured in bits.
    pub fn bit_chunks(&self, offset: usize, len: usize) -> BitChunks {
        BitChunks::new(self.as_slice(), offset, len)
    }

    /// Returns the number of 1-bits in this buffer.
    pub fn count_set_bits(&self) -> usize {
        let len_in_bits = self.len() * 8;
        // self.offset is already taken into consideration by the bit_chunks implementation
        self.count_set_bits_offset(0, len_in_bits)
    }

    /// Returns the number of 1-bits in this buffer, starting from `offset` with `length` bits
    /// inspected. Note that both `offset` and `length` are measured in bits.
    pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
        let chunks = self.bit_chunks(offset, len);
        // Sum popcounts of the full 64-bit chunks, then add the trailing remainder bits.
        let mut count: usize = chunks.iter().map(|c| c.count_ones() as usize).sum();
        count += chunks.remainder_bits().count_ones() as usize;
        count
    }
}
/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
/// allocated memory region.
impl<T: AsRef<[u8]>> From<T> for Buffer {
    fn from(p: T) -> Self {
        let src: &[u8] = p.as_ref();
        // Copy the bytes into a freshly allocated, aligned mutable buffer,
        // then freeze it into an immutable `Buffer`.
        let mut mutable = MutableBuffer::new(src.len());
        mutable.extend_from_slice(src);
        mutable.into()
    }
}
/// Creating a `Buffer` instance by storing the boolean values into the buffer
impl std::iter::FromIterator<bool> for Buffer {
fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = bool>,
{
MutableBuffer::from_iter(iter).into()
}
}
impl std::ops::Deref for Buffer {
    type Target = [u8];

    // Lets a `Buffer` be used anywhere a `&[u8]` is expected.
    fn deref(&self) -> &[u8] {
        // SAFETY: `as_ptr` points `offset` bytes into the allocation and `len`
        // is the byte count remaining past that offset, so the region
        // [ptr, ptr + len) is valid for reads for the lifetime of `&self`.
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len()) }
    }
}
// SAFETY: a `Buffer` is never mutated through shared references, and its
// backing allocation is reference-counted via `Arc<Bytes>`, so it may be
// shared and moved across threads. NOTE(review): these manual impls are
// presumably required because `Bytes` holds a raw pointer, which suppresses
// the auto traits — confirm against the `bytes` module.
unsafe impl Sync for Buffer {}
unsafe impl Send for Buffer {}
/// Freezes a `MutableBuffer` into an immutable `Buffer`.
impl From<MutableBuffer> for Buffer {
    #[inline]
    fn from(buffer: MutableBuffer) -> Self {
        // Delegates to `MutableBuffer::into_buffer`; consumes the mutable
        // buffer, so no copy of the underlying memory is implied here.
        buffer.into_buffer()
    }
}
impl Buffer {
/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
/// Prefer this to `collect` whenever possible, as it is ~60% faster.
/// # Example
/// ```
/// # use arrow::buffer::Buffer;
/// let v = vec![1u32];
/// let iter = v.iter().map(|x| x * 2);
/// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
/// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
/// ```
/// # Safety
/// This method assumes that the iterator's size is correct and is undefined behavior
/// to use it on an iterator that reports an incorrect length.
// This implementation is required for two reasons:
// 1. there is no trait `TrustedLen` in stable rust and therefore
// we can't specialize `extend` for `TrustedLen` like `Vec` does.
// 2. `from_trusted_len_iter` is faster.
#[inline]
pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
iterator: I,
) -> Self {
MutableBuffer::from_trusted_len_iter(iterator).into()
}
/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length or errors
/// if any of the items of the iterator is an error.
/// Prefer this to `collect` whenever possible, as it is ~60% faster.
/// # Safety
/// This method assumes that the iterator's size is correct and is undefined behavior
/// to use it on an iterator that reports an incorrect length.
#[inline]
pub unsafe fn try_from_trusted_len_iter<
E,
T: ArrowNativeType,
I: Iterator<Item = std::result::Result<T, E>>,
>(
iterator: I,
) -> std::result::Result<Self, E> {
Ok(MutableBuffer::try_from_trusted_len_iter(iterator)?.into())
}
}
/// Collects native values into a `Buffer` by writing their raw bytes.
impl<T: ArrowNativeType> FromIterator<T> for Buffer {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let mut iterator = iter.into_iter();
        let size = std::mem::size_of::<T>();

        // first iteration, which will likely reserve sufficient space for the buffer.
        let mut buffer = match iterator.next() {
            None => MutableBuffer::new(0),
            Some(element) => {
                // size_hint is taken AFTER consuming the first element, hence
                // the +1; saturating_add guards against usize::MAX hints.
                let (lower, _) = iterator.size_hint();
                let mut buffer = MutableBuffer::new(lower.saturating_add(1) * size);
                unsafe {
                    // SAFETY: the buffer was just allocated with at least
                    // `size` bytes of capacity, so writing one `T` at its
                    // start is in bounds; `set_len(size)` then marks exactly
                    // those initialized bytes as valid.
                    std::ptr::write(buffer.as_mut_ptr() as *mut T, element);
                    buffer.set_len(size);
                }
                buffer
            }
        };

        // Append the remaining elements (grows as needed), then freeze.
        buffer.extend_from_iter(iterator);
        buffer.into()
    }
}
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    #[test]
    fn test_buffer_data_equality() {
        let buf1 = Buffer::from(&[0, 1, 2, 3, 4]);
        let buf2 = Buffer::from(&[0, 1, 2, 3, 4]);
        assert_eq!(buf1, buf2);

        // a sliced buffer no longer equals the original, but two slices taken
        // at the same offset of equal buffers compare equal
        let buf3 = buf1.slice(2);
        assert_ne!(buf1, buf3);
        let buf4 = buf2.slice(2);
        assert_eq!(buf3, buf4);

        // Different capacities should still preserve equality
        let mut buf2 = MutableBuffer::new(65);
        buf2.extend_from_slice(&[0u8, 1, 2, 3, 4]);

        let buf2 = buf2.into();
        assert_eq!(buf1, buf2);

        // unequal because of different elements
        let buf2 = Buffer::from(&[0, 0, 2, 3, 4]);
        assert_ne!(buf1, buf2);

        // unequal because of different length
        let buf2 = Buffer::from(&[0, 1, 2, 3]);
        assert_ne!(buf1, buf2);
    }

    #[test]
    fn test_from_raw_parts() {
        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
        assert_eq!(5, buf.len());
        assert!(!buf.as_ptr().is_null());
        assert_eq!([0, 1, 2, 3, 4], buf.as_slice());
    }

    #[test]
    fn test_from_vec() {
        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
        assert_eq!(5, buf.len());
        assert!(!buf.as_ptr().is_null());
        assert_eq!([0, 1, 2, 3, 4], buf.as_slice());
    }

    #[test]
    fn test_copy() {
        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
        let buf2 = buf;
        assert_eq!(5, buf2.len());
        assert_eq!(64, buf2.capacity());
        assert!(!buf2.as_ptr().is_null());
        assert_eq!([0, 1, 2, 3, 4], buf2.as_slice());
    }

    #[test]
    fn test_slice() {
        let buf = Buffer::from(&[2, 4, 6, 8, 10]);
        let buf2 = buf.slice(2);

        assert_eq!([6, 8, 10], buf2.as_slice());
        assert_eq!(3, buf2.len());
        // a slice shares memory: its pointer is inside the parent's allocation
        assert_eq!(unsafe { buf.as_ptr().offset(2) }, buf2.as_ptr());

        let buf3 = buf2.slice(1);
        assert_eq!([8, 10], buf3.as_slice());
        assert_eq!(2, buf3.len());
        assert_eq!(unsafe { buf.as_ptr().offset(3) }, buf3.as_ptr());

        // slicing at exactly `len` yields an empty, but valid, buffer
        let buf4 = buf.slice(5);
        let empty_slice: [u8; 0] = [];
        assert_eq!(empty_slice, buf4.as_slice());
        assert_eq!(0, buf4.len());
        assert!(buf4.is_empty());
        assert_eq!(buf2.slice(2).as_slice(), &[10]);
    }

    #[test]
    #[should_panic(
        expected = "the offset of the new Buffer cannot exceed the existing length"
    )]
    fn test_slice_offset_out_of_bound() {
        let buf = Buffer::from(&[2, 4, 6, 8, 10]);
        buf.slice(6);
    }

    #[test]
    fn test_access_concurrently() {
        let buffer = Buffer::from(vec![1, 2, 3, 4, 5]);
        let buffer2 = buffer.clone();
        assert_eq!([1, 2, 3, 4, 5], buffer.as_slice());

        let buffer_copy = thread::spawn(move || {
            // access buffer in another thread.
            buffer
        })
        .join();

        assert!(buffer_copy.is_ok());
        assert_eq!(buffer2, buffer_copy.ok().unwrap());
    }

    macro_rules! check_as_typed_data {
        ($input: expr, $native_t: ty) => {{
            let buffer = Buffer::from_slice_ref($input);
            let slice: &[$native_t] = unsafe { buffer.typed_data::<$native_t>() };
            assert_eq!($input, slice);
        }};
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_as_typed_data() {
        check_as_typed_data!(&[1i8, 3i8, 6i8], i8);
        check_as_typed_data!(&[1u8, 3u8, 6u8], u8);
        check_as_typed_data!(&[1i16, 3i16, 6i16], i16);
        check_as_typed_data!(&[1i32, 3i32, 6i32], i32);
        check_as_typed_data!(&[1i64, 3i64, 6i64], i64);
        check_as_typed_data!(&[1u16, 3u16, 6u16], u16);
        check_as_typed_data!(&[1u32, 3u32, 6u32], u32);
        check_as_typed_data!(&[1u64, 3u64, 6u64], u64);
        check_as_typed_data!(&[1f32, 3f32, 6f32], f32);
        check_as_typed_data!(&[1f64, 3f64, 6f64], f64);
    }

    #[test]
    fn test_count_bits() {
        assert_eq!(0, Buffer::from(&[0b00000000]).count_set_bits());
        assert_eq!(8, Buffer::from(&[0b11111111]).count_set_bits());
        assert_eq!(3, Buffer::from(&[0b00001101]).count_set_bits());
        assert_eq!(6, Buffer::from(&[0b01001001, 0b01010010]).count_set_bits());
        assert_eq!(16, Buffer::from(&[0b11111111, 0b11111111]).count_set_bits());
    }

    #[test]
    fn test_count_bits_slice() {
        // counting starts at the slice's byte offset, not the allocation start
        assert_eq!(
            0,
            Buffer::from(&[0b11111111, 0b00000000])
                .slice(1)
                .count_set_bits()
        );
        assert_eq!(
            8,
            Buffer::from(&[0b11111111, 0b11111111])
                .slice(1)
                .count_set_bits()
        );
        assert_eq!(
            3,
            Buffer::from(&[0b11111111, 0b11111111, 0b00001101])
                .slice(2)
                .count_set_bits()
        );
        assert_eq!(
            6,
            Buffer::from(&[0b11111111, 0b01001001, 0b01010010])
                .slice(1)
                .count_set_bits()
        );
        assert_eq!(
            16,
            Buffer::from(&[0b11111111, 0b11111111, 0b11111111, 0b11111111])
                .slice(2)
                .count_set_bits()
        );
    }

    #[test]
    fn test_count_bits_offset_slice() {
        // offsets and lengths here are measured in bits, not bytes
        assert_eq!(8, Buffer::from(&[0b11111111]).count_set_bits_offset(0, 8));
        assert_eq!(3, Buffer::from(&[0b11111111]).count_set_bits_offset(0, 3));
        assert_eq!(5, Buffer::from(&[0b11111111]).count_set_bits_offset(3, 5));
        assert_eq!(1, Buffer::from(&[0b11111111]).count_set_bits_offset(3, 1));
        assert_eq!(0, Buffer::from(&[0b11111111]).count_set_bits_offset(8, 0));
        assert_eq!(2, Buffer::from(&[0b01010101]).count_set_bits_offset(0, 3));
        assert_eq!(
            16,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(0, 16)
        );
        assert_eq!(
            10,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(0, 10)
        );
        assert_eq!(
            10,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(3, 10)
        );
        assert_eq!(
            8,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(8, 8)
        );
        assert_eq!(
            5,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(11, 5)
        );
        assert_eq!(
            0,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(16, 0)
        );
        assert_eq!(
            2,
            Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 5)
        );
        assert_eq!(
            4,
            Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 9)
        );
    }
}