blob: 2e95039c16b9b6b7d6d3bdb94e03db658d590f3c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Parquet definition and repetition levels
//!
//! Contains the algorithm for computing definition and repetition levels.
//! The algorithm works by tracking the slots of an array that should
//! ultimately be populated when writing to Parquet.
//! Parquet achieves nesting through definition levels and repetition levels \[1\].
//! Definition levels specify how many optional fields in the path for the column
//! are defined.
//! Repetition levels specify at which repeated field (list) in the path a value
//! has repeated.
//!
//! In a nested data structure such as `a.b.c`, one can see levels as defining
//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
//! Optional fields are nullable fields, so if all 3 fields are nullable and
//! there are no lists, the maximum definition level is 3.
//!
//! The algorithm in this module computes the necessary information to enable
//! the writer to keep track of which columns are at which levels, and to extract
//! the correct values at the correct slots from Arrow arrays.
//!
//! It works by walking a record batch's arrays, keeping track of what values
//! are non-null, their positions and computing what their levels are.
//!
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
use arrow::array::{make_array, ArrayRef, StructArray};
use arrow::datatypes::{DataType, Field};
/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
///
/// When a nested schema is traversed, intermediate [LevelInfo] structs are created to track
/// the state of parent arrays. When a primitive Arrow array is encountered, a final [LevelInfo]
/// is created, and this is what is used to index into the array when writing data to Parquet.
#[derive(Debug, Eq, PartialEq, Clone)]
pub(crate) struct LevelInfo {
/// Array's definition levels
pub definition: Vec<i16>,
/// Array's optional repetition levels. `None` at the record batch and for levels
/// that have no list above them; `Some` once a list level has been created.
pub repetition: Option<Vec<i16>>,
/// Array's offsets, 64-bit is used to accommodate large offset arrays
pub array_offsets: Vec<i64>,
// TODO: Convert to an Arrow Buffer after ARROW-10766 is merged.
/// Array's logical validity mask, which gets unpacked for list children.
/// If the parent of an array is null, all children are logically treated as
/// null. This mask keeps track of that.
pub array_mask: Vec<bool>,
/// The maximum definition at this level, 0 at the record batch
pub max_definition: i16,
/// The type of array represented by this level info
pub level_type: LevelType,
/// The offset of the current level's array
pub offset: usize,
/// The length of the current level's array
pub length: usize,
}
/// LevelType defines the type of level, and whether it is nullable or not
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(crate) enum LevelType {
    /// The record batch itself; it never contributes a definition level.
    Root,
    /// A list level; the flag is the list's nullability.
    List(bool),
    /// A struct level; the flag is the struct's nullability.
    Struct(bool),
    /// A leaf level; the flag is the leaf's nullability.
    Primitive(bool),
}

impl LevelType {
    /// Definition-level increment contributed by this level's own nullability:
    /// 1 for a nullable list, struct or primitive, and 0 otherwise (always 0 for the root).
    #[inline]
    const fn level_increment(&self) -> i16 {
        let nullable = match *self {
            LevelType::Root => false,
            LevelType::List(flag) | LevelType::Struct(flag) | LevelType::Primitive(flag) => flag,
        };
        if nullable {
            1
        } else {
            0
        }
    }
}
impl LevelInfo {
/// Create the root [LevelInfo] covering `length` rows that start at `offset`.
///
/// This is the starting point of the traversal. Every row is present (mask `true`), sits at
/// definition level 0, has no repetition level, and owns exactly one slot, so the offsets
/// are `0, 1, ..., length`.
pub(crate) fn new(offset: usize, length: usize) -> Self {
    // one slot per row: num_rows + 1 sequential offsets
    let array_offsets: Vec<i64> = (0..=length as i64).collect();
    let definition = vec![0; length];
    let array_mask = vec![true; length];
    Self {
        offset,
        length,
        level_type: LevelType::Root,
        max_definition: 0,
        repetition: None,
        definition,
        array_offsets,
        array_mask,
    }
}
/// Compute nested levels of the Arrow array, recursing into lists and structs.
///
/// Returns a list of `LevelInfo`, one for each leaf (primitive or dictionary) array
/// reachable from `array`. Struct children are visited in field order and their leaf
/// levels are concatenated.
///
/// `self` is the parent level (the record batch root, a list or a struct). Its
/// `offset` and `length` select the slots of `array` that are read, and its
/// definition and repetition levels are the starting point for the child levels.
///
/// `field` is the Arrow field describing `array`. Its nullability becomes the
/// nullability of the level created for `array`. For leaf children of a list, the
/// list's own child field is used instead.
///
/// # Panics
///
/// `FixedSizeList` and `Union` arrays are not supported. They panic via
/// `unimplemented!`, both at this level and as list children.
pub(crate) fn calculate_array_levels(
&self,
array: &ArrayRef,
field: &Field,
) -> Vec<Self> {
// Offsets and validity of the `self.length` slots of `array` starting at `self.offset`.
let (array_offsets, array_mask) =
Self::get_array_offsets_and_masks(array, self.offset, self.length);
match array.data_type() {
// A Null array becomes a nullable leaf that reuses the parent's definition and
// repetition levels unchanged; its maximum definition is never below 1.
DataType::Null => vec![Self {
definition: self.definition.clone(),
repetition: self.repetition.clone(),
array_offsets,
array_mask,
max_definition: self.max_definition.max(1),
// Null type is always nullable
level_type: LevelType::Primitive(true),
offset: self.offset,
length: self.length,
}],
DataType::Boolean
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::Utf8
| DataType::LargeUtf8
| DataType::Timestamp(_, _)
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_)
| DataType::Interval(_)
| DataType::Binary
| DataType::LargeBinary
| DataType::Decimal(_, _)
| DataType::FixedSizeBinary(_) => {
// we return a vector of 1 value to represent the primitive
vec![self.calculate_child_levels(
array_offsets,
array_mask,
LevelType::Primitive(field.is_nullable()),
)]
}
DataType::List(list_field) | DataType::LargeList(list_field) => {
// The first and last list offsets bound the range of the child array that
// the selected list slots refer to.
// NOTE: despite its name, `child_len` holds the *end* offset; the actual
// number of child slots is `child_len - child_offset` (see the call below).
let child_offset = array_offsets[0] as usize;
let child_len = *array_offsets.last().unwrap() as usize;
// Calculate the list level
let list_level = self.calculate_child_levels(
array_offsets,
array_mask,
LevelType::List(field.is_nullable()),
);
// Construct the child array of the list, and get its offset + mask
let array_data = array.data();
let child_data = array_data.child_data().get(0).unwrap();
let child_array = make_array(child_data.clone());
let (child_offsets, child_mask) = Self::get_array_offsets_and_masks(
&child_array,
child_offset,
child_len - child_offset,
);
// Leaf children are levelled directly under the list; nested lists and
// structs recurse through `calculate_array_levels`.
match child_array.data_type() {
// TODO: The behaviour of a <list<null>> is untested
// NOTE(review): this returns the list's own level (`LevelType::List`), and
// `filter_array_indices` panics on any non-primitive level type.
DataType::Null => vec![list_level],
DataType::Boolean
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::Timestamp(_, _)
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_)
| DataType::Interval(_)
| DataType::Binary
| DataType::LargeBinary
| DataType::Utf8
| DataType::LargeUtf8
| DataType::Dictionary(_, _)
| DataType::Decimal(_, _)
| DataType::FixedSizeBinary(_) => {
vec![list_level.calculate_child_levels(
child_offsets,
child_mask,
LevelType::Primitive(list_field.is_nullable()),
)]
}
DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
list_level.calculate_array_levels(&child_array, list_field)
}
DataType::FixedSizeList(_, _) => unimplemented!(),
DataType::Union(_) => unimplemented!(),
}
}
DataType::FixedSizeList(_, _) => unimplemented!(),
DataType::Struct(struct_fields) => {
let struct_array: &StructArray = array
.as_any()
.downcast_ref::<StructArray>()
.expect("Unable to get struct array");
let struct_level = self.calculate_child_levels(
array_offsets,
array_mask,
LevelType::Struct(field.is_nullable()),
);
// Each child column is levelled against the struct's level; the leaf
// levels of all children are concatenated in field order.
let mut struct_levels = vec![];
struct_array
.columns()
.into_iter()
.zip(struct_fields)
.for_each(|(child_array, child_field)| {
let mut levels =
struct_level.calculate_array_levels(child_array, child_field);
struct_levels.append(&mut levels);
});
struct_levels
}
DataType::Union(_) => unimplemented!(),
DataType::Dictionary(_, _) => {
// Need to check for these cases not implemented in C++:
// - "Writing DictionaryArray with nested dictionary type not yet supported"
// - "Writing DictionaryArray with null encoded in dictionary type not yet supported"
// For now a dictionary is levelled like any other primitive leaf.
vec![self.calculate_child_levels(
array_offsets,
array_mask,
LevelType::Primitive(field.is_nullable()),
)]
}
}
}
/// Calculate child/leaf array levels.
///
/// `self` holds the parent's levels. `array_offsets` and `array_mask` are the child
/// array's offsets and validity, as produced by `get_array_offsets_and_masks`.
/// `level_type` is the kind of level being created. The result carries:
/// - the child's definition levels;
/// - its repetition levels (`Some` once a list has been seen);
/// - the parent's and child's validity merged into one mask;
/// - the child's `max_definition`.
///
/// *Maximum definition level*
///
/// The child's `max_definition` depends on the (parent, child) level types:
/// - root parent, struct child: 1 if the struct is nullable, otherwise 0;
/// - root parent, any other child: always 1, even when the child is not nullable,
///   so that a non-nullable column below the root does not stay at level 0;
/// - list parent, any child: the parent's maximum + 1, plus 1 more if the child is
///   nullable;
/// - struct parent, any child: the parent's maximum, plus 1 if the child is nullable.
///
/// A root child is a logic error and panics. So does a primitive parent, unless the
/// child is a list (see the note in the match below).
///
/// *Examples*
///
/// A batch with only a primitive that's non-nullable, `<primitive[required]>`:
/// * The root rule applies: the maximum definition level is 1 and every slot gets 1.
///
/// A batch with only a primitive that's nullable, `<primitive[optional]>`:
/// * The maximum definition level is 1. Non-null slots get 1, and null slots keep
///   the root's level 0.
///
/// A batch with a single non-nullable list (both list and child not null):
/// * We calculate the level twice, for the list, and for the child.
/// * At the list, the maximum definition level is 1 (root rule). Empty and non-empty
///   lists alike get level 1.
/// * At the primitive, the list-parent rule applies and the maximum becomes 2. Slots
///   of empty lists have no child value and keep the list's level.
fn calculate_child_levels(
&self,
// we use 64-bit offsets to also accommodate large arrays
array_offsets: Vec<i64>,
array_mask: Vec<bool>,
level_type: LevelType,
) -> Self {
// capacity hint taken from the last child offset
let min_len = *(array_offsets.last().unwrap()) as usize;
let mut definition = Vec::with_capacity(min_len);
let mut repetition = Vec::with_capacity(min_len);
let mut merged_array_mask = Vec::with_capacity(min_len);
// First pass: the child's maximum definition level. Arm order matters; see the
// function docs for the resulting rules.
let max_definition = match (self.level_type, level_type) {
(LevelType::Root, LevelType::Struct(is_nullable)) => {
// If the struct is non-nullable, its def level doesn't increment
is_nullable as i16
}
(LevelType::Root, _) => 1,
(_, LevelType::Root) => {
unreachable!("Cannot have a root as a child")
}
(LevelType::List(_), _) => {
self.max_definition + 1 + level_type.level_increment()
}
(LevelType::Struct(_), _) => {
self.max_definition + level_type.level_increment()
}
// NOTE(review): root, list and struct parents are all matched above, so this
// arm is only reached with a primitive parent. Root parents never get here,
// despite the comment below.
(_, LevelType::List(is_nullable)) => {
// if the child is a list, even if its parent is a root
self.max_definition + 1 + is_nullable as i16
}
(LevelType::Primitive(_), _) => {
unreachable!("Cannot have a primitive parent for any type")
}
};
// Second pass: definition/repetition levels and the merged validity mask.
match (self.level_type, level_type) {
(LevelType::List(_), LevelType::List(is_nullable)) => {
// parent is a list or descendant of a list, and child is a list
let reps = self.repetition.clone().unwrap();
// Calculate the 2 list hierarchy definitions in advance
// List is not empty, but null
let l2 = max_definition - is_nullable as i16;
// List is not empty, and not null
let l3 = max_definition;
// Number of empty parent slots seen so far. Such slots have a level entry in
// `self` but no child offsets, so they shift the index into `self`'s vectors.
let mut nulls_seen = 0;
self.array_offsets.windows(2).for_each(|w| {
let start = w[0] as usize;
let end = w[1] as usize;
let parent_len = end - start;
if parent_len == 0 {
// If the parent length is 0, there won't be a slot for the child
let index = start + nulls_seen - self.offset;
definition.push(self.definition[index]);
// NOTE(review): the list/non-list branch below pushes the parent's
// repetition (`reps[index]`) in the same situation. Verify that 0 is
// intended when this empty slot is not the first of its enclosing list.
repetition.push(0);
merged_array_mask.push(self.array_mask[index]);
nulls_seen += 1;
} else {
(start..end).for_each(|parent_index| {
let index = parent_index + nulls_seen - self.offset;
let parent_index = parent_index - self.offset;
// parent is either defined at this level, or earlier
let parent_def = self.definition[index];
let parent_rep = reps[index];
let parent_mask = self.array_mask[index];
// valid parent, index into children
let child_start = array_offsets[parent_index] as usize;
let child_end = array_offsets[parent_index + 1] as usize;
let child_len = child_end - child_start;
let child_mask = array_mask[parent_index];
let merged_mask = parent_mask && child_mask;
if child_len == 0 {
// empty child list: a single entry carrying the parent's levels
definition.push(parent_def);
repetition.push(parent_rep);
merged_array_mask.push(merged_mask);
} else {
(child_start..child_end).for_each(|child_index| {
// NOTE(review): `parent_index` was shadowed above with
// `parent_index - self.offset`, but `start` is not adjusted.
// The comparison below is only consistent when
// `self.offset == 0`; verify for sliced inputs.
let rep = match (
parent_index == start,
child_index == child_start,
) {
(true, true) => parent_rep,
(true, false) => parent_rep + 2,
(false, true) => parent_rep,
(false, false) => parent_rep + 1,
};
// a null parent keeps its own level; otherwise use the
// child list's validity to pick between l3 and l2
definition.push(if !parent_mask {
parent_def
} else if child_mask {
l3
} else {
l2
});
repetition.push(rep);
merged_array_mask.push(merged_mask);
});
}
});
}
});
debug_assert_eq!(definition.len(), merged_array_mask.len());
let offset = *array_offsets.first().unwrap() as usize;
let length = *array_offsets.last().unwrap() as usize - offset;
Self {
definition,
repetition: Some(repetition),
array_offsets,
array_mask: merged_array_mask,
max_definition,
level_type,
offset: offset + self.offset,
length,
}
}
(LevelType::List(_), _) => {
// List and primitive (or struct).
// The list can have more values than the primitive, indicating that there
// are slots where the list is empty. We use a counter to track this behaviour.
let mut nulls_seen = 0;
// let child_max_definition = list_max_definition + is_nullable as i16;
// child values are a function of parent list offsets
let reps = self.repetition.as_deref().unwrap();
self.array_offsets.windows(2).for_each(|w| {
let start = w[0] as usize;
let end = w[1] as usize;
let parent_len = end - start;
if parent_len == 0 {
// empty list: no child value, keep the list's levels and mask
let index = start + nulls_seen - self.offset;
definition.push(self.definition[index]);
repetition.push(reps[index]);
merged_array_mask.push(self.array_mask[index]);
nulls_seen += 1;
} else {
// iterate through the array, adjusting child definitions for nulls
(start..end).for_each(|child_index| {
let index = child_index + nulls_seen - self.offset;
let child_mask = array_mask[child_index - self.offset];
let parent_mask = self.array_mask[index];
let parent_def = self.definition[index];
if !parent_mask || parent_def < self.max_definition {
// parent null or defined below its maximum: inherit its level
definition.push(parent_def);
repetition.push(reps[index]);
merged_array_mask.push(parent_mask);
} else {
// fully-defined parent: max_definition, or one less if the child is null
definition.push(max_definition - !child_mask as i16);
repetition.push(reps[index]);
merged_array_mask.push(child_mask);
}
});
}
});
debug_assert_eq!(definition.len(), merged_array_mask.len());
let offset = *array_offsets.first().unwrap() as usize;
let length = *array_offsets.last().unwrap() as usize - offset;
Self {
definition,
repetition: Some(repetition),
array_offsets: self.array_offsets.clone(),
array_mask: merged_array_mask,
max_definition,
level_type,
offset: offset + self.offset,
length,
}
}
(_, LevelType::List(is_nullable)) => {
// Encountering a list for the first time.
// Calculate the 2 list hierarchy definitions in advance
// List is not empty, but null (if nullable)
let l2 = max_definition - is_nullable as i16;
// List is not empty, and not null
let l3 = max_definition;
self.definition
.iter()
.enumerate()
.for_each(|(parent_index, def)| {
let child_from = array_offsets[parent_index];
let child_to = array_offsets[parent_index + 1];
let child_len = child_to - child_from;
let child_mask = array_mask[parent_index];
let parent_mask = self.array_mask[parent_index];
match (parent_mask, child_len) {
(true, 0) => {
// empty slot that is valid, i.e. {"parent": {"child": [] } }
definition.push(if child_mask { l3 } else { l2 });
repetition.push(0);
merged_array_mask.push(child_mask);
}
(false, 0) => {
definition.push(*def);
repetition.push(0);
// NOTE(review): the `(false, _)` arm pushes `false` here, while
// this arm pushes `child_mask` even though the parent is null.
// Confirm which is intended.
merged_array_mask.push(child_mask);
}
(true, _) => {
(child_from..child_to).for_each(|child_index| {
definition.push(if child_mask { l3 } else { l2 });
// mark the first child slot as 0, and the next as 1
repetition.push(if child_index == child_from {
0
} else {
1
});
merged_array_mask.push(child_mask);
});
}
(false, _) => {
(child_from..child_to).for_each(|child_index| {
definition.push(*def);
// mark the first child slot as 0, and the next as 1
repetition.push(if child_index == child_from {
0
} else {
1
});
merged_array_mask.push(false);
});
}
}
});
debug_assert_eq!(definition.len(), merged_array_mask.len());
let offset = *array_offsets.first().unwrap() as usize;
let length = *array_offsets.last().unwrap() as usize - offset;
Self {
definition,
repetition: Some(repetition),
array_offsets,
array_mask: merged_array_mask,
max_definition,
level_type,
offset,
length,
}
}
(_, _) => {
// Non-list child of a root or struct: one level per slot, no repetition.
self.definition
.iter()
.zip(array_mask.into_iter().zip(&self.array_mask))
.for_each(|(current_def, (child_mask, parent_mask))| {
merged_array_mask.push(*parent_mask && child_mask);
match (parent_mask, child_mask) {
(true, true) => {
definition.push(max_definition);
}
(true, false) => {
// The child is only legally null if its array is nullable.
// Thus parent's max_definition is lower
definition.push(if *current_def <= self.max_definition {
*current_def
} else {
self.max_definition
});
}
// if the parent was false, retain its definitions
(false, _) => {
definition.push(*current_def);
}
}
});
debug_assert_eq!(definition.len(), merged_array_mask.len());
Self {
definition,
repetition: self.repetition.clone(), // it's None
array_offsets,
array_mask: merged_array_mask,
max_definition,
level_type,
// Inherit parent offset and length
offset: self.offset,
length: self.length,
}
}
}
}
/// Get the offsets of an array as 64-bit values, and validity masks as booleans.
///
/// `offset` and `len` select a logical window of `array`. The window is shifted by
/// `array.offset()` so that sliced arrays are read from the correct physical position.
/// This applies to the null bitmap and to list offset buffers alike.
///
/// - Primitive, binary, struct and dictionary arrays' offsets are the sequence
///   `0..=len`. Fixed-size binary offsets are that sequence scaled by the value width.
/// - List array offsets are the `len + 1` value offsets of the window, widened to `i64`.
/// - Masks come from the validity bitmap, or are all `true` when there is none.
///
/// # Panics
///
/// Panics (`unimplemented!`) for `FixedSizeList` and `Union` arrays.
fn get_array_offsets_and_masks(
    array: &ArrayRef,
    offset: usize,
    len: usize,
) -> (Vec<i64>, Vec<bool>) {
    let data = array.data();
    // Physical index of the window's first slot. Buffers are indexed from their own
    // start, so a sliced array's offset must be added for *every* buffer read below.
    // Previously only the null bitmap did this, and sliced lists read wrong offsets.
    let start = array.offset() + offset;
    let validity_mask = || match data.null_buffer() {
        Some(buf) => get_bool_array_slice(buf, start, len),
        None => vec![true; len],
    };
    match array.data_type() {
        DataType::Null
        | DataType::Boolean
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::Float16
        | DataType::Float32
        | DataType::Float64
        | DataType::Timestamp(_, _)
        | DataType::Date32
        | DataType::Date64
        | DataType::Time32(_)
        | DataType::Time64(_)
        | DataType::Duration(_)
        | DataType::Interval(_)
        | DataType::Binary
        | DataType::LargeBinary
        | DataType::Utf8
        | DataType::LargeUtf8
        | DataType::Struct(_)
        | DataType::Dictionary(_, _)
        | DataType::Decimal(_, _) => ((0..=(len as i64)).collect(), validity_mask()),
        DataType::List(_) => {
            // SAFETY: the first buffer of a `List` array holds its `i32` value offsets.
            // Iterate the borrowed slice directly instead of copying the whole buffer.
            let offsets = unsafe { data.buffers()[0].typed_data::<i32>() }
                .iter()
                .skip(start)
                .take(len + 1)
                .map(|&v| v as i64)
                .collect::<Vec<i64>>();
            (offsets, validity_mask())
        }
        DataType::LargeList(_) => {
            // SAFETY: the first buffer of a `LargeList` array holds its `i64` value offsets.
            let offsets = unsafe { data.buffers()[0].typed_data::<i64>() }
                .iter()
                .skip(start)
                .take(len + 1)
                .copied()
                .collect::<Vec<i64>>();
            (offsets, validity_mask())
        }
        DataType::FixedSizeBinary(value_len) => {
            let value_len = *value_len as i64;
            (
                (0..=(len as i64)).map(|v| v * value_len).collect(),
                validity_mask(),
            )
        }
        DataType::FixedSizeList(_, _) | DataType::Union(_) => {
            unimplemented!("Getting offsets not yet implemented")
        }
    }
}
/// Given a level's information, calculate the offsets required to index an array correctly.
///
/// Returns the indices into the leaf array of all values whose definition level equals
/// `max_definition`.
/// - Without repetition levels, each definition level corresponds to one array slot.
/// - With repetition levels, only levels at or above `max_definition - 1` (nullable leaf)
///   or `max_definition` (non-nullable leaf) are counted as occupying a slot.
///
/// # Panics
///
/// Panics if this level is not a `LevelType::Primitive` level.
pub(crate) fn filter_array_indices(&self) -> Vec<usize> {
    let is_nullable = if let LevelType::Primitive(nullable) = self.level_type {
        nullable
    } else {
        panic!(
            "Cannot filter indices on a non-primitive array, found {:?}",
            self.level_type
        )
    };
    let max = self.max_definition;

    // happy path if not dealing with lists: level position == array slot
    if self.repetition.is_none() {
        return self
            .definition
            .iter()
            .enumerate()
            .filter(|(_, def)| **def == max)
            .map(|(slot, _)| slot)
            .collect();
    }

    // Levels below this threshold have no slot in the leaf array, so they do not
    // advance the slot counter.
    let slot_threshold = max - is_nullable as i16;
    let mut slot: usize = 0;
    let mut indices = Vec::new();
    for &def in &self.definition {
        if def == max {
            indices.push(slot);
        }
        if def >= slot_threshold {
            slot += 1;
        }
    }
    indices
}
}
/// Unpack `len` bits of a bitmap `buffer`, starting at bit `offset`, into a `Vec<bool>`.
///
/// Bit `offset + i` (as read by `arrow::util::bit_util::get_bit`) becomes element `i`
/// of the result. It is used here to unpack validity (null) bitmaps.
/// TODO: bits are unpacked one at a time, which may be slow for large arrays.
#[inline]
fn get_bool_array_slice(
    buffer: &arrow::buffer::Buffer,
    offset: usize,
    len: usize,
) -> Vec<bool> {
    let bits = buffer.as_slice();
    let mut unpacked = Vec::with_capacity(len);
    unpacked.extend((offset..offset + len).map(|bit| arrow::util::bit_util::get_bit(bits, bit)));
    unpacked
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use arrow::array::*;
use arrow::buffer::Buffer;
use arrow::datatypes::{Schema, ToByteSlice};
use arrow::record_batch::RecordBatch;
#[test]
fn test_calculate_array_levels_twitter_example() {
// based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
// [[a, b, c], [d, e, f, g]], [[h], [i,j]]
let parent_levels = LevelInfo {
definition: vec![0, 0],
repetition: None,
array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential
array_mask: vec![true, true], // both lists defined
max_definition: 0,
level_type: LevelType::Root,
offset: 0,
length: 2,
};
// offset into array, each level1 has 2 values
let array_offsets = vec![0, 2, 4];
let array_mask = vec![true, true];
// calculate level1 levels
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
LevelType::List(false),
);
//
let expected_levels = LevelInfo {
definition: vec![1, 1, 1, 1],
repetition: Some(vec![0, 1, 0, 1]),
array_offsets,
array_mask: vec![true, true, true, true],
max_definition: 1,
level_type: LevelType::List(false),
offset: 0,
length: 4,
};
// the separate asserts make it easier to see what's failing
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_mask, &expected_levels.array_mask);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
assert_eq!(&levels.level_type, &expected_levels.level_type);
// this assert is to help if there are more variables added to the struct
assert_eq!(&levels, &expected_levels);
// level2
let parent_levels = levels;
let array_offsets = vec![0, 3, 7, 8, 10];
let array_mask = vec![true, true, true, true];
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
LevelType::List(false),
);
let expected_levels = LevelInfo {
definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
array_offsets,
array_mask: vec![true; 10],
max_definition: 2,
level_type: LevelType::List(false),
offset: 0,
length: 10,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_mask, &expected_levels.array_mask);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
}
#[test]
fn test_calculate_one_level_1() {
// This test calculates the levels for a non-null primitive array
let parent_levels = LevelInfo {
definition: vec![0; 10],
repetition: None,
array_offsets: (0..=10).collect(),
array_mask: vec![true; 10],
max_definition: 0,
level_type: LevelType::Root,
offset: 0,
length: 10,
};
let array_offsets: Vec<i64> = (0..=10).collect();
let array_mask = vec![true; 10];
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask.clone(),
LevelType::Primitive(false),
);
let expected_levels = LevelInfo {
definition: vec![1; 10],
repetition: None,
array_offsets,
array_mask,
max_definition: 1,
level_type: LevelType::Primitive(false),
offset: 0,
length: 10,
};
assert_eq!(&levels, &expected_levels);
}
#[test]
fn test_calculate_one_level_2() {
// This test calculates the levels for a non-null primitive array
let parent_levels = LevelInfo {
definition: vec![0; 5],
repetition: None,
array_offsets: (0..=5).collect(),
array_mask: vec![true, true, true, true, true],
max_definition: 0,
level_type: LevelType::Root,
offset: 0,
length: 5,
};
let array_offsets: Vec<i64> = (0..=5).collect();
let array_mask = vec![true, false, true, true, false];
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask.clone(),
LevelType::Primitive(true),
);
let expected_levels = LevelInfo {
definition: vec![1, 0, 1, 1, 0],
repetition: None,
array_offsets,
array_mask,
max_definition: 1,
level_type: LevelType::Primitive(true),
offset: 0,
length: 5,
};
assert_eq!(&levels, &expected_levels);
}
#[test]
fn test_calculate_array_levels_1() {
// if all array values are defined (e.g. batch<list<_>>)
// [[0], [1], [2], [3], [4]]
let parent_levels = LevelInfo {
definition: vec![0; 5],
repetition: None,
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![true, true, true, true, true],
max_definition: 0,
level_type: LevelType::Root,
offset: 0,
length: 5,
};
let array_offsets = vec![0, 2, 2, 4, 8, 11];
let array_mask = vec![true, false, true, true, true];
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
LevelType::List(true),
);
// array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
// all values are defined as we do not have nulls on the root (batch)
// repetition:
// 0: 0, 1
// 1:
// 2: 0, 1
// 3: 0, 1, 1, 1
// 4: 0, 1, 1
let expected_levels = LevelInfo {
// The levels are normally 2 because we:
// - Calculate the level at the list
// - Calculate the level at the list's child
// We do not do this in these tests, thus the levels are 1 less.
definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
true, true, false, true, true, true, true, true, true, true, true, true,
],
max_definition: 1,
level_type: LevelType::List(true),
offset: 0,
length: 11, // the child has 11 slots
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
}
#[test]
fn test_calculate_array_levels_2() {
// If some values are null
//
// This emulates an array in the form: <struct<list<?>>
// with values:
// - 0: [0, 1], but is null because of the struct
// - 1: []
// - 2: [2, 3], but is null because of the struct
// - 3: [4, 5, 6, 7]
// - 4: [8, 9, 10]
//
// If the first values of a list are null due to a parent, we have to still account for them
// while indexing, because they would affect the way the child is indexed
// i.e. in the above example, we have to know that [0, 1] has to be skipped
let parent_levels = LevelInfo {
definition: vec![0, 1, 0, 1, 1],
repetition: None,
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![false, true, false, true, true],
max_definition: 1,
level_type: LevelType::Struct(true),
offset: 0,
length: 5,
};
let array_offsets = vec![0, 2, 2, 4, 8, 11];
let array_mask = vec![true, false, true, true, true];
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
LevelType::List(true),
);
let expected_levels = LevelInfo {
// 0 1 [2] are 0 (not defined at level 1)
// [2] is 1, but has 0 slots so is not populated (defined at level 1 only)
// 2 3 [4] are 0
// 4 5 6 7 [8] are 1 (defined at level 1 only)
// 8 9 10 [11] are 2 (defined at both levels)
definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
false, false, false, false, false, true, true, true, true, true, true,
true,
],
max_definition: 2,
level_type: LevelType::List(true),
offset: 0,
length: 11,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
// nested lists (using previous test)
let nested_parent_levels = levels;
let array_offsets = vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22];
let array_mask = vec![
true, true, true, true, true, true, true, true, true, true, true,
];
let levels = nested_parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
LevelType::List(true),
);
let expected_levels = LevelInfo {
// (def: 0) 0 1 [2] are 0 (take parent)
// (def: 0) 2 3 [4] are 0 (take parent)
// (def: 0) 4 5 [6] are 0 (take parent)
// (def: 0) 6 7 [8] are 0 (take parent)
// (def: 1) 8 9 [10] are 1 (take parent)
// (def: 1) 10 11 [12] are 1 (take parent)
// (def: 1) 12 23 [14] are 1 (take parent)
// (def: 1) 14 15 [16] are 1 (take parent)
// (def: 2) 16 17 [18] are 2 (defined at all levels)
// (def: 2) 18 19 [20] are 2 (defined at all levels)
// (def: 2) 20 21 [22] are 2 (defined at all levels)
//
// 0 1 [2] are 0 (not defined at level 1)
// [2] is 1, but has 0 slots so is not populated (defined at level 1 only)
// 2 3 [4] are 0
// 4 5 6 7 [8] are 1 (defined at level 1 only)
// 8 9 10 [11] are 2 (defined at both levels)
//
// 0: [[100, 101], [102, 103]]
// 1: []
// 2: [[104, 105], [106, 107]]
// 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
// 4: [[116, 117], [118, 119], [120, 121]]
definition: vec![
0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
],
repetition: Some(vec![
0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
]),
array_offsets,
array_mask: vec![
false, false, false, false, false, false, false, false, false, true,
true, true, true, true, true, true, true, true, true, true, true, true,
true,
],
max_definition: 4,
level_type: LevelType::List(true),
offset: 0,
length: 22,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.array_mask, &expected_levels.array_mask);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
}
#[test]
fn test_calculate_array_levels_nested_list() {
// Exercises two levels of list nesting under a struct parent:
// struct -> list -> list -> primitive.
// The parent has every slot defined (definition 1, no repetition):
// 0: [a]
// 1: [a]
// 2: [a]
// 3: [a]
let parent_levels = LevelInfo {
definition: vec![1, 1, 1, 1],
repetition: None,
array_offsets: vec![0, 1, 2, 3, 4],
array_mask: vec![true, true, true, true],
max_definition: 1,
level_type: LevelType::Struct(true),
offset: 0,
length: 4,
};
// First (outer) list child, 4 slots:
// 0: null (mask is false, although array_offsets still gives it one slot [0, 1))
// 1: [1, 2, 3]
// 2: [4, 5]
// 3: [6, 7]
let array_offsets = vec![0, 1, 4, 6, 8];
let array_mask = vec![false, true, true, true];
let levels = parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
LevelType::List(true),
);
// 0: [null], level 1 is defined, but not 2
// 1: [1, 2, 3]
// 2: [4, 5]
// 3: [6, 7]
let expected_levels = LevelInfo {
definition: vec![1, 2, 2, 2, 2, 2, 2, 2],
repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
array_offsets,
array_mask: vec![false, true, true, true, true, true, true, true],
max_definition: 2,
level_type: LevelType::List(true),
offset: 0,
length: 8,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
// Second (inner) list child, nested in the outer list computed above.
// It has one list slot per outer-list value (8 slots):
let nested_parent_levels = levels;
// 0: [null] (was a populated null slot at the parent)
// 1: [201]
// 2: [202, 203]
// 3: null ([])
// 4: [204, 205, 206]
// 5: [207, 208, 209, 210]
// 6: [] (tests a non-null empty list slot)
// 7: [211, 212, 213, 214, 215]
let array_offsets = vec![0, 1, 2, 4, 4, 7, 11, 11, 16];
// Logically, the first slot of the mask is false (its parent slot is null),
// but it is passed as true here; the null parent must still win.
let array_mask = vec![true, true, true, false, true, true, true, true];
let levels = nested_parent_levels.calculate_child_levels(
array_offsets.clone(),
array_mask,
LevelType::List(true),
);
// array_offsets spans 16 child values over 8 list slots. The value under the
// null parent slot (0) is not emitted, which leaves 15 values at the max
// definition (4). The null list (slot 3) and the empty list (slot 6) each
// emit a single level with definition 2. Grouped by parent row, with child
// slot indices in parentheses:
// 0: (0)[null], parent was null, no value populated here
// 1: (1)[201], (2)[202, 203], (3)null
// 2: (4)[204, 205, 206], (5)[207, 208, 209, 210]
// 3: (6)[], (7)[211, 212, 213, 214, 215]
//
// In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
// 0: {"struct": [ null ]}
// 1: {"struct": [ [201], [202, 203], null ]}
// 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
// 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
let expected_levels = LevelInfo {
definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4],
repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
array_mask: vec![
false, true, true, true, false, true, true, true, true, true, true, true,
true, true, true, true, true, true,
],
array_offsets,
max_definition: 4,
level_type: LevelType::List(true),
offset: 0,
length: 16,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
assert_eq!(&levels.array_offsets, &expected_levels.array_offsets);
assert_eq!(&levels.array_mask, &expected_levels.array_mask);
assert_eq!(&levels.max_definition, &expected_levels.max_definition);
assert_eq!(&levels.level_type, &expected_levels.level_type);
assert_eq!(&levels, &expected_levels);
}
#[test]
fn test_calculate_nested_struct_levels() {
    // Nesting: struct a -> struct b -> int c, every level nullable.
    // Rows:
    //   0: {a: {b: {c: 1}}}
    //   1: {a: {b: {c: null}}}
    //   2: {a: {b: {c: 3}}}
    //   3: {a: {b: null}}
    //   4: {a: null}
    //   5: {a: {b: {c: 6}}}
    // Structs hold one slot per row, so all levels share identity offsets.
    let offsets: Vec<i64> = (0..=6).collect();

    // a: only row 4 is null
    let a_levels = LevelInfo {
        definition: vec![1, 1, 1, 1, 0, 1],
        repetition: None,
        array_offsets: offsets.clone(),
        array_mask: vec![true, true, true, true, false, true],
        max_definition: 1,
        level_type: LevelType::Struct(true),
        offset: 0,
        length: 6,
    };

    // b: null in row 3, and also masked in row 4 where its parent a is null
    let b_levels = a_levels.calculate_child_levels(
        offsets.clone(),
        vec![true, true, true, false, false, true],
        LevelType::Struct(true),
    );
    let b_expected = LevelInfo {
        definition: vec![2, 2, 2, 1, 0, 2],
        repetition: None,
        array_offsets: offsets.clone(),
        array_mask: vec![true, true, true, false, false, true],
        max_definition: 2,
        level_type: LevelType::Struct(true),
        offset: 0,
        length: 6,
    };
    assert_eq!(&b_expected, &b_levels);

    // c: additionally null in row 1
    let c_expected = LevelInfo {
        definition: vec![3, 2, 3, 1, 0, 3],
        repetition: None,
        array_offsets: offsets.clone(),
        array_mask: vec![true, false, true, false, false, true],
        max_definition: 3,
        level_type: LevelType::Struct(true),
        offset: 0,
        length: 6,
    };
    let c_levels = b_levels.calculate_child_levels(
        offsets,
        vec![true, false, true, false, false, true],
        LevelType::Struct(true),
    );
    assert_eq!(&c_expected, &c_levels);
}
#[test]
fn list_single_column() {
    // Same input as the equivalent arrow_writer test: one nullable list<int32>
    // column holding [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]].
    let item_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let item_offsets =
        arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
    let list_type =
        DataType::List(Box::new(Field::new("item", DataType::Int32, true)));
    // Validity bitmap 0b00011011: slot 2 is the only null entry.
    let list_data = ArrayData::builder(list_type.clone())
        .len(5)
        .add_buffer(item_offsets)
        .null_bit_buffer(Buffer::from(vec![0b00011011]))
        .add_child_data(item_values.data().clone())
        .build();
    assert_eq!(list_data.null_count(), 1);

    let list_column = Arc::new(ListArray::from(list_data));
    let schema = Schema::new(vec![Field::new("item", list_type, true)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![list_column]).unwrap();

    // Root level with offset 2 and length 2.
    let batch_level = LevelInfo::new(2, 2);
    assert_eq!(
        &batch_level,
        &LevelInfo {
            definition: vec![0; 2],
            repetition: None,
            array_offsets: (0..=2).collect(),
            array_mask: vec![true, true],
            max_definition: 0,
            level_type: LevelType::Root,
            offset: 2,
            length: 2,
        }
    );

    // Compute the levels of every column in the batch.
    let levels: Vec<_> = batch
        .columns()
        .iter()
        .zip(batch.schema().fields())
        .flat_map(|(array, field)| batch_level.calculate_array_levels(array, field))
        .collect();
    assert_eq!(levels.len(), 1);

    let list_level = &levels[0];
    let expected_level = LevelInfo {
        definition: vec![0, 3, 3, 3],
        repetition: Some(vec![0, 0, 1, 1]),
        array_offsets: vec![3, 3, 6],
        array_mask: vec![false, true, true, true],
        max_definition: 3,
        level_type: LevelType::Primitive(true),
        offset: 3,
        length: 3,
    };
    // Field-by-field checks first, so a failure points at the offending field.
    assert_eq!(&list_level.definition, &expected_level.definition);
    assert_eq!(&list_level.repetition, &expected_level.repetition);
    assert_eq!(&list_level.array_offsets, &expected_level.array_offsets);
    assert_eq!(&list_level.array_mask, &expected_level.array_mask);
    assert_eq!(&list_level.max_definition, &expected_level.max_definition);
    assert_eq!(&list_level.level_type, &expected_level.level_type);
    assert_eq!(list_level, &expected_level);
}
#[test]
fn mixed_struct_list() {
    // Same data as the equivalent arrow_writer_complex test. Schema:
    //   a: int32 (required)
    //   b: int32 (nullable)
    //   c: struct (nullable) {
    //       d: float64 (nullable),
    //       e: struct (nullable) {
    //           f: float32 (nullable),
    //           g: list<int16 (required)> (required),
    //       },
    //   }
    let struct_field_d = Field::new("d", DataType::Float64, true);
    let struct_field_f = Field::new("f", DataType::Float32, true);
    let struct_field_g = Field::new(
        "g",
        DataType::List(Box::new(Field::new("items", DataType::Int16, false))),
        false,
    );
    let struct_field_e = Field::new(
        "e",
        DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]),
        true,
    );
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
        Field::new(
            "c",
            DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
            true, // https://github.com/apache/arrow-rs/issues/245
        ),
    ]);

    // Leaf data
    let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
    let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
    let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

    // g's list offsets give [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]];
    // no null bitmap is supplied for g.
    let g_values = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let g_offsets =
        arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
    let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
        .len(5)
        .add_buffer(g_offsets)
        .add_child_data(g_values.data().clone())
        .build();
    let g = ListArray::from(g_list_data);

    // Assemble the nested structs and the batch
    let e = StructArray::from(vec![
        (struct_field_f, Arc::new(f) as ArrayRef),
        (struct_field_g, Arc::new(g) as ArrayRef),
    ]);
    let c = StructArray::from(vec![
        (struct_field_d, Arc::new(d) as ArrayRef),
        (struct_field_e, Arc::new(e) as ArrayRef),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(a), Arc::new(b), Arc::new(c)],
    )
    .unwrap();

    // Root level spanning all 5 rows
    let batch_level = LevelInfo::new(0, 5);
    assert_eq!(
        &batch_level,
        &LevelInfo {
            definition: vec![0; 5],
            repetition: None,
            array_offsets: (0..=5).collect(),
            array_mask: vec![true, true, true, true, true],
            max_definition: 0,
            level_type: LevelType::Root,
            offset: 0,
            length: 5,
        }
    );

    // One LevelInfo per leaf column, in schema order: a, b, d, f, g
    let levels: Vec<_> = batch
        .columns()
        .iter()
        .zip(batch.schema().fields())
        .flat_map(|(array, field)| batch_level.calculate_array_levels(array, field))
        .collect();
    assert_eq!(levels.len(), 5);

    // Each leaf checked below is a non-repeated column over all 5 rows, so only
    // its definition levels, mask, max definition and level type vary.
    let flat_leaf = |definition, array_mask, max_definition, level_type| LevelInfo {
        definition,
        repetition: None,
        array_offsets: (0..=5).collect(),
        array_mask,
        max_definition,
        level_type,
        offset: 0,
        length: 5,
    };

    // "a"
    assert_eq!(
        &levels[0],
        &flat_leaf(
            vec![1, 1, 1, 1, 1],
            vec![true, true, true, true, true],
            1,
            LevelType::Primitive(false),
        )
    );
    // "b"
    assert_eq!(
        &levels[1],
        &flat_leaf(
            vec![1, 0, 0, 1, 1],
            vec![true, false, false, true, true],
            1,
            LevelType::Primitive(true),
        )
    );
    // "d"
    assert_eq!(
        &levels[2],
        &flat_leaf(
            vec![1, 1, 1, 2, 1],
            vec![false, false, false, true, false],
            2,
            LevelType::Primitive(true),
        )
    );
    // "f"
    assert_eq!(
        &levels[3],
        &flat_leaf(
            vec![3, 2, 3, 2, 3],
            vec![true, false, true, false, true],
            3,
            LevelType::Primitive(true),
        )
    );
}
#[test]
fn test_filter_array_indices() {
    // Leaf of a nullable list column with three list slots (offsets 0, 3, 3, 6).
    // The level at position 3 has definition 1 (below max_definition 3) and a
    // false mask entry. The seven levels are expected to map onto only six
    // array indices.
    let primitive_level = LevelInfo {
        definition: vec![3, 3, 3, 1, 3, 3, 3],
        repetition: Some(vec![0, 1, 1, 0, 0, 1, 1]),
        array_offsets: vec![0, 3, 3, 6],
        array_mask: vec![true, true, true, false, true, true, true],
        max_definition: 3,
        level_type: LevelType::Primitive(true),
        offset: 0,
        length: 6,
    };
    assert_eq!(
        vec![0, 1, 2, 3, 4, 5],
        primitive_level.filter_array_indices()
    );
}
#[test]
fn test_null_vs_nonnull_struct() {
    // Builds a single-column batch of `struct<offset: int32>` with identical
    // child values, where only the struct field's own nullability varies, and
    // returns the levels computed for that column.
    let struct_levels = |struct_nullable: bool| {
        let offset_field = Field::new("offset", DataType::Int32, true);
        let schema = Schema::new(vec![Field::new(
            "some_nested_object",
            DataType::Struct(vec![offset_field.clone()]),
            struct_nullable,
        )]);
        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let some_nested_object =
            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
                .unwrap();
        let batch_level = LevelInfo::new(0, batch.num_rows());
        batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0))
    };

    let non_nullable_struct_levels = struct_levels(false);
    let nullable_struct_levels = struct_levels(true);

    // The struct's nullability must be reflected in the computed levels.
    assert_ne!(
        non_nullable_struct_levels, nullable_struct_levels,
        "Levels should not be equal, to reflect the difference in struct nullness"
    );
}
}