blob: c6052817bfb6a36c1c4fd99aa1bc1ed72499cadc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Low-level array data abstractions.
//!
//! Provides utilities for creating, manipulating, and converting Arrow arrays
//! made of primitive types, strings, and nested types.
use super::{ArrayData, ArrayDataBuilder, ByteView, data::new_buffers};
use crate::bit_mask::set_bits;
use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, bit_util, i256};
use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode};
use half::f16;
use num_integer::Integer;
use std::mem;
mod boolean;
mod fixed_binary;
mod fixed_size_list;
mod list;
mod list_view;
mod null;
mod primitive;
mod run;
mod structure;
mod union;
mod utils;
mod variable_size;
type ExtendNullBits<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize) + 'a>;
// function that extends `[start..start+len]` to the mutable array.
// this is dynamic because different data_types influence how buffers and children are extended.
type Extend<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize, usize) + 'a>;
type ExtendNulls = Box<dyn Fn(&mut _MutableArrayData, usize)>;
/// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData].
/// This is just a data container.
#[derive(Debug)]
struct _MutableArrayData<'a> {
pub data_type: DataType,
pub null_count: usize,
pub len: usize,
pub null_buffer: Option<MutableBuffer>,
// arrow specification only allows up to 3 buffers (2 ignoring the nulls above).
// Thus, we place them in the stack to avoid bound checks and greater data locality.
pub buffer1: MutableBuffer,
pub buffer2: MutableBuffer,
pub child_data: Vec<MutableArrayData<'a>>,
}
impl _MutableArrayData<'_> {
fn null_buffer(&mut self) -> &mut MutableBuffer {
self.null_buffer
.as_mut()
.expect("MutableArrayData not nullable")
}
}
fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits<'_> {
if let Some(nulls) = array.nulls() {
let bytes = nulls.validity();
Box::new(move |mutable, start, len| {
let mutable_len = mutable.len;
let out = mutable.null_buffer();
utils::resize_for_bits(out, mutable_len + len);
mutable.null_count += set_bits(
out.as_slice_mut(),
bytes,
mutable_len,
nulls.offset() + start,
len,
);
})
} else if use_nulls {
Box::new(|mutable, _, len| {
let mutable_len = mutable.len;
let out = mutable.null_buffer();
utils::resize_for_bits(out, mutable_len + len);
let write_data = out.as_slice_mut();
(0..len).for_each(|i| {
bit_util::set_bit(write_data, mutable_len + i);
});
})
} else {
Box::new(|_, _, _| {})
}
}
/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by
/// copying chunks.
///
/// The main use case of this struct is to perform unary operations to arrays of
/// arbitrary types, such as `filter` and `take`.
///
/// # Example
/// ```
/// use arrow_buffer::Buffer;
/// use arrow_data::ArrayData;
/// use arrow_data::transform::MutableArrayData;
/// use arrow_schema::DataType;
/// fn i32_array(values: &[i32]) -> ArrayData {
/// ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap()
/// }
/// let arr1 = i32_array(&[1, 2, 3, 4, 5]);
/// let arr2 = i32_array(&[6, 7, 8, 9, 10]);
/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements
/// let capacity = 3 * std::mem::size_of::<i32>();
/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10);
/// // Copy the first 3 elements from arr1
/// mutable.extend(0, 0, 3);
/// // Copy the last 3 elements from arr2
/// mutable.extend(1, 2, 4);
/// // Complete the MutableArrayData into a new ArrayData
/// let frozen = mutable.freeze();
/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10]));
/// ```
pub struct MutableArrayData<'a> {
/// Input arrays: the data being read FROM.
///
/// Note this is "dead code" because all actual references to the arrays are
/// stored in closures for extending values and nulls.
#[allow(dead_code)]
arrays: Vec<&'a ArrayData>,
/// In progress output array: The data being written TO
///
/// Note these fields are in a separate struct, [_MutableArrayData], as they
/// cannot be in [MutableArrayData] itself due to mutability invariants (interior
/// mutability): [MutableArrayData] contains a function that can only mutate
/// [_MutableArrayData], not [MutableArrayData] itself
data: _MutableArrayData<'a>,
/// The child data of the `Array` in Dictionary arrays.
///
/// This is not stored in `_MutableArrayData` because these values are
/// constant and only needed at the end, when freezing [_MutableArrayData].
dictionary: Option<ArrayData>,
/// Variadic data buffers referenced by views.
///
/// Note this this is not stored in `_MutableArrayData` because these values
/// are constant and only needed at the end, when freezing
/// [_MutableArrayData]
variadic_data_buffers: Vec<Buffer>,
/// function used to extend output array with values from input arrays.
///
/// This function's lifetime is bound to the input arrays because it reads
/// values from them.
extend_values: Vec<Extend<'a>>,
/// function used to extend the output array with nulls from input arrays.
///
/// This function's lifetime is bound to the input arrays because it reads
/// nulls from it.
extend_null_bits: Vec<ExtendNullBits<'a>>,
/// function used to extend the output array with null elements.
///
/// This function is independent of the arrays and therefore has no lifetime.
extend_nulls: ExtendNulls,
}
impl std::fmt::Debug for MutableArrayData<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
// ignores the closures.
f.debug_struct("MutableArrayData")
.field("data", &self.data)
.finish()
}
}
/// Builds an extend that adds `offset` to the source primitive
/// Additionally validates that `max` fits into the
/// the underlying primitive returning None if not
fn build_extend_dictionary(array: &ArrayData, offset: usize, max: usize) -> Option<Extend<'_>> {
macro_rules! validate_and_build {
($dt: ty) => {{
let _: $dt = max.try_into().ok()?;
let offset: $dt = offset.try_into().ok()?;
Some(primitive::build_extend_with_offset(array, offset))
}};
}
match array.data_type() {
DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
DataType::UInt8 => validate_and_build!(u8),
DataType::UInt16 => validate_and_build!(u16),
DataType::UInt32 => validate_and_build!(u32),
DataType::UInt64 => validate_and_build!(u64),
DataType::Int8 => validate_and_build!(i8),
DataType::Int16 => validate_and_build!(i16),
DataType::Int32 => validate_and_build!(i32),
DataType::Int64 => validate_and_build!(i64),
_ => unreachable!(),
},
_ => None,
}
}
/// Builds an extend that adds `buffer_offset` to any buffer indices encountered
fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend<'_> {
let views = array.buffer::<u128>(0);
Box::new(
move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| {
mutable
.buffer1
.extend(views[start..start + len].iter().map(|v| {
let len = *v as u32;
if len <= 12 {
return *v; // Stored inline
}
let mut view = ByteView::from(*v);
view.buffer_index += buffer_offset;
view.into()
}))
},
)
}
fn build_extend(array: &ArrayData) -> Extend<'_> {
match array.data_type() {
DataType::Null => null::build_extend(array),
DataType::Boolean => boolean::build_extend(array),
DataType::UInt8 => primitive::build_extend::<u8>(array),
DataType::UInt16 => primitive::build_extend::<u16>(array),
DataType::UInt32 => primitive::build_extend::<u32>(array),
DataType::UInt64 => primitive::build_extend::<u64>(array),
DataType::Int8 => primitive::build_extend::<i8>(array),
DataType::Int16 => primitive::build_extend::<i16>(array),
DataType::Int32 => primitive::build_extend::<i32>(array),
DataType::Int64 => primitive::build_extend::<i64>(array),
DataType::Float32 => primitive::build_extend::<f32>(array),
DataType::Float64 => primitive::build_extend::<f64>(array),
DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
primitive::build_extend::<i32>(array)
}
DataType::Date64
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Duration(_)
| DataType::Interval(IntervalUnit::DayTime) => primitive::build_extend::<i64>(array),
DataType::Interval(IntervalUnit::MonthDayNano) => primitive::build_extend::<i128>(array),
DataType::Decimal32(_, _) => primitive::build_extend::<i32>(array),
DataType::Decimal64(_, _) => primitive::build_extend::<i64>(array),
DataType::Decimal128(_, _) => primitive::build_extend::<i128>(array),
DataType::Decimal256(_, _) => primitive::build_extend::<i256>(array),
DataType::Utf8 | DataType::Binary => variable_size::build_extend::<i32>(array),
DataType::LargeUtf8 | DataType::LargeBinary => variable_size::build_extend::<i64>(array),
DataType::BinaryView | DataType::Utf8View => unreachable!("should use build_extend_view"),
DataType::Map(_, _) | DataType::List(_) => list::build_extend::<i32>(array),
DataType::LargeList(_) => list::build_extend::<i64>(array),
DataType::ListView(_) => list_view::build_extend::<i32>(array),
DataType::LargeListView(_) => list_view::build_extend::<i64>(array),
DataType::Dictionary(_, _) => unreachable!("should use build_extend_dictionary"),
DataType::Struct(_) => structure::build_extend(array),
DataType::FixedSizeBinary(_) => fixed_binary::build_extend(array),
DataType::Float16 => primitive::build_extend::<f16>(array),
DataType::FixedSizeList(_, _) => fixed_size_list::build_extend(array),
DataType::Union(_, mode) => match mode {
UnionMode::Sparse => union::build_extend_sparse(array),
UnionMode::Dense => union::build_extend_dense(array),
},
DataType::RunEndEncoded(_, _) => run::build_extend(array),
}
}
fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
Box::new(match data_type {
DataType::Null => null::extend_nulls,
DataType::Boolean => boolean::extend_nulls,
DataType::UInt8 => primitive::extend_nulls::<u8>,
DataType::UInt16 => primitive::extend_nulls::<u16>,
DataType::UInt32 => primitive::extend_nulls::<u32>,
DataType::UInt64 => primitive::extend_nulls::<u64>,
DataType::Int8 => primitive::extend_nulls::<i8>,
DataType::Int16 => primitive::extend_nulls::<i16>,
DataType::Int32 => primitive::extend_nulls::<i32>,
DataType::Int64 => primitive::extend_nulls::<i64>,
DataType::Float32 => primitive::extend_nulls::<f32>,
DataType::Float64 => primitive::extend_nulls::<f64>,
DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
primitive::extend_nulls::<i32>
}
DataType::Date64
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Duration(_)
| DataType::Interval(IntervalUnit::DayTime) => primitive::extend_nulls::<i64>,
DataType::Interval(IntervalUnit::MonthDayNano) => primitive::extend_nulls::<i128>,
DataType::Decimal32(_, _) => primitive::extend_nulls::<i32>,
DataType::Decimal64(_, _) => primitive::extend_nulls::<i64>,
DataType::Decimal128(_, _) => primitive::extend_nulls::<i128>,
DataType::Decimal256(_, _) => primitive::extend_nulls::<i256>,
DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::<i32>,
DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::<i64>,
DataType::BinaryView | DataType::Utf8View => primitive::extend_nulls::<u128>,
DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::<i32>,
DataType::LargeList(_) => list::extend_nulls::<i64>,
DataType::ListView(_) => list_view::extend_nulls::<i32>,
DataType::LargeListView(_) => list_view::extend_nulls::<i64>,
DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
DataType::UInt8 => primitive::extend_nulls::<u8>,
DataType::UInt16 => primitive::extend_nulls::<u16>,
DataType::UInt32 => primitive::extend_nulls::<u32>,
DataType::UInt64 => primitive::extend_nulls::<u64>,
DataType::Int8 => primitive::extend_nulls::<i8>,
DataType::Int16 => primitive::extend_nulls::<i16>,
DataType::Int32 => primitive::extend_nulls::<i32>,
DataType::Int64 => primitive::extend_nulls::<i64>,
_ => unreachable!(),
},
DataType::Struct(_) => structure::extend_nulls,
DataType::FixedSizeBinary(_) => fixed_binary::extend_nulls,
DataType::Float16 => primitive::extend_nulls::<f16>,
DataType::FixedSizeList(_, _) => fixed_size_list::extend_nulls,
DataType::Union(_, mode) => match mode {
UnionMode::Sparse => union::extend_nulls_sparse,
UnionMode::Dense => union::extend_nulls_dense,
},
DataType::RunEndEncoded(_, _) => run::extend_nulls,
})
}
fn preallocate_offset_and_binary_buffer<Offset: ArrowNativeType + Integer>(
capacity: usize,
binary_size: usize,
) -> [MutableBuffer; 2] {
// offsets
let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<Offset>());
// safety: `unsafe` code assumes that this buffer is initialized with one element
buffer.push(Offset::zero());
[
buffer,
MutableBuffer::new(binary_size * mem::size_of::<u8>()),
]
}
/// Define capacities to pre-allocate for child data or data buffers.
#[derive(Debug, Clone)]
pub enum Capacities {
/// Binary, Utf8 and LargeUtf8 data types
///
/// Defines
/// * the capacity of the array offsets
/// * the capacity of the binary/ str buffer
Binary(usize, Option<usize>),
/// List and LargeList data types
///
/// Defines
/// * the capacity of the array offsets
/// * the capacity of the child data
List(usize, Option<Box<Capacities>>),
/// Struct type
///
/// Defines
/// * the capacity of the array
/// * the capacities of the fields
Struct(usize, Option<Vec<Capacities>>),
/// Dictionary type
///
/// Defines
/// * the capacity of the array/keys
/// * the capacity of the values
Dictionary(usize, Option<Box<Capacities>>),
/// Don't preallocate inner buffers and rely on array growth strategy
Array(usize),
}
impl<'a> MutableArrayData<'a> {
/// Returns a new [MutableArrayData] with capacity to `capacity` slots and
/// specialized to create an [ArrayData] from multiple `arrays`.
///
/// # Arguments
/// * `arrays` - the source arrays to copy from
/// * `use_nulls` - a flag used to optimize insertions
/// - `false` if the only source of nulls are the arrays themselves
/// - `true` if the user plans to call [MutableArrayData::extend_nulls].
/// * capacity - the preallocated capacity of the output array, in bytes
///
/// Thus, if `use_nulls` is `false`, calling
/// [MutableArrayData::extend_nulls] should not be used.
pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity))
}
/// Similar to [MutableArrayData::new], but lets users define the
/// preallocated capacities of the array with more granularity.
///
/// See [MutableArrayData::new] for more information on the arguments.
///
/// # Panics
///
/// This function panics if the given `capacities` don't match the data type
/// of `arrays`. Or when a [Capacities] variant is not yet supported.
pub fn with_capacities(
arrays: Vec<&'a ArrayData>,
use_nulls: bool,
capacities: Capacities,
) -> Self {
let data_type = arrays[0].data_type();
for a in arrays.iter().skip(1) {
assert_eq!(
data_type,
a.data_type(),
"Arrays with inconsistent types passed to MutableArrayData"
)
}
// if any of the arrays has nulls, insertions from any array requires setting bits
// as there is at least one array with nulls.
let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0);
let mut array_capacity;
let [buffer1, buffer2] = match (data_type, &capacities) {
(
DataType::LargeUtf8 | DataType::LargeBinary,
Capacities::Binary(capacity, Some(value_cap)),
) => {
array_capacity = *capacity;
preallocate_offset_and_binary_buffer::<i64>(*capacity, *value_cap)
}
(DataType::Utf8 | DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => {
array_capacity = *capacity;
preallocate_offset_and_binary_buffer::<i32>(*capacity, *value_cap)
}
(_, Capacities::Array(capacity)) => {
array_capacity = *capacity;
new_buffers(data_type, *capacity)
}
(
DataType::List(_)
| DataType::LargeList(_)
| DataType::ListView(_)
| DataType::LargeListView(_)
| DataType::FixedSizeList(_, _),
Capacities::List(capacity, _),
) => {
array_capacity = *capacity;
new_buffers(data_type, *capacity)
}
_ => panic!("Capacities: {capacities:?} not yet supported"),
};
let child_data = match &data_type {
DataType::Decimal32(_, _)
| DataType::Decimal64(_, _)
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
| DataType::Null
| DataType::Boolean
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_)
| DataType::Timestamp(_, _)
| DataType::Utf8
| DataType::Binary
| DataType::LargeUtf8
| DataType::LargeBinary
| DataType::BinaryView
| DataType::Utf8View
| DataType::Interval(_)
| DataType::FixedSizeBinary(_) => vec![],
DataType::Map(_, _)
| DataType::List(_)
| DataType::LargeList(_)
| DataType::ListView(_)
| DataType::LargeListView(_) => {
let children = arrays
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
let capacities =
if let Capacities::List(capacity, ref child_capacities) = capacities {
child_capacities
.clone()
.map(|c| *c)
.unwrap_or(Capacities::Array(capacity))
} else {
Capacities::Array(array_capacity)
};
vec![MutableArrayData::with_capacities(
children, use_nulls, capacities,
)]
}
// the dictionary type just appends keys and clones the values.
DataType::Dictionary(_, _) => vec![],
DataType::Struct(fields) => match capacities {
Capacities::Struct(capacity, Some(ref child_capacities)) => {
array_capacity = capacity;
(0..fields.len())
.zip(child_capacities)
.map(|(i, child_cap)| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::with_capacities(
child_arrays,
use_nulls,
child_cap.clone(),
)
})
.collect::<Vec<_>>()
}
Capacities::Struct(capacity, None) => {
array_capacity = capacity;
(0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, capacity)
})
.collect::<Vec<_>>()
}
_ => (0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, array_capacity)
})
.collect::<Vec<_>>(),
},
DataType::RunEndEncoded(_, _) => {
let run_ends_child = arrays
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
let value_child = arrays
.iter()
.map(|array| &array.child_data()[1])
.collect::<Vec<_>>();
vec![
MutableArrayData::new(run_ends_child, false, array_capacity),
MutableArrayData::new(value_child, use_nulls, array_capacity),
]
}
DataType::FixedSizeList(_, size) => {
let children = arrays
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
let capacities =
if let Capacities::List(capacity, ref child_capacities) = capacities {
child_capacities
.clone()
.map(|c| *c)
.unwrap_or(Capacities::Array(capacity * *size as usize))
} else {
Capacities::Array(array_capacity * *size as usize)
};
vec![MutableArrayData::with_capacities(
children, use_nulls, capacities,
)]
}
DataType::Union(fields, _) => (0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, array_capacity)
})
.collect::<Vec<_>>(),
};
// Get the dictionary if any, and if it is a concatenation of multiple
let (dictionary, dict_concat) = match &data_type {
DataType::Dictionary(_, _) => {
// If more than one dictionary, concatenate dictionaries together
let dict_concat = !arrays
.windows(2)
.all(|a| a[0].child_data()[0].ptr_eq(&a[1].child_data()[0]));
match dict_concat {
false => (Some(arrays[0].child_data()[0].clone()), false),
true => {
if let Capacities::Dictionary(_, _) = capacities {
panic!("dictionary capacity not yet supported")
}
let dictionaries: Vec<_> =
arrays.iter().map(|array| &array.child_data()[0]).collect();
let lengths: Vec<_> = dictionaries
.iter()
.map(|dictionary| dictionary.len())
.collect();
let capacity = lengths.iter().sum();
let mut mutable = MutableArrayData::new(dictionaries, false, capacity);
for (i, len) in lengths.iter().enumerate() {
mutable.extend(i, 0, *len)
}
(Some(mutable.freeze()), true)
}
}
}
_ => (None, false),
};
let variadic_data_buffers = match &data_type {
DataType::BinaryView | DataType::Utf8View => arrays
.iter()
.flat_map(|x| x.buffers().iter().skip(1))
.map(Buffer::clone)
.collect(),
_ => vec![],
};
let extend_nulls = build_extend_nulls(data_type);
let extend_null_bits = arrays
.iter()
.map(|array| build_extend_null_bits(array, use_nulls))
.collect();
let null_buffer = use_nulls.then(|| {
let null_bytes = bit_util::ceil(array_capacity, 8);
MutableBuffer::from_len_zeroed(null_bytes)
});
let extend_values = match &data_type {
DataType::Dictionary(_, _) => {
let mut next_offset = 0;
let extend_values: Result<Vec<_>, _> = arrays
.iter()
.map(|array| {
let offset = next_offset;
let dict_len = array.child_data()[0].len();
if dict_concat {
next_offset += dict_len;
}
build_extend_dictionary(array, offset, offset + dict_len)
.ok_or(ArrowError::DictionaryKeyOverflowError)
})
.collect();
extend_values.expect("MutableArrayData::new is infallible")
}
DataType::BinaryView | DataType::Utf8View => {
let mut next_offset = 0u32;
arrays
.iter()
.map(|arr| {
let num_data_buffers = (arr.buffers().len() - 1) as u32;
let offset = next_offset;
next_offset = next_offset
.checked_add(num_data_buffers)
.expect("view buffer index overflow");
build_extend_view(arr, offset)
})
.collect()
}
_ => arrays.iter().map(|array| build_extend(array)).collect(),
};
let data = _MutableArrayData {
data_type: data_type.clone(),
len: 0,
null_count: 0,
null_buffer,
buffer1,
buffer2,
child_data,
};
Self {
arrays,
data,
dictionary,
variadic_data_buffers,
extend_values,
extend_null_bits,
extend_nulls,
}
}
/// Extends the in progress array with a region of the input arrays
///
/// # Arguments
/// * `index` - the index of array that you what to copy values from
/// * `start` - the start index of the chunk (inclusive)
/// * `end` - the end index of the chunk (exclusive)
///
/// # Panic
/// This function panics if there is an invalid index,
/// i.e. `index` >= the number of source arrays
/// or `end` > the length of the `index`th array
pub fn extend(&mut self, index: usize, start: usize, end: usize) {
let len = end - start;
(self.extend_null_bits[index])(&mut self.data, start, len);
(self.extend_values[index])(&mut self.data, index, start, len);
self.data.len += len;
}
/// Extends the in progress array with null elements, ignoring the input arrays.
///
/// # Panics
///
/// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays
pub fn extend_nulls(&mut self, len: usize) {
self.data.len += len;
let bit_len = bit_util::ceil(self.data.len, 8);
let nulls = self.data.null_buffer();
nulls.resize(bit_len, 0);
self.data.null_count += len;
(self.extend_nulls)(&mut self.data, len);
}
/// Returns the current length
#[inline]
pub fn len(&self) -> usize {
self.data.len
}
/// Returns true if len is 0
#[inline]
pub fn is_empty(&self) -> bool {
self.data.len == 0
}
/// Returns the current null count
#[inline]
pub fn null_count(&self) -> usize {
self.data.null_count
}
/// Creates a [ArrayData] from the in progress array, consuming `self`.
pub fn freeze(self) -> ArrayData {
unsafe { self.into_builder().build_unchecked() }
}
/// Consume self and returns the in progress array as [`ArrayDataBuilder`].
///
/// This is useful for extending the default behavior of MutableArrayData.
pub fn into_builder(self) -> ArrayDataBuilder {
let data = self.data;
let buffers = match data.data_type {
DataType::Null
| DataType::Struct(_)
| DataType::FixedSizeList(_, _)
| DataType::RunEndEncoded(_, _) => {
vec![]
}
DataType::BinaryView | DataType::Utf8View => {
let mut b = self.variadic_data_buffers;
b.insert(0, data.buffer1.into());
b
}
DataType::Utf8
| DataType::Binary
| DataType::LargeUtf8
| DataType::LargeBinary
| DataType::ListView(_)
| DataType::LargeListView(_) => {
vec![data.buffer1.into(), data.buffer2.into()]
}
DataType::Union(_, mode) => {
match mode {
// Based on Union's DataTypeLayout
UnionMode::Sparse => vec![data.buffer1.into()],
UnionMode::Dense => vec![data.buffer1.into(), data.buffer2.into()],
}
}
_ => vec![data.buffer1.into()],
};
let child_data = match data.data_type {
DataType::Dictionary(_, _) => vec![self.dictionary.unwrap()],
_ => data.child_data.into_iter().map(|x| x.freeze()).collect(),
};
let nulls = match data.data_type {
// RunEndEncoded and Null arrays cannot have top-level null bitmasks
DataType::RunEndEncoded(_, _) | DataType::Null => None,
_ => data
.null_buffer
.map(|nulls| {
let bools = BooleanBuffer::new(nulls.into(), 0, data.len);
unsafe { NullBuffer::new_unchecked(bools, data.null_count) }
})
.filter(|n| n.null_count() > 0),
};
ArrayDataBuilder::new(data.data_type)
.offset(0)
.len(data.len)
.nulls(nulls)
.buffers(buffers)
.child_data(child_data)
}
}
// See arrow/tests/array_transform.rs for tests of transform functionality
#[cfg(test)]
mod test {
use super::*;
use arrow_schema::Field;
use std::sync::Arc;
#[test]
fn test_list_append_with_capacities() {
let array = ArrayData::new_empty(&DataType::List(Arc::new(Field::new(
"element",
DataType::Int64,
false,
))));
let mutable = MutableArrayData::with_capacities(
vec![&array],
false,
Capacities::List(6, Some(Box::new(Capacities::Array(17)))),
);
// capacities are rounded up to multiples of 64 by MutableBuffer
assert_eq!(mutable.data.buffer1.capacity(), 64);
assert_eq!(mutable.data.child_data[0].data.buffer1.capacity(), 192);
}
}