| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! A comparable row-oriented representation of a collection of [`Array`]. |
| //! |
| //! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared], |
| //! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort]. |
| //! This makes the row format ideal for implementing efficient multi-column sorting, |
| //! grouping, aggregation, windowing and more, as described in more detail |
| //! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/). |
| //! |
| //! For example, given three input [`Array`], [`RowConverter`] creates byte |
| //! sequences that [compare] the same as when using [`lexsort`]. |
| //! |
| //! ```text |
| //! ┌─────┐ ┌─────┐ ┌─────┐ |
| //! │ │ │ │ │ │ |
| //! ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐ ┏━━━━━━━━━━━━━┓ |
| //! │ │ │ │ │ │ ─────────────▶┃ ┃ |
| //! ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘ ┗━━━━━━━━━━━━━┛ |
| //! │ │ │ │ │ │ |
| //! └─────┘ └─────┘ └─────┘ |
| //! ... |
| //! ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐ ┏━━━━━━━━┓ |
| //! │ │ │ │ │ │ ─────────────▶┃ ┃ |
| //! └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘ ┗━━━━━━━━┛ |
| //! UInt64 Utf8 F64 |
| //! |
| //! Input Arrays Row Format |
| //! (Columns) |
| //! ``` |
| //! |
| //! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison |
| //! to be meaningful._ |
| //! |
| //! # Basic Example |
| //! ``` |
| //! # use std::sync::Arc; |
| //! # use arrow_row::{RowConverter, SortField}; |
| //! # use arrow_array::{ArrayRef, Int32Array, StringArray}; |
| //! # use arrow_array::cast::{AsArray, as_string_array}; |
| //! # use arrow_array::types::Int32Type; |
| //! # use arrow_schema::DataType; |
| //! |
| //! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef; |
| //! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef; |
| //! let arrays = vec![a1, a2]; |
| //! |
| //! // Convert arrays to rows |
| //! let converter = RowConverter::new(vec![ |
| //! SortField::new(DataType::Int32), |
| //! SortField::new(DataType::Utf8), |
| //! ]).unwrap(); |
| //! let rows = converter.convert_columns(&arrays).unwrap(); |
| //! |
| //! // Compare rows |
| //! for i in 0..4 { |
| //! assert!(rows.row(i) <= rows.row(i + 1)); |
| //! } |
| //! assert_eq!(rows.row(3), rows.row(4)); |
| //! |
| //! // Convert rows back to arrays |
| //! let converted = converter.convert_rows(&rows).unwrap(); |
| //! assert_eq!(arrays, converted); |
| //! |
| //! // Compare rows from different arrays |
| //! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef; |
| //! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef; |
| //! let arrays = vec![a1, a2]; |
| //! let rows2 = converter.convert_columns(&arrays).unwrap(); |
| //! |
| //! assert!(rows.row(4) < rows2.row(0)); |
| //! assert!(rows.row(4) < rows2.row(1)); |
| //! |
| //! // Convert selection of rows back to arrays |
| //! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)]; |
| //! let converted = converter.convert_rows(selection).unwrap(); |
| //! let c1 = converted[0].as_primitive::<Int32Type>(); |
| //! assert_eq!(c1.values(), &[-1, 4, 0, 3]); |
| //! |
| //! let c2 = converted[1].as_string::<i32>(); |
| //! let c2_values: Vec<_> = c2.iter().flatten().collect(); |
| //! assert_eq!(&c2_values, &["a", "f", "c", "e"]); |
| //! ``` |
| //! |
| //! # Lexsort |
| //! |
//! The row format can also be used to implement a fast multi-column / lexicographic sort:
| //! |
| //! ``` |
| //! # use arrow_row::{RowConverter, SortField}; |
| //! # use arrow_array::{ArrayRef, UInt32Array}; |
| //! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array { |
| //! let fields = arrays |
| //! .iter() |
| //! .map(|a| SortField::new(a.data_type().clone())) |
| //! .collect(); |
| //! let converter = RowConverter::new(fields).unwrap(); |
| //! let rows = converter.convert_columns(arrays).unwrap(); |
| //! let mut sort: Vec<_> = rows.iter().enumerate().collect(); |
| //! sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); |
| //! UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32)) |
| //! } |
| //! ``` |
| //! |
| //! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts |
| //! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort |
| //! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf |
| //! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html |
| //! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html |
| //! [compared]: PartialOrd |
| //! [compare]: PartialOrd |
| |
| use std::cmp::Ordering; |
| use std::hash::{Hash, Hasher}; |
| use std::sync::Arc; |
| |
| use arrow_array::cast::*; |
| use arrow_array::types::ArrowDictionaryKeyType; |
| use arrow_array::*; |
| use arrow_buffer::ArrowNativeType; |
| use arrow_data::ArrayDataBuilder; |
| use arrow_schema::*; |
| use variable::{decode_binary_view, decode_string_view}; |
| |
| use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive}; |
| use crate::variable::{decode_binary, decode_string}; |
| |
| mod fixed; |
| mod list; |
| mod variable; |
| |
| /// Converts [`ArrayRef`] columns into a [row-oriented](self) format. |
| /// |
| /// *Note: The encoding of the row format may change from release to release.* |
| /// |
| /// ## Overview |
| /// |
| /// The row format is a variable length byte sequence created by |
| /// concatenating the encoded form of each column. The encoding for |
| /// each column depends on its datatype (and sort options). |
| /// |
| /// The encoding is carefully designed in such a way that escaping is |
| /// unnecessary: it is never ambiguous as to whether a byte is part of |
| /// a sentinel (e.g. null) or a value. |
| /// |
| /// ## Unsigned Integer Encoding |
| /// |
/// A null integer is encoded as a `0_u8` null sentinel, followed by `0_u8` bytes
/// matching the integer's byte width.
| /// |
| /// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the |
| /// integer. |
| /// |
| /// ```text |
| /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ |
| /// 3 │03│00│00│00│ │01│00│00│00│03│ |
| /// └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ |
| /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ |
| /// 258 │02│01│00│00│ │01│00│00│01│02│ |
| /// └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ |
| /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ |
| /// 23423 │7F│5B│00│00│ │01│00│00│5B│7F│ |
| /// └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ |
| /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ |
| /// NULL │??│??│??│??│ │00│00│00│00│00│ |
| /// └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ |
| /// |
| /// 32-bit (4 bytes) Row Format |
| /// Value Little Endian |
| /// ``` |
| /// |
| /// ## Signed Integer Encoding |
| /// |
| /// Signed integers have their most significant sign bit flipped, and are then encoded in the |
| /// same manner as an unsigned integer. |
| /// |
| /// ```text |
| /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ |
| /// 5 │05│00│00│00│ │05│00│00│80│ │01│80│00│00│05│ |
| /// └──┴──┴──┴──┘ └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ |
| /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ |
| /// -5 │FB│FF│FF│FF│ │FB│FF│FF│7F│ │01│7F│FF│FF│FB│ |
| /// └──┴──┴──┴──┘ └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ |
| /// |
| /// Value 32-bit (4 bytes) High bit flipped Row Format |
| /// Little Endian |
| /// ``` |
| /// |
| /// ## Float Encoding |
| /// |
/// Floats are converted from IEEE 754 representation to a signed integer representation
/// by flipping all bits except the sign bit if they are negative.
| /// |
| /// They are then encoded in the same manner as a signed integer. |
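///
/// A sketch of this conversion (illustrative only; `to_ordered` is a
/// hypothetical helper, not part of this crate's API):
///
/// ```
/// // Map f32 bits to an i32 that orders according to IEEE 754 totalOrder:
/// // negative floats have all bits except the sign flipped, positive floats
/// // are left unchanged
/// fn to_ordered(v: f32) -> i32 {
///     let bits = v.to_bits() as i32;
///     bits ^ ((((bits >> 31) as u32) >> 1) as i32)
/// }
///
/// assert!(to_ordered(f32::NEG_INFINITY) < to_ordered(-1.0));
/// assert!(to_ordered(-1.0) < to_ordered(-0.0));
/// assert!(to_ordered(-0.0) < to_ordered(0.0)); // totalOrder: -0 sorts before +0
/// assert!(to_ordered(1.0) < to_ordered(f32::INFINITY));
/// ```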
| /// |
| /// ## Fixed Length Bytes Encoding |
| /// |
| /// Fixed length bytes are encoded in the same fashion as primitive types above. |
| /// |
| /// For a fixed length array of length `n`: |
| /// |
/// A null is encoded as a `0_u8` null sentinel followed by `n` `0_u8` bytes.
///
/// A valid value is encoded as `1_u8` followed by the value bytes.
| /// |
| /// ## Variable Length Bytes (including Strings) Encoding |
| /// |
| /// A null is encoded as a `0_u8`. |
| /// |
| /// An empty byte array is encoded as `1_u8`. |
| /// |
| /// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array |
| /// encoded using a block based scheme described below. |
| /// |
/// The byte array is broken up into fixed-width blocks, and each block is written
/// in turn to the output, followed by `0xFF_u8`. The final block is padded to the
/// block width with `0_u8` and written to the output, followed by the un-padded
/// length in bytes of this final block as a `u8`. The first 4 blocks have a length
/// of 8, with subsequent blocks using a length of 32; this reduces space
/// amplification for small strings.
| /// |
| /// Note the following example encodings use a block size of 4 bytes for brevity: |
| /// |
/// ```text
///                   ┌───┬───┬───┬───┬───┬───┐
/// "MEEP"            │02 │'M'│'E'│'E'│'P'│04 │
///                   └───┴───┴───┴───┴───┴───┘
///
///                   ┌───┐
/// ""                │01 │
///                   └───┘
///
/// NULL              ┌───┐
///                   │00 │
///                   └───┘
///
/// "Defenestration"  ┌───┬───┬───┬───┬───┬───┐
///                   │02 │'D'│'e'│'f'│'e'│FF │
///                   └───┼───┼───┼───┼───┼───┤
///                       │'n'│'e'│'s'│'t'│FF │
///                       ├───┼───┼───┼───┼───┤
///                       │'r'│'a'│'t'│'i'│FF │
///                       ├───┼───┼───┼───┼───┤
///                       │'o'│'n'│00 │00 │02 │
///                       └───┴───┴───┴───┴───┘
/// ```
| /// |
| /// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional |
| /// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256. |
| /// |
| /// ## Dictionary Encoding |
| /// |
/// Dictionaries are hydrated to their underlying values, i.e. the row encoding
/// does not preserve the dictionary indirection.
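///
/// For example (using the public API), equal values compare equal regardless
/// of the keys used to encode them, and rows decode back to the value type:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, DictionaryArray};
/// # use arrow_array::types::Int32Type;
/// # use arrow_row::{RowConverter, SortField};
/// # use arrow_schema::DataType;
/// let dict = Arc::new(DictionaryArray::<Int32Type>::from_iter([
///     Some("a"),
///     Some("b"),
///     Some("a"),
/// ])) as ArrayRef;
/// let converter = RowConverter::new(vec![SortField::new(dict.data_type().clone())]).unwrap();
/// let rows = converter.convert_columns(&[dict]).unwrap();
///
/// // Rows encoding the same value are byte-identical
/// assert_eq!(rows.row(0), rows.row(2));
///
/// // The dictionary indirection is not preserved when converting back
/// let back = converter.convert_rows(&rows).unwrap();
/// assert_eq!(back[0].data_type(), &DataType::Utf8);
/// ```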
| /// |
| /// ## Struct Encoding |
| /// |
| /// A null is encoded as a `0_u8`. |
| /// |
| /// A valid value is encoded as `1_u8` followed by the row encoding of each child. |
| /// |
| /// This encoding effectively flattens the schema in a depth-first fashion. |
| /// |
| /// For example |
| /// |
| /// ```text |
| /// ┌───────┬────────────────────────┬───────┐ |
| /// │ Int32 │ Struct[Int32, Float32] │ Int32 │ |
| /// └───────┴────────────────────────┴───────┘ |
| /// ``` |
| /// |
| /// Is encoded as |
| /// |
| /// ```text |
| /// ┌───────┬───────────────┬───────┬─────────┬───────┐ |
| /// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │ |
| /// └───────┴───────────────┴───────┴─────────┴───────┘ |
| /// ``` |
| /// |
| /// ## List Encoding |
| /// |
| /// Lists are encoded by first encoding all child elements to the row format. |
| /// |
| /// A list value is then encoded as the concatenation of each of the child elements, |
| /// separately encoded using the variable length encoding described above, followed |
| /// by the variable length encoding of an empty byte array. |
| /// |
| /// For example given: |
| /// |
| /// ```text |
| /// [1_u8, 2_u8, 3_u8] |
| /// [1_u8, null] |
| /// [] |
| /// null |
| /// ``` |
| /// |
| /// The elements would be converted to: |
| /// |
| /// ```text |
| /// ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ |
| /// 1 │01│01│ 2 │01│02│ 3 │01│03│ 1 │01│01│ null │00│00│ |
| /// └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ |
| ///``` |
| /// |
| /// Which would be encoded as |
| /// |
| /// ```text |
| /// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ |
| /// [1_u8, 2_u8, 3_u8] │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│ |
| /// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ |
| /// └──── 1_u8 ────┘ └──── 2_u8 ────┘ └──── 3_u8 ────┘ |
| /// |
| /// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ |
| /// [1_u8, null] │02│01│01│00│00│02│02│00│00│00│00│02│01│ |
| /// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ |
| /// └──── 1_u8 ────┘ └──── null ────┘ |
| /// |
| ///``` |
| /// |
| /// With `[]` represented by an empty byte array, and `null` a null byte array. |
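///
/// For example (using the public API), with the default sort options the null
/// list sorts first, then the empty list, then lists compared element-wise:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, ListArray};
/// # use arrow_array::types::Int32Type;
/// # use arrow_row::{RowConverter, SortField};
/// let list = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
///     Some(vec![Some(1), Some(2), Some(3)]),
///     Some(vec![Some(1), None]),
///     Some(vec![]),
///     None,
/// ])) as ArrayRef;
///
/// let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
/// let rows = converter.convert_columns(&[list]).unwrap();
/// assert!(rows.row(3) < rows.row(2)); // null < []
/// assert!(rows.row(2) < rows.row(1)); // [] < [1, null]
/// assert!(rows.row(1) < rows.row(0)); // [1, null] < [1, 2, 3]
/// ```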
| /// |
| /// # Ordering |
| /// |
| /// ## Float Ordering |
| /// |
/// Floats are totally ordered in accordance with the `totalOrder` predicate as defined
/// in the IEEE 754 (2008 revision) floating point standard.
| /// |
/// The ordering established by this does not always agree with the
/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
/// they consider negative and positive zero equal, while this encoding does not.
| /// |
| /// ## Null Ordering |
| /// |
/// The encoding described above will order nulls first; this can be inverted by representing
/// nulls as `0xFF_u8` instead of `0_u8`.
| /// |
| /// ## Reverse Column Ordering |
| /// |
/// The order of a given column can be reversed by negating the encoded bytes of non-null values.
| /// |
| /// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing |
| /// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing |
| #[derive(Debug)] |
| pub struct RowConverter { |
| fields: Arc<[SortField]>, |
| /// State for codecs |
| codecs: Vec<Codec>, |
| } |
| |
| #[derive(Debug)] |
| enum Codec { |
| /// No additional codec state is necessary |
| Stateless, |
| /// A row converter for the dictionary values |
| /// and the encoding of a row containing only nulls |
| Dictionary(RowConverter, OwnedRow), |
| /// A row converter for the child fields |
| /// and the encoding of a row containing only nulls |
| Struct(RowConverter, OwnedRow), |
| /// A row converter for the child field |
| List(RowConverter), |
| } |
| |
| impl Codec { |
| fn new(sort_field: &SortField) -> Result<Self, ArrowError> { |
| match &sort_field.data_type { |
| DataType::Dictionary(_, values) => { |
| let sort_field = |
| SortField::new_with_options(values.as_ref().clone(), sort_field.options); |
| |
| let converter = RowConverter::new(vec![sort_field])?; |
| let null_array = new_null_array(values.as_ref(), 1); |
| let nulls = converter.convert_columns(&[null_array])?; |
| |
| let owned = OwnedRow { |
| data: nulls.buffer.into(), |
| config: nulls.config, |
| }; |
| Ok(Self::Dictionary(converter, owned)) |
| } |
| d if !d.is_nested() => Ok(Self::Stateless), |
| DataType::List(f) | DataType::LargeList(f) => { |
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate `nulls_first` if
                // `descending` is set to true
| let options = SortOptions { |
| descending: false, |
| nulls_first: sort_field.options.nulls_first != sort_field.options.descending, |
| }; |
| |
| let field = SortField::new_with_options(f.data_type().clone(), options); |
| let converter = RowConverter::new(vec![field])?; |
| Ok(Self::List(converter)) |
| } |
| DataType::Struct(f) => { |
| let sort_fields = f |
| .iter() |
| .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options)) |
| .collect(); |
| |
| let converter = RowConverter::new(sort_fields)?; |
| let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect(); |
| |
| let nulls = converter.convert_columns(&nulls)?; |
| let owned = OwnedRow { |
| data: nulls.buffer.into(), |
| config: nulls.config, |
| }; |
| |
| Ok(Self::Struct(converter, owned)) |
| } |
| _ => Err(ArrowError::NotYetImplemented(format!( |
| "not yet implemented: {:?}", |
| sort_field.data_type |
| ))), |
| } |
| } |
| |
| fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> { |
| match self { |
| Codec::Stateless => Ok(Encoder::Stateless), |
| Codec::Dictionary(converter, nulls) => { |
| let values = array.as_any_dictionary().values().clone(); |
| let rows = converter.convert_columns(&[values])?; |
| Ok(Encoder::Dictionary(rows, nulls.row())) |
| } |
| Codec::Struct(converter, null) => { |
| let v = as_struct_array(array); |
| let rows = converter.convert_columns(v.columns())?; |
| Ok(Encoder::Struct(rows, null.row())) |
| } |
| Codec::List(converter) => { |
| let values = match array.data_type() { |
| DataType::List(_) => as_list_array(array).values(), |
| DataType::LargeList(_) => as_large_list_array(array).values(), |
| _ => unreachable!(), |
| }; |
| let rows = converter.convert_columns(&[values.clone()])?; |
| Ok(Encoder::List(rows)) |
| } |
| } |
| } |
| |
| fn size(&self) -> usize { |
| match self { |
| Codec::Stateless => 0, |
| Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(), |
| Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(), |
| Codec::List(converter) => converter.size(), |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| enum Encoder<'a> { |
| /// No additional encoder state is necessary |
| Stateless, |
| /// The encoding of the child array and the encoding of a null row |
| Dictionary(Rows, Row<'a>), |
| /// The row encoding of the child arrays and the encoding of a null row |
| /// |
| /// It is necessary to encode to a temporary [`Rows`] to avoid serializing |
| /// values that are masked by a null in the parent StructArray, otherwise |
| /// this would establish an ordering between semantically null values |
| Struct(Rows, Row<'a>), |
| /// The row encoding of the child array |
| List(Rows), |
| } |
| |
| /// Configure the data type and sort order for a given column |
| #[derive(Debug, Clone, PartialEq, Eq)] |
| pub struct SortField { |
| /// Sort options |
| options: SortOptions, |
| /// Data type |
| data_type: DataType, |
| } |
| |
| impl SortField { |
| /// Create a new column with the given data type |
| pub fn new(data_type: DataType) -> Self { |
| Self::new_with_options(data_type, Default::default()) |
| } |
| |
| /// Create a new column with the given data type and [`SortOptions`] |
| pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self { |
| Self { options, data_type } |
| } |
| |
| /// Return size of this instance in bytes. |
| /// |
| /// Includes the size of `Self`. |
| pub fn size(&self) -> usize { |
| self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>() |
| } |
| } |
| |
| impl RowConverter { |
| /// Create a new [`RowConverter`] with the provided schema |
| pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> { |
| if !Self::supports_fields(&fields) { |
| return Err(ArrowError::NotYetImplemented(format!( |
| "Row format support not yet implemented for: {fields:?}" |
| ))); |
| } |
| |
| let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?; |
| Ok(Self { |
| fields: fields.into(), |
| codecs, |
| }) |
| } |
| |
| /// Check if the given fields are supported by the row format. |
| pub fn supports_fields(fields: &[SortField]) -> bool { |
| fields.iter().all(|x| Self::supports_datatype(&x.data_type)) |
| } |
| |
| fn supports_datatype(d: &DataType) -> bool { |
| match d { |
| _ if !d.is_nested() => true, |
| DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => { |
| Self::supports_datatype(f.data_type()) |
| } |
| DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())), |
| _ => false, |
| } |
| } |
| |
| /// Convert [`ArrayRef`] columns into [`Rows`] |
| /// |
| /// See [`Row`] for information on when [`Row`] can be compared |
| /// |
| /// # Panics |
| /// |
| /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`] |
| pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> { |
| let num_rows = columns.first().map(|x| x.len()).unwrap_or(0); |
| let mut rows = self.empty_rows(num_rows, 0); |
| self.append(&mut rows, columns)?; |
| Ok(rows) |
| } |
| |
| /// Convert [`ArrayRef`] columns appending to an existing [`Rows`] |
| /// |
| /// See [`Row`] for information on when [`Row`] can be compared |
| /// |
| /// # Panics |
| /// |
| /// Panics if |
| /// * The schema of `columns` does not match that provided to [`RowConverter::new`] |
| /// * The provided [`Rows`] were not created by this [`RowConverter`] |
| /// |
| /// ``` |
| /// # use std::sync::Arc; |
| /// # use std::collections::HashSet; |
| /// # use arrow_array::cast::AsArray; |
| /// # use arrow_array::StringArray; |
| /// # use arrow_row::{Row, RowConverter, SortField}; |
| /// # use arrow_schema::DataType; |
| /// # |
| /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); |
| /// let a1 = StringArray::from(vec!["hello", "world"]); |
| /// let a2 = StringArray::from(vec!["a", "a", "hello"]); |
| /// |
| /// let mut rows = converter.empty_rows(5, 128); |
| /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap(); |
| /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap(); |
| /// |
| /// let back = converter.convert_rows(&rows).unwrap(); |
| /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect(); |
| /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]); |
| /// ``` |
| pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> { |
| assert!( |
| Arc::ptr_eq(&rows.config.fields, &self.fields), |
| "rows were not produced by this RowConverter" |
| ); |
| |
| if columns.len() != self.fields.len() { |
| return Err(ArrowError::InvalidArgumentError(format!( |
| "Incorrect number of arrays provided to RowConverter, expected {} got {}", |
| self.fields.len(), |
| columns.len() |
| ))); |
| } |
| |
| let encoders = columns |
| .iter() |
| .zip(&self.codecs) |
| .zip(self.fields.iter()) |
| .map(|((column, codec), field)| { |
| if !column.data_type().equals_datatype(&field.data_type) { |
| return Err(ArrowError::InvalidArgumentError(format!( |
| "RowConverter column schema mismatch, expected {} got {}", |
| field.data_type, |
| column.data_type() |
| ))); |
| } |
| codec.encoder(column.as_ref()) |
| }) |
| .collect::<Result<Vec<_>, _>>()?; |
| |
| let write_offset = rows.num_rows(); |
| let lengths = row_lengths(columns, &encoders); |
| |
        // We initialize the offsets shifted down by one row index.
        //
        // As rows are appended, the offsets will be incremented to match
| // |
| // For example, consider the case of 3 rows of length 3, 4, and 6 respectively. |
| // The offsets would be initialized to `0, 0, 3, 7` |
| // |
| // Writing the first row entirely would yield `0, 3, 3, 7` |
| // The second, `0, 3, 7, 7` |
| // The third, `0, 3, 7, 13` |
| // |
| // This would be the final offsets for reading |
| // |
        // In this way offsets tracks the write position whilst eventually serving
        // to identify the offsets of the written rows
| rows.offsets.reserve(lengths.len()); |
| let mut cur_offset = rows.offsets[write_offset]; |
| for l in lengths { |
| rows.offsets.push(cur_offset); |
| cur_offset = cur_offset.checked_add(l).expect("overflow"); |
| } |
| |
| // Note this will not zero out any trailing data in `rows.buffer`, |
| // e.g. resulting from a call to `Rows::clear`, relying instead on the |
| // encoders not assuming a zero-initialized buffer |
| rows.buffer.resize(cur_offset, 0); |
| |
| for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) { |
| // We encode a column at a time to minimise dispatch overheads |
| encode_column( |
| &mut rows.buffer, |
| &mut rows.offsets[write_offset..], |
| column.as_ref(), |
| field.options, |
| &encoder, |
| ) |
| } |
| |
| if cfg!(debug_assertions) { |
| assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len()); |
| rows.offsets |
| .windows(2) |
| .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic")); |
| } |
| |
| Ok(()) |
| } |
| |
| /// Convert [`Rows`] columns into [`ArrayRef`] |
| /// |
| /// # Panics |
| /// |
| /// Panics if the rows were not produced by this [`RowConverter`] |
| pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError> |
| where |
| I: IntoIterator<Item = Row<'a>>, |
| { |
| let mut validate_utf8 = false; |
| let mut rows: Vec<_> = rows |
| .into_iter() |
| .map(|row| { |
| assert!( |
| Arc::ptr_eq(&row.config.fields, &self.fields), |
| "rows were not produced by this RowConverter" |
| ); |
| validate_utf8 |= row.config.validate_utf8; |
| row.data |
| }) |
| .collect(); |
| |
| // SAFETY |
| // We have validated that the rows came from this [`RowConverter`] |
| // and therefore must be valid |
| unsafe { self.convert_raw(&mut rows, validate_utf8) } |
| } |
| |
| /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with |
| /// a total length of `data_capacity` |
| /// |
| /// This can be used to buffer a selection of [`Row`] |
| /// |
| /// ``` |
| /// # use std::sync::Arc; |
| /// # use std::collections::HashSet; |
| /// # use arrow_array::cast::AsArray; |
| /// # use arrow_array::StringArray; |
| /// # use arrow_row::{Row, RowConverter, SortField}; |
| /// # use arrow_schema::DataType; |
| /// # |
| /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); |
| /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); |
| /// |
| /// // Convert to row format and deduplicate |
| /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap(); |
| /// let mut distinct_rows = converter.empty_rows(3, 100); |
| /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3); |
| /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row)); |
| /// |
| /// // Note: we could skip buffering and feed the filtered iterator directly |
| /// // into convert_rows, this is done for demonstration purposes only |
| /// let distinct = converter.convert_rows(&distinct_rows).unwrap(); |
| /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect(); |
| /// assert_eq!(&values, &["hello", "world", "a"]); |
| /// ``` |
| pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows { |
| let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1)); |
| offsets.push(0); |
| |
| Rows { |
| offsets, |
| buffer: Vec::with_capacity(data_capacity), |
| config: RowConfig { |
| fields: self.fields.clone(), |
| validate_utf8: false, |
| }, |
| } |
| } |
| |
| /// Convert raw bytes into [`ArrayRef`] |
| /// |
| /// # Safety |
| /// |
| /// `rows` must contain valid data for this [`RowConverter`] |
| unsafe fn convert_raw( |
| &self, |
| rows: &mut [&[u8]], |
| validate_utf8: bool, |
| ) -> Result<Vec<ArrayRef>, ArrowError> { |
| self.fields |
| .iter() |
| .zip(&self.codecs) |
| .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8)) |
| .collect() |
| } |
| |
| /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes |
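    ///
    /// For example (a sketch of round-tripping a row through raw bytes):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_row::{RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let array = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[array]).unwrap();
    ///
    /// // Copy a row's bytes (e.g. after receiving them over the wire) and parse them back
    /// let bytes: Vec<u8> = rows.row(0).as_ref().to_vec();
    /// let parser = converter.parser();
    /// let parsed = parser.parse(&bytes);
    /// assert_eq!(parsed, rows.row(0));
    /// ```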
| pub fn parser(&self) -> RowParser { |
| RowParser::new(Arc::clone(&self.fields)) |
| } |
| |
| /// Returns the size of this instance in bytes |
| /// |
| /// Includes the size of `Self`. |
| pub fn size(&self) -> usize { |
| std::mem::size_of::<Self>() |
| + self.fields.iter().map(|x| x.size()).sum::<usize>() |
| + self.codecs.capacity() * std::mem::size_of::<Codec>() |
| + self.codecs.iter().map(Codec::size).sum::<usize>() |
| } |
| } |
| |
| /// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`] |
| #[derive(Debug)] |
| pub struct RowParser { |
| config: RowConfig, |
| } |
| |
| impl RowParser { |
| fn new(fields: Arc<[SortField]>) -> Self { |
| Self { |
| config: RowConfig { |
| fields, |
| validate_utf8: true, |
| }, |
| } |
| } |
| |
| /// Creates a [`Row`] from the provided `bytes`. |
| /// |
| /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with |
| /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic |
| pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> { |
| Row { |
| data: bytes, |
| config: &self.config, |
| } |
| } |
| } |
| |
| /// The config of a given set of [`Row`] |
| #[derive(Debug, Clone)] |
| struct RowConfig { |
| /// The schema for these rows |
| fields: Arc<[SortField]>, |
| /// Whether to run UTF-8 validation when converting to arrow arrays |
| validate_utf8: bool, |
| } |
| |
/// A row-oriented representation of arrow data that is normalized for comparison.
| /// |
| /// See the [module level documentation](self) and [`RowConverter`] for more details. |
| #[derive(Debug)] |
| pub struct Rows { |
| /// Underlying row bytes |
| buffer: Vec<u8>, |
| /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]` |
| offsets: Vec<usize>, |
| /// The config for these rows |
| config: RowConfig, |
| } |
| |
| impl Rows { |
| /// Append a [`Row`] to this [`Rows`] |
| pub fn push(&mut self, row: Row<'_>) { |
| assert!( |
| Arc::ptr_eq(&row.config.fields, &self.config.fields), |
| "row was not produced by this RowConverter" |
| ); |
| self.config.validate_utf8 |= row.config.validate_utf8; |
| self.buffer.extend_from_slice(row.data); |
| self.offsets.push(self.buffer.len()) |
| } |
| |
| /// Returns the row at index `row` |
| pub fn row(&self, row: usize) -> Row<'_> { |
| let end = self.offsets[row + 1]; |
| let start = self.offsets[row]; |
| Row { |
| data: &self.buffer[start..end], |
| config: &self.config, |
| } |
| } |
| |
| /// Sets the length of this [`Rows`] to 0 |
| pub fn clear(&mut self) { |
| self.offsets.truncate(1); |
| self.buffer.clear(); |
| } |
| |
| /// Returns the number of [`Row`] in this [`Rows`] |
| pub fn num_rows(&self) -> usize { |
| self.offsets.len() - 1 |
| } |
| |
| /// Returns an iterator over the [`Row`] in this [`Rows`] |
| pub fn iter(&self) -> RowsIter<'_> { |
| self.into_iter() |
| } |
| |
| /// Returns the size of this instance in bytes |
| /// |
| /// Includes the size of `Self`. |
| pub fn size(&self) -> usize { |
| // Size of fields is accounted for as part of RowConverter |
| std::mem::size_of::<Self>() |
| + self.buffer.len() |
| + self.offsets.len() * std::mem::size_of::<usize>() |
| } |
| } |
| |
| impl<'a> IntoIterator for &'a Rows { |
| type Item = Row<'a>; |
| type IntoIter = RowsIter<'a>; |
| |
| fn into_iter(self) -> Self::IntoIter { |
| RowsIter { |
| rows: self, |
| start: 0, |
| end: self.num_rows(), |
| } |
| } |
| } |
| |
| /// An iterator over [`Rows`] |
| #[derive(Debug)] |
| pub struct RowsIter<'a> { |
| rows: &'a Rows, |
| start: usize, |
| end: usize, |
| } |
| |
| impl<'a> Iterator for RowsIter<'a> { |
| type Item = Row<'a>; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| if self.end == self.start { |
| return None; |
| } |
| let row = self.rows.row(self.start); |
| self.start += 1; |
| Some(row) |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| let len = self.len(); |
| (len, Some(len)) |
| } |
| } |
| |
| impl<'a> ExactSizeIterator for RowsIter<'a> { |
| fn len(&self) -> usize { |
| self.end - self.start |
| } |
| } |
| |
| impl<'a> DoubleEndedIterator for RowsIter<'a> { |
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.end == self.start {
            return None;
        }
        // `end` is an exclusive bound, so decrement before indexing
        self.end -= 1;
        Some(self.rows.row(self.end))
    }
| } |
| |
| /// A comparable representation of a row. |
| /// |
| /// See the [module level documentation](self) for more details. |
| /// |
| /// Two [`Row`] can only be compared if they both belong to [`Rows`] |
| /// returned by calls to [`RowConverter::convert_columns`] on the same |
| /// [`RowConverter`]. If different [`RowConverter`]s are used, any |
| /// ordering established by comparing the [`Row`] is arbitrary. |
| #[derive(Debug, Copy, Clone)] |
| pub struct Row<'a> { |
| data: &'a [u8], |
| config: &'a RowConfig, |
| } |
| |
| impl<'a> Row<'a> { |
| /// Create owned version of the row to detach it from the shared [`Rows`]. |
| pub fn owned(&self) -> OwnedRow { |
| OwnedRow { |
| data: self.data.into(), |
| config: self.config.clone(), |
| } |
| } |
| } |
| |
// Manually implement these as we don't wish to include `fields` in the comparison
| |
| impl<'a> PartialEq for Row<'a> { |
| #[inline] |
| fn eq(&self, other: &Self) -> bool { |
| self.data.eq(other.data) |
| } |
| } |
| |
| impl<'a> Eq for Row<'a> {} |
| |
| impl<'a> PartialOrd for Row<'a> { |
| #[inline] |
| fn partial_cmp(&self, other: &Self) -> Option<Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| impl<'a> Ord for Row<'a> { |
| #[inline] |
| fn cmp(&self, other: &Self) -> Ordering { |
| self.data.cmp(other.data) |
| } |
| } |
| |
| impl<'a> Hash for Row<'a> { |
| #[inline] |
| fn hash<H: Hasher>(&self, state: &mut H) { |
| self.data.hash(state) |
| } |
| } |
| |
| impl<'a> AsRef<[u8]> for Row<'a> { |
| #[inline] |
| fn as_ref(&self) -> &[u8] { |
| self.data |
| } |
| } |
| |
| /// Owned version of a [`Row`] that can be moved/cloned freely. |
| /// |
| /// This contains the data for the one specific row (not the entire buffer of all rows). |
| #[derive(Debug, Clone)] |
| pub struct OwnedRow { |
| data: Box<[u8]>, |
| config: RowConfig, |
| } |
| |
| impl OwnedRow { |
| /// Get borrowed [`Row`] from owned version. |
| /// |
| /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`]. |
| pub fn row(&self) -> Row<'_> { |
| Row { |
| data: &self.data, |
| config: &self.config, |
| } |
| } |
| } |
| |
// Manually implement these as we don't wish to include `fields`, and we want to reuse the same `Row` implementations here.
| |
| impl PartialEq for OwnedRow { |
| #[inline] |
| fn eq(&self, other: &Self) -> bool { |
| self.row().eq(&other.row()) |
| } |
| } |
| |
| impl Eq for OwnedRow {} |
| |
| impl PartialOrd for OwnedRow { |
| #[inline] |
| fn partial_cmp(&self, other: &Self) -> Option<Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| impl Ord for OwnedRow { |
| #[inline] |
| fn cmp(&self, other: &Self) -> Ordering { |
| self.row().cmp(&other.row()) |
| } |
| } |
| |
| impl Hash for OwnedRow { |
| #[inline] |
| fn hash<H: Hasher>(&self, state: &mut H) { |
| self.row().hash(state) |
| } |
| } |
| |
| impl AsRef<[u8]> for OwnedRow { |
| #[inline] |
| fn as_ref(&self) -> &[u8] { |
| &self.data |
| } |
| } |
| |
/// Returns the null sentinel: `0_u8` if nulls sort first, `0xFF_u8` otherwise
| #[inline] |
| fn null_sentinel(options: SortOptions) -> u8 { |
| match options.nulls_first { |
| true => 0, |
| false => 0xFF, |
| } |
| } |
| |
/// Computes the length in bytes of the row encoding of each row in `cols`
| fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> { |
| use fixed::FixedLengthEncoding; |
| |
| let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); |
| let mut lengths = vec![0; num_rows]; |
| |
| for (array, encoder) in cols.iter().zip(encoders) { |
| match encoder { |
| Encoder::Stateless => { |
| downcast_primitive_array! { |
| array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)), |
| DataType::Null => {}, |
| DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN), |
| DataType::Binary => as_generic_binary_array::<i32>(array) |
| .iter() |
| .zip(lengths.iter_mut()) |
| .for_each(|(slice, length)| *length += variable::encoded_len(slice)), |
| DataType::LargeBinary => as_generic_binary_array::<i64>(array) |
| .iter() |
| .zip(lengths.iter_mut()) |
| .for_each(|(slice, length)| *length += variable::encoded_len(slice)), |
| DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| { |
| *length += variable::encoded_len(slice) |
| }), |
| DataType::Utf8 => array.as_string::<i32>() |
| .iter() |
| .zip(lengths.iter_mut()) |
| .for_each(|(slice, length)| { |
| *length += variable::encoded_len(slice.map(|x| x.as_bytes())) |
| }), |
| DataType::LargeUtf8 => array.as_string::<i64>() |
| .iter() |
| .zip(lengths.iter_mut()) |
| .for_each(|(slice, length)| { |
| *length += variable::encoded_len(slice.map(|x| x.as_bytes())) |
| }), |
| DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| { |
| *length += variable::encoded_len(slice.map(|x| x.as_bytes())) |
| }), |
| DataType::FixedSizeBinary(len) => { |
| let len = len.to_usize().unwrap(); |
| lengths.iter_mut().for_each(|x| *x += 1 + len) |
| } |
| _ => unimplemented!("unsupported data type: {}", array.data_type()), |
| } |
| } |
| Encoder::Dictionary(values, null) => { |
| downcast_dictionary_array! { |
| array => { |
| for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { |
| *length += match v { |
| Some(k) => values.row(k.as_usize()).data.len(), |
| None => null.data.len(), |
| } |
| } |
| } |
| _ => unreachable!(), |
| } |
| } |
| Encoder::Struct(rows, null) => { |
| let array = as_struct_array(array); |
| lengths.iter_mut().enumerate().for_each(|(idx, length)| { |
| match array.is_valid(idx) { |
| true => *length += 1 + rows.row(idx).as_ref().len(), |
| false => *length += 1 + null.data.len(), |
| } |
| }); |
| } |
| Encoder::List(rows) => match array.data_type() { |
| DataType::List(_) => { |
| list::compute_lengths(&mut lengths, rows, as_list_array(array)) |
| } |
| DataType::LargeList(_) => { |
| list::compute_lengths(&mut lengths, rows, as_large_list_array(array)) |
| } |
| _ => unreachable!(), |
| }, |
| } |
| } |
| |
| lengths |
| } |
| |
/// Encodes a column to the provided output buffers, incrementing the offsets as it progresses
| fn encode_column( |
| data: &mut [u8], |
| offsets: &mut [usize], |
| column: &dyn Array, |
| opts: SortOptions, |
| encoder: &Encoder<'_>, |
| ) { |
| match encoder { |
| Encoder::Stateless => { |
| downcast_primitive_array! { |
| column => { |
| if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){ |
| fixed::encode(data, offsets, column.values(), nulls, opts) |
| } else { |
| fixed::encode_not_null(data, offsets, column.values(), opts) |
| } |
| } |
| DataType::Null => {} |
| DataType::Boolean => { |
| if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){ |
| fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts) |
| } else { |
| fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts) |
| } |
| } |
| DataType::Binary => { |
| variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts) |
| } |
| DataType::BinaryView => { |
| variable::encode(data, offsets, column.as_binary_view().iter(), opts) |
| } |
| DataType::LargeBinary => { |
| variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts) |
| } |
| DataType::Utf8 => variable::encode( |
| data, offsets, |
| column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())), |
| opts, |
| ), |
| DataType::LargeUtf8 => variable::encode( |
| data, offsets, |
| column.as_string::<i64>() |
| .iter() |
| .map(|x| x.map(|x| x.as_bytes())), |
| opts, |
| ), |
| DataType::Utf8View => variable::encode( |
| data, offsets, |
| column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())), |
| opts, |
| ), |
| DataType::FixedSizeBinary(_) => { |
| let array = column.as_any().downcast_ref().unwrap(); |
| fixed::encode_fixed_size_binary(data, offsets, array, opts) |
| } |
| _ => unimplemented!("unsupported data type: {}", column.data_type()), |
| } |
| } |
| Encoder::Dictionary(values, nulls) => { |
| downcast_dictionary_array! { |
| column => encode_dictionary_values(data, offsets, column, values, nulls), |
| _ => unreachable!() |
| } |
| } |
| Encoder::Struct(rows, null) => { |
| let array = as_struct_array(column); |
| let null_sentinel = null_sentinel(opts); |
| offsets |
| .iter_mut() |
| .skip(1) |
| .enumerate() |
| .for_each(|(idx, offset)| { |
| let (row, sentinel) = match array.is_valid(idx) { |
| true => (rows.row(idx), 0x01), |
| false => (*null, null_sentinel), |
| }; |
| let end_offset = *offset + 1 + row.as_ref().len(); |
| data[*offset] = sentinel; |
| data[*offset + 1..end_offset].copy_from_slice(row.as_ref()); |
| *offset = end_offset; |
| }) |
| } |
| Encoder::List(rows) => match column.data_type() { |
| DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)), |
| DataType::LargeList(_) => { |
| list::encode(data, offsets, rows, opts, as_large_list_array(column)) |
| } |
| _ => unreachable!(), |
| }, |
| } |
| } |
| |
/// Encodes dictionary values, without preserving the dictionary encoding
| pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>( |
| data: &mut [u8], |
| offsets: &mut [usize], |
| column: &DictionaryArray<K>, |
| values: &Rows, |
| null: &Row<'_>, |
| ) { |
| for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) { |
| let row = match k { |
| Some(k) => values.row(k.as_usize()).data, |
| None => null.data, |
| }; |
| let end_offset = *offset + row.len(); |
| data[*offset..end_offset].copy_from_slice(row); |
| *offset = end_offset; |
| } |
| } |
| |
| macro_rules! decode_primitive_helper { |
| ($t:ty, $rows:ident, $data_type:ident, $options:ident) => { |
| Arc::new(decode_primitive::<$t>($rows, $data_type, $options)) |
| }; |
| } |
| |
/// Decodes the provided `field` from `rows`
| /// |
| /// # Safety |
| /// |
| /// Rows must contain valid data for the provided field |
| unsafe fn decode_column( |
| field: &SortField, |
| rows: &mut [&[u8]], |
| codec: &Codec, |
| validate_utf8: bool, |
| ) -> Result<ArrayRef, ArrowError> { |
| let options = field.options; |
| |
| let array: ArrayRef = match codec { |
| Codec::Stateless => { |
| let data_type = field.data_type.clone(); |
| downcast_primitive! { |
| data_type => (decode_primitive_helper, rows, data_type, options), |
| DataType::Null => Arc::new(NullArray::new(rows.len())), |
| DataType::Boolean => Arc::new(decode_bool(rows, options)), |
| DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)), |
| DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)), |
| DataType::BinaryView => Arc::new(decode_binary_view(rows, options)), |
| DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)), |
| DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)), |
| DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)), |
| DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)), |
| _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type))) |
| } |
| } |
| Codec::Dictionary(converter, _) => { |
| let cols = converter.convert_raw(rows, validate_utf8)?; |
| cols.into_iter().next().unwrap() |
| } |
| Codec::Struct(converter, _) => { |
| let (null_count, nulls) = fixed::decode_nulls(rows); |
| rows.iter_mut().for_each(|row| *row = &row[1..]); |
| let children = converter.convert_raw(rows, validate_utf8)?; |
| |
| let child_data = children.iter().map(|c| c.to_data()).collect(); |
| let builder = ArrayDataBuilder::new(field.data_type.clone()) |
| .len(rows.len()) |
| .null_count(null_count) |
| .null_bit_buffer(Some(nulls)) |
| .child_data(child_data); |
| |
| Arc::new(StructArray::from(builder.build_unchecked())) |
| } |
| Codec::List(converter) => match &field.data_type { |
| DataType::List(_) => { |
| Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?) |
| } |
| DataType::LargeList(_) => { |
| Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?) |
| } |
| _ => unreachable!(), |
| }, |
| }; |
| Ok(array) |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use rand::distributions::uniform::SampleUniform; |
| use rand::distributions::{Distribution, Standard}; |
| use rand::{thread_rng, Rng}; |
| |
| use arrow_array::builder::*; |
| use arrow_array::types::*; |
| use arrow_array::*; |
| use arrow_buffer::{i256, NullBuffer}; |
| use arrow_buffer::{Buffer, OffsetBuffer}; |
| use arrow_cast::display::{ArrayFormatter, FormatOptions}; |
| use arrow_ord::sort::{LexicographicalComparator, SortColumn}; |
| |
| use super::*; |
| |
| #[test] |
| fn test_fixed_width() { |
| let cols = [ |
| Arc::new(Int16Array::from_iter([ |
| Some(1), |
| Some(2), |
| None, |
| Some(-5), |
| Some(2), |
| Some(2), |
| Some(0), |
| ])) as ArrayRef, |
| Arc::new(Float32Array::from_iter([ |
| Some(1.3), |
| Some(2.5), |
| None, |
| Some(4.), |
| Some(0.1), |
| Some(-4.), |
| Some(-0.), |
| ])) as ArrayRef, |
| ]; |
| |
| let converter = RowConverter::new(vec![ |
| SortField::new(DataType::Int16), |
| SortField::new(DataType::Float32), |
| ]) |
| .unwrap(); |
| let rows = converter.convert_columns(&cols).unwrap(); |
| |
| assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]); |
| assert_eq!( |
| rows.buffer, |
| &[ |
| 1, 128, 1, // |
| 1, 191, 166, 102, 102, // |
| 1, 128, 2, // |
| 1, 192, 32, 0, 0, // |
| 0, 0, 0, // |
| 0, 0, 0, 0, 0, // |
| 1, 127, 251, // |
| 1, 192, 128, 0, 0, // |
| 1, 128, 2, // |
| 1, 189, 204, 204, 205, // |
| 1, 128, 2, // |
| 1, 63, 127, 255, 255, // |
| 1, 128, 0, // |
| 1, 127, 255, 255, 255 // |
| ] |
| ); |
| |
| assert!(rows.row(3) < rows.row(6)); |
| assert!(rows.row(0) < rows.row(1)); |
| assert!(rows.row(3) < rows.row(0)); |
| assert!(rows.row(4) < rows.row(1)); |
| assert!(rows.row(5) < rows.row(4)); |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| for (expected, actual) in cols.iter().zip(&back) { |
| assert_eq!(expected, actual); |
| } |
| } |
| |
| #[test] |
| fn test_decimal128() { |
| let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128( |
| DECIMAL128_MAX_PRECISION, |
| 7, |
| ))]) |
| .unwrap(); |
| let col = Arc::new( |
| Decimal128Array::from_iter([ |
| None, |
| Some(i128::MIN), |
| Some(-13), |
| Some(46_i128), |
| Some(5456_i128), |
| Some(i128::MAX), |
| ]) |
| .with_precision_and_scale(38, 7) |
| .unwrap(), |
| ) as ArrayRef; |
| |
| let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); |
| for i in 0..rows.num_rows() - 1 { |
| assert!(rows.row(i) < rows.row(i + 1)); |
| } |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| assert_eq!(col.as_ref(), back[0].as_ref()) |
| } |
| |
| #[test] |
| fn test_decimal256() { |
| let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256( |
| DECIMAL256_MAX_PRECISION, |
| 7, |
| ))]) |
| .unwrap(); |
| let col = Arc::new( |
| Decimal256Array::from_iter([ |
| None, |
| Some(i256::MIN), |
| Some(i256::from_parts(0, -1)), |
| Some(i256::from_parts(u128::MAX, -1)), |
| Some(i256::from_parts(u128::MAX, 0)), |
| Some(i256::from_parts(0, 46_i128)), |
| Some(i256::from_parts(5, 46_i128)), |
| Some(i256::MAX), |
| ]) |
| .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7) |
| .unwrap(), |
| ) as ArrayRef; |
| |
| let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); |
| for i in 0..rows.num_rows() - 1 { |
| assert!(rows.row(i) < rows.row(i + 1)); |
| } |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| assert_eq!(col.as_ref(), back[0].as_ref()) |
| } |
| |
| #[test] |
| fn test_bool() { |
| let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap(); |
| |
| let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef; |
| |
| let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); |
| assert!(rows.row(2) > rows.row(1)); |
| assert!(rows.row(2) > rows.row(0)); |
| assert!(rows.row(1) > rows.row(0)); |
| |
| let cols = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(&cols[0], &col); |
| |
| let converter = RowConverter::new(vec![SortField::new_with_options( |
| DataType::Boolean, |
| SortOptions { |
| descending: true, |
| nulls_first: false, |
| }, |
| )]) |
| .unwrap(); |
| |
| let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); |
| assert!(rows.row(2) < rows.row(1)); |
| assert!(rows.row(2) < rows.row(0)); |
| assert!(rows.row(1) < rows.row(0)); |
| let cols = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(&cols[0], &col); |
| } |
| |
| #[test] |
| fn test_timezone() { |
| let a = |
| TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string()); |
| let d = a.data_type().clone(); |
| |
| let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap(); |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| assert_eq!(back[0].data_type(), &d); |
| |
| // Test dictionary |
| let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new(); |
| a.append(34).unwrap(); |
| a.append_null(); |
| a.append(345).unwrap(); |
| |
| // Construct dictionary with a timezone |
| let dict = a.finish(); |
| let values = TimestampNanosecondArray::from(dict.values().to_data()); |
| let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00"))); |
| let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into())); |
| let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone())); |
| |
| assert_eq!(dict_with_tz.data_type(), &d); |
| let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap(); |
| let rows = converter |
| .convert_columns(&[Arc::new(dict_with_tz) as _]) |
| .unwrap(); |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| assert_eq!(back[0].data_type(), &v); |
| } |
| |
| #[test] |
| fn test_null_encoding() { |
| let col = Arc::new(NullArray::new(10)); |
| let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap(); |
| let rows = converter.convert_columns(&[col]).unwrap(); |
| assert_eq!(rows.num_rows(), 10); |
| assert_eq!(rows.row(1).data.len(), 0); |
| } |
| |
| #[test] |
| fn test_variable_width() { |
| let col = Arc::new(StringArray::from_iter([ |
| Some("hello"), |
| Some("he"), |
| None, |
| Some("foo"), |
| Some(""), |
| ])) as ArrayRef; |
| |
| let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); |
| |
| assert!(rows.row(1) < rows.row(0)); |
| assert!(rows.row(2) < rows.row(4)); |
| assert!(rows.row(3) < rows.row(0)); |
| assert!(rows.row(3) < rows.row(1)); |
| |
| let cols = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(&cols[0], &col); |
| |
| let col = Arc::new(BinaryArray::from_iter([ |
| None, |
| Some(vec![0_u8; 0]), |
| Some(vec![0_u8; 6]), |
| Some(vec![0_u8; variable::MINI_BLOCK_SIZE]), |
| Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]), |
| Some(vec![0_u8; variable::BLOCK_SIZE]), |
| Some(vec![0_u8; variable::BLOCK_SIZE + 1]), |
| Some(vec![1_u8; 6]), |
| Some(vec![1_u8; variable::MINI_BLOCK_SIZE]), |
| Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]), |
| Some(vec![1_u8; variable::BLOCK_SIZE]), |
| Some(vec![1_u8; variable::BLOCK_SIZE + 1]), |
| Some(vec![0xFF_u8; 6]), |
| Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]), |
| Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]), |
| Some(vec![0xFF_u8; variable::BLOCK_SIZE]), |
| Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]), |
| ])) as ArrayRef; |
| |
| let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); |
| |
| for i in 0..rows.num_rows() { |
| for j in i + 1..rows.num_rows() { |
| assert!( |
| rows.row(i) < rows.row(j), |
| "{} < {} - {:?} < {:?}", |
| i, |
| j, |
| rows.row(i), |
| rows.row(j) |
| ); |
| } |
| } |
| |
| let cols = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(&cols[0], &col); |
| |
| let converter = RowConverter::new(vec![SortField::new_with_options( |
| DataType::Binary, |
| SortOptions { |
| descending: true, |
| nulls_first: false, |
| }, |
| )]) |
| .unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); |
| |
| for i in 0..rows.num_rows() { |
| for j in i + 1..rows.num_rows() { |
| assert!( |
| rows.row(i) > rows.row(j), |
| "{} > {} - {:?} > {:?}", |
| i, |
| j, |
| rows.row(i), |
| rows.row(j) |
| ); |
| } |
| } |
| |
| let cols = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(&cols[0], &col); |
| } |
| |
    /// Performs a logical comparison between `a` and (possibly dictionary-encoded) `b`
| fn dictionary_eq(a: &dyn Array, b: &dyn Array) { |
| match b.data_type() { |
| DataType::Dictionary(_, v) => { |
| assert_eq!(a.data_type(), v.as_ref()); |
| let b = arrow_cast::cast(b, v).unwrap(); |
| assert_eq!(a, b.as_ref()) |
| } |
| _ => assert_eq!(a, b), |
| } |
| } |
| |
| #[test] |
| fn test_string_dictionary() { |
| let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([ |
| Some("foo"), |
| Some("hello"), |
| Some("he"), |
| None, |
| Some("hello"), |
| Some(""), |
| Some("hello"), |
| Some("hello"), |
| ])) as ArrayRef; |
| |
| let field = SortField::new(a.data_type().clone()); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); |
| |
| assert!(rows_a.row(3) < rows_a.row(5)); |
| assert!(rows_a.row(2) < rows_a.row(1)); |
| assert!(rows_a.row(0) < rows_a.row(1)); |
| assert!(rows_a.row(3) < rows_a.row(0)); |
| |
| assert_eq!(rows_a.row(1), rows_a.row(4)); |
| assert_eq!(rows_a.row(1), rows_a.row(6)); |
| assert_eq!(rows_a.row(1), rows_a.row(7)); |
| |
| let cols = converter.convert_rows(&rows_a).unwrap(); |
| dictionary_eq(&cols[0], &a); |
| |
| let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([ |
| Some("hello"), |
| None, |
| Some("cupcakes"), |
| ])) as ArrayRef; |
| |
| let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap(); |
| assert_eq!(rows_a.row(1), rows_b.row(0)); |
| assert_eq!(rows_a.row(3), rows_b.row(1)); |
| assert!(rows_b.row(2) < rows_a.row(0)); |
| |
| let cols = converter.convert_rows(&rows_b).unwrap(); |
| dictionary_eq(&cols[0], &b); |
| |
| let converter = RowConverter::new(vec![SortField::new_with_options( |
| a.data_type().clone(), |
| SortOptions { |
| descending: true, |
| nulls_first: false, |
| }, |
| )]) |
| .unwrap(); |
| |
| let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); |
| assert!(rows_c.row(3) > rows_c.row(5)); |
| assert!(rows_c.row(2) > rows_c.row(1)); |
| assert!(rows_c.row(0) > rows_c.row(1)); |
| assert!(rows_c.row(3) > rows_c.row(0)); |
| |
| let cols = converter.convert_rows(&rows_c).unwrap(); |
| dictionary_eq(&cols[0], &a); |
| |
| let converter = RowConverter::new(vec![SortField::new_with_options( |
| a.data_type().clone(), |
| SortOptions { |
| descending: true, |
| nulls_first: true, |
| }, |
| )]) |
| .unwrap(); |
| |
| let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); |
| assert!(rows_c.row(3) < rows_c.row(5)); |
| assert!(rows_c.row(2) > rows_c.row(1)); |
| assert!(rows_c.row(0) > rows_c.row(1)); |
| assert!(rows_c.row(3) < rows_c.row(0)); |
| |
| let cols = converter.convert_rows(&rows_c).unwrap(); |
| dictionary_eq(&cols[0], &a); |
| } |
| |
| #[test] |
| fn test_struct() { |
| // Test basic |
| let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef; |
| let a_f = Arc::new(Field::new("int", DataType::Int32, false)); |
| let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef; |
| let u_f = Arc::new(Field::new("s", DataType::Utf8, false)); |
| let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef; |
| |
| let sort_fields = vec![SortField::new(s1.data_type().clone())]; |
| let converter = RowConverter::new(sort_fields).unwrap(); |
| let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap(); |
| |
| for (a, b) in r1.iter().zip(r1.iter().skip(1)) { |
| assert!(a < b); |
| } |
| |
| let back = converter.convert_rows(&r1).unwrap(); |
| assert_eq!(back.len(), 1); |
| assert_eq!(&back[0], &s1); |
| |
| // Test struct nullability |
| let data = s1 |
| .to_data() |
| .into_builder() |
| .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010]))) |
| .null_count(2) |
| .build() |
| .unwrap(); |
| |
| let s2 = Arc::new(StructArray::from(data)) as ArrayRef; |
| let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap(); |
| assert_eq!(r2.row(0), r2.row(2)); // Nulls equal |
| assert!(r2.row(0) < r2.row(1)); // Nulls first |
| assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null |
| assert_eq!(r1.row(1), r2.row(1)); // Values equal |
| |
| let back = converter.convert_rows(&r2).unwrap(); |
| assert_eq!(back.len(), 1); |
| assert_eq!(&back[0], &s2); |
| |
| back[0].to_data().validate_full().unwrap(); |
| } |
| |
| #[test] |
| fn test_primitive_dictionary() { |
| let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new(); |
| builder.append(2).unwrap(); |
| builder.append(3).unwrap(); |
| builder.append(0).unwrap(); |
| builder.append_null(); |
| builder.append(5).unwrap(); |
| builder.append(3).unwrap(); |
| builder.append(-1).unwrap(); |
| |
| let a = builder.finish(); |
| let data_type = a.data_type().clone(); |
| let columns = [Arc::new(a) as ArrayRef]; |
| |
| let field = SortField::new(data_type.clone()); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows = converter.convert_columns(&columns).unwrap(); |
assert!(rows.row(0) < rows.row(1)); // 2 < 3
assert!(rows.row(2) < rows.row(0)); // 0 < 2
assert!(rows.row(3) < rows.row(2)); // null < 0
assert!(rows.row(6) < rows.row(2)); // -1 < 0
assert!(rows.row(3) < rows.row(6)); // null < -1
| } |
| |
| #[test] |
| fn test_dictionary_nulls() { |
| let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data(); |
| let keys = |
| Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data(); |
| |
| let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32)); |
| let data = keys |
| .into_builder() |
| .data_type(data_type.clone()) |
| .child_data(vec![values]) |
| .build() |
| .unwrap(); |
| |
| let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef]; |
| let field = SortField::new(data_type.clone()); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows = converter.convert_columns(&columns).unwrap(); |
| |
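// Keys [0, 0, 1, 2, 4, null] into values [1, -1, null, 4, null] give
// logical values [1, 1, -1, null, null, null]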
assert_eq!(rows.row(0), rows.row(1)); // 1 == 1
assert_eq!(rows.row(3), rows.row(4)); // null == null (both via null values)
assert_eq!(rows.row(4), rows.row(5)); // null value == null key
assert!(rows.row(3) < rows.row(0)); // null < 1, nulls first by default
| } |
| |
| #[test] |
| #[should_panic(expected = "Encountered non UTF-8 data")] |
| fn test_invalid_utf8() { |
| let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap(); |
| let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _; |
| let rows = converter.convert_columns(&[array]).unwrap(); |
| let binary_row = rows.row(0); |
| |
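// Reinterpreting rows encoded from Binary data as Utf8 must fail UTF-8
// validation when decoded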
| let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); |
| let parser = converter.parser(); |
| let utf8_row = parser.parse(binary_row.as_ref()); |
| |
| converter.convert_rows(std::iter::once(utf8_row)).unwrap(); |
| } |
| |
| #[test] |
| #[should_panic(expected = "rows were not produced by this RowConverter")] |
| fn test_different_converter() { |
| let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)])); |
| let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); |
| let rows = converter.convert_columns(&[values]).unwrap(); |
| |
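// Rows can only be decoded by the converter that produced them, even if the
// second converter is configured identically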
| let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); |
| let _ = converter.convert_rows(&rows); |
| } |
| |
| fn test_single_list<O: OffsetSizeTrait>() { |
| let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new()); |
| builder.values().append_value(32); |
| builder.values().append_value(52); |
| builder.values().append_value(32); |
| builder.append(true); |
| builder.values().append_value(32); |
| builder.values().append_value(52); |
| builder.values().append_value(12); |
| builder.append(true); |
| builder.values().append_value(32); |
| builder.values().append_value(52); |
| builder.append(true); |
| builder.values().append_value(32); // MASKED |
| builder.values().append_value(52); // MASKED |
| builder.append(false); |
| builder.values().append_value(32); |
| builder.values().append_null(); |
| builder.append(true); |
| builder.append(true); |
| |
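// Logical contents:
// [
//   [32, 52, 32],
//   [32, 52, 12],
//   [32, 52],
//   null,
//   [32, null],
//   [],
// ]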
| let list = Arc::new(builder.finish()) as ArrayRef; |
| let d = list.data_type().clone(); |
| |
| let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap(); |
| |
| let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); |
| assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] |
assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
| assert!(rows.row(3) < rows.row(5)); // null < [] |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| back[0].to_data().validate_full().unwrap(); |
| assert_eq!(&back[0], &list); |
| |
| let options = SortOptions { |
| descending: false, |
| nulls_first: false, |
| }; |
| let field = SortField::new_with_options(d.clone(), options); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); |
| |
| assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] |
assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
| assert!(rows.row(3) > rows.row(5)); // null > [] |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| back[0].to_data().validate_full().unwrap(); |
| assert_eq!(&back[0], &list); |
| |
| let options = SortOptions { |
| descending: true, |
| nulls_first: false, |
| }; |
| let field = SortField::new_with_options(d.clone(), options); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); |
| |
| assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] |
assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
| assert!(rows.row(3) > rows.row(5)); // null > [] |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| back[0].to_data().validate_full().unwrap(); |
| assert_eq!(&back[0], &list); |
| |
| let options = SortOptions { |
| descending: true, |
| nulls_first: true, |
| }; |
| let field = SortField::new_with_options(d, options); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); |
| |
| assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] |
assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
| assert!(rows.row(3) < rows.row(5)); // null < [] |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| back[0].to_data().validate_full().unwrap(); |
| assert_eq!(&back[0], &list); |
| } |
| |
| fn test_nested_list<O: OffsetSizeTrait>() { |
| let mut builder = |
| GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new())); |
| |
| builder.values().values().append_value(1); |
| builder.values().values().append_value(2); |
| builder.values().append(true); |
| builder.values().values().append_value(1); |
| builder.values().values().append_null(); |
| builder.values().append(true); |
| builder.append(true); |
| |
| builder.values().values().append_value(1); |
| builder.values().values().append_null(); |
| builder.values().append(true); |
| builder.values().values().append_value(1); |
| builder.values().values().append_null(); |
| builder.values().append(true); |
| builder.append(true); |
| |
| builder.values().values().append_value(1); |
| builder.values().values().append_null(); |
| builder.values().append(true); |
| builder.values().append(false); |
| builder.append(true); |
| builder.append(false); |
| |
| builder.values().values().append_value(1); |
| builder.values().values().append_value(2); |
| builder.values().append(true); |
| builder.append(true); |
| |
| let list = Arc::new(builder.finish()) as ArrayRef; |
| let d = list.data_type().clone(); |
| |
| // [ |
| // [[1, 2], [1, null]], |
| // [[1, null], [1, null]], |
//   [[1, null], null],
//   null,
| // [[1, 2]] |
| // ] |
| let options = SortOptions { |
| descending: false, |
| nulls_first: true, |
| }; |
| let field = SortField::new_with_options(d.clone(), options); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); |
| |
assert!(rows.row(0) > rows.row(1)); // [1, 2] > [1, null] in the first element
assert!(rows.row(1) > rows.row(2)); // [1, null] > null in the second element
assert!(rows.row(2) > rows.row(3)); // non-null list > null list
assert!(rows.row(4) < rows.row(0)); // [[1, 2]] is a prefix of [[1, 2], [1, null]]
assert!(rows.row(4) > rows.row(1)); // [1, 2] > [1, null] in the first element
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| back[0].to_data().validate_full().unwrap(); |
| assert_eq!(&back[0], &list); |
| |
| let options = SortOptions { |
| descending: true, |
| nulls_first: true, |
| }; |
| let field = SortField::new_with_options(d.clone(), options); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); |
| |
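// Descending with nulls first: null-driven comparisons keep their direction,
// while the prefix comparison (row 4 vs row 0) flips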
| assert!(rows.row(0) > rows.row(1)); |
| assert!(rows.row(1) > rows.row(2)); |
| assert!(rows.row(2) > rows.row(3)); |
| assert!(rows.row(4) > rows.row(0)); |
| assert!(rows.row(4) > rows.row(1)); |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| back[0].to_data().validate_full().unwrap(); |
| assert_eq!(&back[0], &list); |
| |
| let options = SortOptions { |
| descending: true, |
| nulls_first: false, |
| }; |
| let field = SortField::new_with_options(d, options); |
| let converter = RowConverter::new(vec![field]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); |
| |
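// Descending with nulls last: null-driven comparisons flip relative to the
// nulls-first case, while the prefix comparison is unchanged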
| assert!(rows.row(0) < rows.row(1)); |
| assert!(rows.row(1) < rows.row(2)); |
| assert!(rows.row(2) < rows.row(3)); |
| assert!(rows.row(4) > rows.row(0)); |
| assert!(rows.row(4) < rows.row(1)); |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(back.len(), 1); |
| back[0].to_data().validate_full().unwrap(); |
| assert_eq!(&back[0], &list); |
| } |
| |
| #[test] |
| fn test_list() { |
| test_single_list::<i32>(); |
| test_nested_list::<i32>(); |
| } |
| |
| #[test] |
| fn test_large_list() { |
| test_single_list::<i64>(); |
| test_nested_list::<i64>(); |
| } |
| |
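/// Generates a random primitive array of `len` values, each valid with
/// probability `valid_percent`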
| fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K> |
| where |
| K: ArrowPrimitiveType, |
| Standard: Distribution<K::Native>, |
| { |
| let mut rng = thread_rng(); |
| (0..len) |
| .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen())) |
| .collect() |
| } |
| |
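/// Generates `len` random ASCII strings of length 0..100, each valid with
/// probability `valid_percent`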
| fn generate_strings<O: OffsetSizeTrait>( |
| len: usize, |
| valid_percent: f64, |
| ) -> GenericStringArray<O> { |
| let mut rng = thread_rng(); |
| (0..len) |
| .map(|_| { |
| rng.gen_bool(valid_percent).then(|| { |
| let len = rng.gen_range(0..100); |
| let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect(); |
| String::from_utf8(bytes).unwrap() |
| }) |
| }) |
| .collect() |
| } |
| |
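/// As `generate_strings`, but producing a `StringViewArray`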
| fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray { |
| let mut rng = thread_rng(); |
| (0..len) |
| .map(|_| { |
| rng.gen_bool(valid_percent).then(|| { |
| let len = rng.gen_range(0..100); |
| let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect(); |
| String::from_utf8(bytes).unwrap() |
| }) |
| }) |
| .collect() |
| } |
| |
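/// Generates `len` random binary values of length 0..100 as a
/// `BinaryViewArray`, each valid with probability `valid_percent`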
| fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray { |
| let mut rng = thread_rng(); |
| (0..len) |
| .map(|_| { |
| rng.gen_bool(valid_percent).then(|| { |
| let len = rng.gen_range(0..100); |
| let bytes: Vec<_> = (0..len).map(|_| rng.gen_range(0..128)).collect(); |
| bytes |
| }) |
| }) |
| .collect() |
| } |
| |
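/// Generates a dictionary array of `len` random keys into `values`, each key
/// valid with probability `valid_percent`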
| fn generate_dictionary<K>( |
| values: ArrayRef, |
| len: usize, |
| valid_percent: f64, |
| ) -> DictionaryArray<K> |
| where |
| K: ArrowDictionaryKeyType, |
| K::Native: SampleUniform, |
| { |
| let mut rng = thread_rng(); |
| let min_key = K::Native::from_usize(0).unwrap(); |
| let max_key = K::Native::from_usize(values.len()).unwrap(); |
| let keys: PrimitiveArray<K> = (0..len) |
| .map(|_| { |
| rng.gen_bool(valid_percent) |
| .then(|| rng.gen_range(min_key..max_key)) |
| }) |
| .collect(); |
| |
| let data_type = |
| DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone())); |
| |
| let data = keys |
| .into_data() |
| .into_builder() |
| .data_type(data_type) |
| .add_child_data(values.to_data()) |
| .build() |
| .unwrap(); |
| |
| DictionaryArray::from(data) |
| } |
| |
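/// Generates a fixed-size binary array of `len` random values with a random
/// width in 0..20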
| fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray { |
| let mut rng = thread_rng(); |
| let width = rng.gen_range(0..20); |
| let mut builder = FixedSizeBinaryBuilder::new(width); |
| |
| let mut b = vec![0; width as usize]; |
| for _ in 0..len { |
| match rng.gen_bool(valid_percent) { |
| true => { |
| b.iter_mut().for_each(|x| *x = rng.gen()); |
| builder.append_value(&b).unwrap(); |
| } |
| false => builder.append_null(), |
| } |
| } |
| |
| builder.finish() |
| } |
| |
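/// Generates a struct array with nullable Int32 and Utf8 children and random
/// top-level nulls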
| fn generate_struct(len: usize, valid_percent: f64) -> StructArray { |
| let mut rng = thread_rng(); |
| let nulls = NullBuffer::from_iter((0..len).map(|_| rng.gen_bool(valid_percent))); |
| let a = generate_primitive_array::<Int32Type>(len, valid_percent); |
| let b = generate_strings::<i32>(len, valid_percent); |
| let fields = Fields::from(vec![ |
| Field::new("a", DataType::Int32, true), |
| Field::new("b", DataType::Utf8, true), |
| ]); |
| let values = vec![Arc::new(a) as _, Arc::new(b) as _]; |
| StructArray::new(fields, values, Some(nulls)) |
| } |
| |
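/// Generates a list array of `len` random lists of length 0..10, with child
/// values produced by `values`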
| fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray |
| where |
| F: FnOnce(usize) -> ArrayRef, |
| { |
| let mut rng = thread_rng(); |
| let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.gen_range(0..10))); |
| let values_len = offsets.last().unwrap().to_usize().unwrap(); |
| let values = values(values_len); |
| let nulls = NullBuffer::from_iter((0..len).map(|_| rng.gen_bool(valid_percent))); |
| let field = Arc::new(Field::new("item", values.data_type().clone(), true)); |
| ListArray::new(field, offsets, values, Some(nulls)) |
| } |
| |
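/// Generates a random column of `len` values with a randomly selected data type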
| fn generate_column(len: usize) -> ArrayRef { |
| let mut rng = thread_rng(); |
| match rng.gen_range(0..16) { |
| 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)), |
| 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)), |
| 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)), |
| 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)), |
| 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)), |
| 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)), |
| 6 => Arc::new(generate_strings::<i32>(len, 0.8)), |
| 7 => Arc::new(generate_dictionary::<Int64Type>( |
| // Cannot test dictionaries containing null values because of #2687 |
| Arc::new(generate_strings::<i32>(rng.gen_range(1..len), 1.0)), |
| len, |
| 0.8, |
| )), |
| 8 => Arc::new(generate_dictionary::<Int64Type>( |
| // Cannot test dictionaries containing null values because of #2687 |
| Arc::new(generate_primitive_array::<Int64Type>( |
| rng.gen_range(1..len), |
| 1.0, |
| )), |
| len, |
| 0.8, |
| )), |
| 9 => Arc::new(generate_fixed_size_binary(len, 0.8)), |
| 10 => Arc::new(generate_struct(len, 0.8)), |
| 11 => Arc::new(generate_list(len, 0.8, |values_len| { |
| Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8)) |
| })), |
| 12 => Arc::new(generate_list(len, 0.8, |values_len| { |
| Arc::new(generate_strings::<i32>(values_len, 0.8)) |
| })), |
| 13 => Arc::new(generate_list(len, 0.8, |values_len| { |
| Arc::new(generate_struct(values_len, 0.8)) |
| })), |
| 14 => Arc::new(generate_string_view(len, 0.8)), |
| 15 => Arc::new(generate_byte_view(len, 0.8)), |
| _ => unreachable!(), |
| } |
| } |
| |
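/// Formats row `row` of `cols` as a comma-separated string, rendering nulls
/// as "NULL", for use in assertion messages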
| fn print_row(cols: &[SortColumn], row: usize) -> String { |
| let t: Vec<_> = cols |
| .iter() |
| .map(|x| match x.values.is_valid(row) { |
| true => { |
| let opts = FormatOptions::default().with_null("NULL"); |
| let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap(); |
| formatter.value(row).to_string() |
| } |
| false => "NULL".to_string(), |
| }) |
| .collect(); |
| t.join(",") |
| } |
| |
| fn print_col_types(cols: &[SortColumn]) -> String { |
| let t: Vec<_> = cols |
| .iter() |
| .map(|x| x.values.data_type().to_string()) |
| .collect(); |
| t.join(",") |
| } |
| |
| #[test] |
| #[cfg_attr(miri, ignore)] |
| fn fuzz_test() { |
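// Row comparisons must agree with LexicographicalComparator for random
// columns, lengths, and sort options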
| for _ in 0..100 { |
| let mut rng = thread_rng(); |
| let num_columns = rng.gen_range(1..5); |
| let len = rng.gen_range(5..100); |
| let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect(); |
| |
| let options: Vec<_> = (0..num_columns) |
| .map(|_| SortOptions { |
| descending: rng.gen_bool(0.5), |
| nulls_first: rng.gen_bool(0.5), |
| }) |
| .collect(); |
| |
| let sort_columns: Vec<_> = options |
| .iter() |
| .zip(&arrays) |
| .map(|(o, c)| SortColumn { |
| values: Arc::clone(c), |
| options: Some(*o), |
| }) |
| .collect(); |
| |
| let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); |
| |
| let columns = options |
| .into_iter() |
| .zip(&arrays) |
| .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) |
| .collect(); |
| |
| let converter = RowConverter::new(columns).unwrap(); |
| let rows = converter.convert_columns(&arrays).unwrap(); |
| |
| for i in 0..len { |
| for j in 0..len { |
| let row_i = rows.row(i); |
| let row_j = rows.row(j); |
| let row_cmp = row_i.cmp(&row_j); |
| let lex_cmp = comparator.compare(i, j); |
| assert_eq!( |
| row_cmp, |
| lex_cmp, |
| "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}", |
| print_row(&sort_columns, i), |
| print_row(&sort_columns, j), |
| row_i, |
| row_j, |
| print_col_types(&sort_columns) |
| ); |
| } |
| } |
| |
| let back = converter.convert_rows(&rows).unwrap(); |
| for (actual, expected) in back.iter().zip(&arrays) { |
| actual.to_data().validate_full().unwrap(); |
| dictionary_eq(actual, expected) |
| } |
| } |
| } |
| |
| #[test] |
| fn test_clear() { |
| let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); |
| let mut rows = converter.empty_rows(3, 128); |
| |
| let first = Int32Array::from(vec![None, Some(2), Some(4)]); |
| let second = Int32Array::from(vec![Some(2), None, Some(4)]); |
| let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef]; |
| |
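// After clear(), the Rows buffer must be reusable for a fresh conversion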
| for array in arrays.iter() { |
| rows.clear(); |
| converter.append(&mut rows, &[array.clone()]).unwrap(); |
| let back = converter.convert_rows(&rows).unwrap(); |
| assert_eq!(&back[0], array); |
| } |
| |
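// `rows` still holds the second array; it must match rows encoded from scratch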
| let mut rows_expected = converter.empty_rows(3, 128); |
| converter.append(&mut rows_expected, &arrays[1..]).unwrap(); |
| |
| for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() { |
| assert_eq!( |
| actual, expected, |
| "For row {}: expected {:?}, actual: {:?}", |
| i, expected, actual |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_append_codec_dictionary_binary() { |
| use DataType::*; |
// RowConverter for a dictionary-encoded binary column
| let converter = RowConverter::new(vec![SortField::new(Dictionary( |
| Box::new(Int32), |
| Box::new(Binary), |
| ))]) |
| .unwrap(); |
| let mut rows = converter.empty_rows(4, 128); |
| |
| let keys = Int32Array::from_iter_values([0, 1, 2, 3]); |
| let values = BinaryArray::from(vec![ |
| Some("a".as_bytes()), |
| Some(b"b"), |
| Some(b"c"), |
| Some(b"d"), |
| ]); |
| let dict_array = DictionaryArray::new(keys, Arc::new(values)); |
| |
| rows.clear(); |
| let array = Arc::new(dict_array) as ArrayRef; |
| converter.append(&mut rows, &[array.clone()]).unwrap(); |
| let back = converter.convert_rows(&rows).unwrap(); |
| |
| dictionary_eq(&back[0], &array); |
| } |
| |
| #[test] |
| fn test_list_prefix() { |
| let mut a = ListBuilder::new(Int8Builder::new()); |
| a.append_value([None]); |
| a.append_value([None, None]); |
| let a = a.finish(); |
| |
| let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap(); |
| let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap(); |
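// [null] is a prefix of [null, null] and must sort first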
| assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less); |
| } |
| } |