make sure that only concat preallocates buffers (#382)
* MutableArrayData::with_capacities
* better pattern matching
* add binary capacities
* add list child data
* add struct capacities
* add panic for dictionary type
* change dictionary capacity enum variant
diff --git a/arrow/src/array/mod.rs b/arrow/src/array/mod.rs
index 65cf308..6a0b94a 100644
--- a/arrow/src/array/mod.rs
+++ b/arrow/src/array/mod.rs
@@ -256,7 +256,7 @@
pub type DurationMicrosecondBuilder = PrimitiveBuilder<DurationMicrosecondType>;
pub type DurationNanosecondBuilder = PrimitiveBuilder<DurationNanosecondType>;
-pub use self::transform::MutableArrayData;
+pub use self::transform::{Capacities, MutableArrayData};
// --------------------- Array Iterator ---------------------
diff --git a/arrow/src/array/transform/mod.rs b/arrow/src/array/transform/mod.rs
index 5611671..3fae220 100644
--- a/arrow/src/array/transform/mod.rs
+++ b/arrow/src/array/transform/mod.rs
@@ -326,9 +326,9 @@
})
}
-fn preallocate_str_buffer<Offset: StringOffsetSizeTrait>(
+fn preallocate_offset_and_binary_buffer<Offset: StringOffsetSizeTrait>(
capacity: usize,
- arrays: &[&ArrayData],
+ binary_size: usize,
) -> [MutableBuffer; 2] {
// offsets
let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<Offset>());
@@ -338,25 +338,37 @@
} else {
buffer.push(0i32)
}
- let str_values_size = arrays
- .iter()
- .map(|data| {
- // get the length of the value buffer
- let buf_len = data.buffers()[1].len();
- // find the offset of the buffer
- // this returns a slice of offsets, starting from the offset of the array
- // so we can take the first value
- let offset = data.buffer::<Offset>(0)[0];
- buf_len - offset.to_usize().unwrap()
- })
- .sum::<usize>();
[
buffer,
- MutableBuffer::new(str_values_size * mem::size_of::<u8>()),
+ MutableBuffer::new(binary_size * mem::size_of::<u8>()),
]
}
+/// Define capacities of child data or data buffers.
+#[derive(Debug, Clone)]
+pub enum Capacities {
+ /// Binary, Utf8 and LargeUtf8 data types
+ /// Define
+ /// * the capacity of the array offsets
+ /// * the capacity of the binary/ str buffer
+ Binary(usize, Option<usize>),
+ /// List and LargeList data types
+ /// Define
+ /// * the capacity of the array offsets
+ /// * the capacity of the child data
+ List(usize, Option<Box<Capacities>>),
+ /// Struct type
+ /// * the capacity of the array
+ /// * the capacities of the fields
+ Struct(usize, Option<Vec<Capacities>>),
+ /// Dictionary type
+ /// * the capacity of the array/keys
+ /// * the capacity of the values
+ Dictionary(usize, Option<Box<Capacities>>),
+ /// Don't preallocate inner buffers and rely on array growth strategy
+ Array(usize),
+}
impl<'a> MutableArrayData<'a> {
/// returns a new [MutableArrayData] with capacity to `capacity` slots and specialized to create an
/// [ArrayData] from multiple `arrays`.
@@ -364,7 +376,21 @@
/// `use_nulls` is a flag used to optimize insertions. It should be `false` if the only source of nulls
/// are the arrays themselves and `true` if the user plans to call [MutableArrayData::extend_nulls].
/// In other words, if `use_nulls` is `false`, calling [MutableArrayData::extend_nulls] should not be used.
- pub fn new(arrays: Vec<&'a ArrayData>, mut use_nulls: bool, capacity: usize) -> Self {
+ pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
+ Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity))
+ }
+
+ /// Similar to [MutableArray::new], but lets users define the preallocated capacities of the array.
+ /// See also [MutableArray::new] for more information on the arguments.
+ ///
+ /// # Panic
+ /// This function panics if the given `capacities` don't match the data type of `arrays`. Or when
+ /// a [Capacities] variant is not yet supported.
+ pub fn with_capacities(
+ arrays: Vec<&'a ArrayData>,
+ mut use_nulls: bool,
+ capacities: Capacities,
+ ) -> Self {
let data_type = arrays[0].data_type();
use crate::datatypes::*;
@@ -374,12 +400,24 @@
use_nulls = true;
};
- // We can prevent reallocation by precomputing the needed size.
- // This is faster and more memory efficient.
- let [buffer1, buffer2] = match data_type {
- DataType::LargeUtf8 => preallocate_str_buffer::<i64>(capacity, &arrays),
- DataType::Utf8 => preallocate_str_buffer::<i32>(capacity, &arrays),
- _ => new_buffers(data_type, capacity),
+ let mut array_capacity;
+
+ let [buffer1, buffer2] = match (data_type, &capacities) {
+ (DataType::LargeUtf8, Capacities::Binary(capacity, Some(value_cap)))
+ | (DataType::LargeBinary, Capacities::Binary(capacity, Some(value_cap))) => {
+ array_capacity = *capacity;
+ preallocate_offset_and_binary_buffer::<i64>(*capacity, *value_cap)
+ }
+ (DataType::Utf8, Capacities::Binary(capacity, Some(value_cap)))
+ | (DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => {
+ array_capacity = *capacity;
+ preallocate_offset_and_binary_buffer::<i32>(*capacity, *value_cap)
+ }
+ (_, Capacities::Array(capacity)) => {
+ array_capacity = *capacity;
+ new_buffers(data_type, *capacity)
+ }
+ _ => panic!("Capacities: {:?} not yet supported", capacities),
};
let child_data = match &data_type {
@@ -412,20 +450,66 @@
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
- vec![MutableArrayData::new(childs, use_nulls, capacity)]
+
+ let capacities = if let Capacities::List(capacity, ref child_capacities) =
+ capacities
+ {
+ array_capacity = capacity;
+ child_capacities
+ .clone()
+ .map(|c| *c)
+ .unwrap_or(Capacities::Array(array_capacity))
+ } else {
+ Capacities::Array(array_capacity)
+ };
+
+ vec![MutableArrayData::with_capacities(
+ childs, use_nulls, capacities,
+ )]
}
// the dictionary type just appends keys and clones the values.
DataType::Dictionary(_, _) => vec![],
DataType::Float16 => unreachable!(),
- DataType::Struct(fields) => (0..fields.len())
- .map(|i| {
- let child_arrays = arrays
- .iter()
- .map(|array| &array.child_data()[i])
- .collect::<Vec<_>>();
- MutableArrayData::new(child_arrays, use_nulls, capacity)
- })
- .collect::<Vec<_>>(),
+ DataType::Struct(fields) => match capacities {
+ Capacities::Struct(capacity, Some(ref child_capacities)) => {
+ array_capacity = capacity;
+ (0..fields.len())
+ .zip(child_capacities)
+ .map(|(i, child_cap)| {
+ let child_arrays = arrays
+ .iter()
+ .map(|array| &array.child_data()[i])
+ .collect::<Vec<_>>();
+ MutableArrayData::with_capacities(
+ child_arrays,
+ use_nulls,
+ child_cap.clone(),
+ )
+ })
+ .collect::<Vec<_>>()
+ }
+ Capacities::Struct(capacity, None) => {
+ array_capacity = capacity;
+ (0..fields.len())
+ .map(|i| {
+ let child_arrays = arrays
+ .iter()
+ .map(|array| &array.child_data()[i])
+ .collect::<Vec<_>>();
+ MutableArrayData::new(child_arrays, use_nulls, capacity)
+ })
+ .collect::<Vec<_>>()
+ }
+ _ => (0..fields.len())
+ .map(|i| {
+ let child_arrays = arrays
+ .iter()
+ .map(|array| &array.child_data()[i])
+ .collect::<Vec<_>>();
+ MutableArrayData::new(child_arrays, use_nulls, array_capacity)
+ })
+ .collect::<Vec<_>>(),
+ },
_ => {
todo!("Take and filter operations still not supported for this datatype")
}
@@ -436,6 +520,9 @@
0 => unreachable!(),
1 => Some(arrays[0].child_data()[0].clone()),
_ => {
+ if let Capacities::Dictionary(_, _) = capacities {
+ panic!("dictionary capacity not yet supported")
+ }
// Concat dictionaries together
let dictionaries: Vec<_> =
arrays.iter().map(|array| &array.child_data()[0]).collect();
@@ -465,7 +552,7 @@
.map(|array| build_extend_null_bits(array, use_nulls))
.collect();
- let null_bytes = bit_util::ceil(capacity, 8);
+ let null_bytes = bit_util::ceil(array_capacity, 8);
let null_buffer = MutableBuffer::from_len_zeroed(null_bytes);
let extend_values = match &data_type {
diff --git a/arrow/src/compute/kernels/concat.rs b/arrow/src/compute/kernels/concat.rs
index 83140c8..cc976a4 100644
--- a/arrow/src/compute/kernels/concat.rs
+++ b/arrow/src/compute/kernels/concat.rs
@@ -31,8 +31,26 @@
//! ```
use crate::array::*;
+use crate::datatypes::DataType;
use crate::error::{ArrowError, Result};
+fn compute_str_values_length<Offset: StringOffsetSizeTrait>(
+ arrays: &[&ArrayData],
+) -> usize {
+ arrays
+ .iter()
+ .map(|&data| {
+ // get the length of the value buffer
+ let buf_len = data.buffers()[1].len();
+ // find the offset of the buffer
+ // this returns a slice of offsets, starting from the offset of the array
+ // so we can take the first value
+ let offset = data.buffer::<Offset>(0)[0];
+ buf_len - offset.to_usize().unwrap()
+ })
+ .sum()
+}
+
/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
pub fn concat(arrays: &[&Array]) -> Result<ArrayRef> {
if arrays.is_empty() {
@@ -56,7 +74,25 @@
let arrays = arrays.iter().map(|a| a.data()).collect::<Vec<_>>();
- let mut mutable = MutableArrayData::new(arrays, false, capacity);
+ let mut mutable = match arrays[0].data_type() {
+ DataType::Utf8 => {
+ let str_values_size = compute_str_values_length::<i32>(&arrays);
+ MutableArrayData::with_capacities(
+ arrays,
+ false,
+ Capacities::Binary(capacity, Some(str_values_size)),
+ )
+ }
+ DataType::LargeUtf8 => {
+ let str_values_size = compute_str_values_length::<i64>(&arrays);
+ MutableArrayData::with_capacities(
+ arrays,
+ false,
+ Capacities::Binary(capacity, Some(str_values_size)),
+ )
+ }
+ _ => MutableArrayData::new(arrays, false, capacity),
+ };
for (i, len) in lengths.iter().enumerate() {
mutable.extend(i, 0, *len)