| // Copyright 2022 The Blaze Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| //! Functionality used both on logical and physical plans |
| |
| use std::sync::Arc; |
| |
| use arrow::{ |
| array::*, |
| datatypes::{ |
| ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, |
| Int8Type, TimeUnit, |
| }, |
| }; |
| use datafusion::error::Result; |
| |
| use crate::{ |
| df_execution_err, |
| hash::{mur::spark_compatible_murmur3_hash, xxhash::spark_compatible_xxhash64_hash}, |
| }; |
| |
/// Folds every non-null value of a variable-width (binary/string) column into
/// `$hashes`, after downcasting `$column` to `$array_type`. `$h(bytes, seed)`
/// combines a value's bytes with the row's current hash; null rows keep theirs.
macro_rules! hash_array {
    ($array_type:ident, $column:ident, $hashes:ident, $h:expr) => {
        let typed = $column.as_any().downcast_ref::<$array_type>().unwrap();
        let has_nulls = typed.null_count() != 0;
        if has_nulls {
            // Only visit rows that hold a value.
            let valid_rows = $hashes
                .iter_mut()
                .enumerate()
                .filter(|(row, _)| !typed.is_null(*row));
            for (row, hash) in valid_rows {
                *hash = $h(&typed.value(row).as_ref(), *hash);
            }
        } else {
            // No nulls: skip the per-row validity check entirely.
            for (row, hash) in $hashes.iter_mut().enumerate() {
                *hash = $h(&typed.value(row).as_ref(), *hash);
            }
        }
    };
}
| |
/// Folds every non-null value of a fixed-width column into `$hashes`, after
/// downcasting `$column` to `$array_type` and converting each value to `$ty`.
/// The little-endian bytes of that value are passed to `$h(bytes, seed)`; null
/// rows keep their hash. Rows are paired with values positionally.
macro_rules! hash_array_primitive {
    ($array_type:ident, $column:ident, $ty:ident, $hashes:ident, $h:expr) => {
        let typed = $column.as_any().downcast_ref::<$array_type>().unwrap();
        let rows = $hashes.iter_mut().zip(typed.values().iter());

        if typed.null_count() > 0 {
            for (row, (hash, value)) in rows.enumerate() {
                if typed.is_null(row) {
                    continue;
                }
                *hash = $h((*value as $ty).to_le_bytes().as_ref(), *hash);
            }
        } else {
            for (hash, value) in rows {
                *hash = $h((*value as $ty).to_le_bytes().as_ref(), *hash);
            }
        }
    };
}
| |
/// Folds every non-null value of a decimal column into `$hashes`, after
/// downcasting `$column` to `$array_type` (expected: `Decimal128Array`).
///
/// The byte encoding follows Spark's decimal hashing. A decimal whose precision
/// fits in a long (<= 18 digits) is hashed as that unscaled value in 8
/// little-endian bytes, i.e. like an `Int64`. A wider decimal is hashed as
/// `BigInteger.toByteArray()` of its unscaled value: the minimal big-endian
/// two's-complement bytes. Null rows keep their hash.
macro_rules! hash_array_decimal {
    ($array_type:ident, $column:ident, $hashes:ident, $h:expr) => {
        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();

        // Within precision 18 the unscaled value always fits in an i64.
        let fits_in_long = array.precision() <= 18;
        let hash_decimal = |unscaled: i128, seed| {
            if fits_in_long {
                $h((unscaled as i64).to_le_bytes().as_ref(), seed)
            } else {
                // Drop leading 0x00/0xff bytes that merely repeat the sign bit of
                // the following byte, keeping at least one byte.
                let bytes = unscaled.to_be_bytes();
                let mut start = 0;
                while start + 1 < bytes.len()
                    && ((bytes[start] == 0x00 && (bytes[start + 1] & 0x80) == 0)
                        || (bytes[start] == 0xff && (bytes[start + 1] & 0x80) != 0))
                {
                    start += 1;
                }
                $h(&bytes[start..], seed)
            }
        };

        if array.null_count() == 0 {
            for (i, hash) in $hashes.iter_mut().enumerate() {
                *hash = hash_decimal(array.value(i), *hash);
            }
        } else {
            for (i, hash) in $hashes.iter_mut().enumerate() {
                if !array.is_null(i) {
                    *hash = hash_decimal(array.value(i), *hash);
                }
            }
        }
    };
}
| |
| /// Hash the values in a dictionary array |
| fn create_hashes_dictionary<K: ArrowDictionaryKeyType, T: num::PrimInt>( |
| array: &ArrayRef, |
| hashes_buffer: &mut [T], |
| h: impl Fn(&[u8], T) -> T + Copy, |
| ) -> Result<()> { |
| let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap(); |
| |
| // Hash each dictionary value once, and then use that computed |
| // hash for each key value to avoid a potentially expensive |
| // redundant hashing for large dictionary elements (e.g. strings) |
| let dict_values = Arc::clone(dict_array.values()); |
| let mut dict_hashes = vec![T::zero(); dict_values.len()]; |
| create_hashes(&[dict_values], &mut dict_hashes, h)?; |
| |
| for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) { |
| if let Some(key) = key { |
| if let Some(idx) = key.to_usize() { |
| *hash = dict_hashes[idx]; |
| } else { |
| let dt = dict_array.data_type(); |
| df_execution_err!( |
| "Can not convert key value {key:?} to usize in dictionary of type {dt:?}" |
| )?; |
| } |
| } // no update for Null, consistent with other hashes |
| } |
| Ok(()) |
| } |
| |
| pub fn create_murmur3_hashes(arrays: &[ArrayRef], hashes_buffer: &mut [i32]) -> Result<()> { |
| create_hashes(arrays, hashes_buffer, |data: &[u8], seed: i32| { |
| spark_compatible_murmur3_hash(data, seed) |
| }) |
| } |
| |
| pub fn create_xxhash64_hashes(arrays: &[ArrayRef], hashes_buffer: &mut [i64]) -> Result<()> { |
| create_hashes(arrays, hashes_buffer, |data: &[u8], seed: i64| { |
| spark_compatible_xxhash64_hash(data, seed) |
| }) |
| } |
| |
| /// Creates hash values for every row, based on the values in the |
| /// columns. |
| /// |
| /// The number of rows to hash is determined by `hashes_buffer.len()`. |
| /// `hashes_buffer` should be pre-sized appropriately |
| pub fn create_hashes<T: num::PrimInt>( |
| arrays: &[ArrayRef], |
| hashes_buffer: &mut [T], |
| h: impl Fn(&[u8], T) -> T + Copy, |
| ) -> Result<()> { |
| for col in arrays { |
| hash_array(col, hashes_buffer, h)?; |
| } |
| Ok(()) |
| } |
| |
| fn hash_array<T: num::PrimInt>( |
| array: &ArrayRef, |
| hashes_buffer: &mut [T], |
| h: impl Fn(&[u8], T) -> T + Copy, |
| ) -> Result<()> { |
| match array.data_type() { |
| DataType::Null => {} |
| DataType::Boolean => { |
| let array = array.as_any().downcast_ref::<BooleanArray>().unwrap(); |
| if array.null_count() == 0 { |
| for (i, hash) in hashes_buffer.iter_mut().enumerate() { |
| *hash = h( |
| (if array.value(i) { 1u32 } else { 0u32 }) |
| .to_le_bytes() |
| .as_ref(), |
| *hash, |
| ); |
| } |
| } else { |
| for (i, hash) in hashes_buffer.iter_mut().enumerate() { |
| if !array.is_null(i) { |
| *hash = h( |
| (if array.value(i) { 1u32 } else { 0u32 }) |
| .to_le_bytes() |
| .as_ref(), |
| *hash, |
| ); |
| } |
| } |
| } |
| } |
| DataType::Int8 => { |
| hash_array_primitive!(Int8Array, array, i32, hashes_buffer, h); |
| } |
| DataType::Int16 => { |
| hash_array_primitive!(Int16Array, array, i32, hashes_buffer, h); |
| } |
| DataType::Int32 => { |
| hash_array_primitive!(Int32Array, array, i32, hashes_buffer, h); |
| } |
| DataType::Int64 => { |
| hash_array_primitive!(Int64Array, array, i64, hashes_buffer, h); |
| } |
| DataType::Float32 => { |
| hash_array_primitive!(Float32Array, array, f32, hashes_buffer, h); |
| } |
| DataType::Float64 => { |
| hash_array_primitive!(Float64Array, array, f64, hashes_buffer, h); |
| } |
| DataType::Timestamp(TimeUnit::Second, _) => { |
| hash_array_primitive!(TimestampSecondArray, array, i64, hashes_buffer, h); |
| } |
| DataType::Timestamp(TimeUnit::Millisecond, _) => { |
| hash_array_primitive!(TimestampMillisecondArray, array, i64, hashes_buffer, h); |
| } |
| DataType::Timestamp(TimeUnit::Microsecond, _) => { |
| hash_array_primitive!(TimestampMicrosecondArray, array, i64, hashes_buffer, h); |
| } |
| DataType::Timestamp(TimeUnit::Nanosecond, _) => { |
| hash_array_primitive!(TimestampNanosecondArray, array, i64, hashes_buffer, h); |
| } |
| DataType::Date32 => { |
| hash_array_primitive!(Date32Array, array, i32, hashes_buffer, h); |
| } |
| DataType::Date64 => { |
| hash_array_primitive!(Date64Array, array, i64, hashes_buffer, h); |
| } |
| DataType::Binary => { |
| hash_array!(BinaryArray, array, hashes_buffer, h); |
| } |
| DataType::LargeBinary => { |
| hash_array!(LargeBinaryArray, array, hashes_buffer, h); |
| } |
| DataType::Utf8 => { |
| hash_array!(StringArray, array, hashes_buffer, h); |
| } |
| DataType::LargeUtf8 => { |
| hash_array!(LargeStringArray, array, hashes_buffer, h); |
| } |
| DataType::Decimal128(..) => { |
| hash_array_decimal!(Decimal128Array, array, hashes_buffer, h); |
| } |
| DataType::Dictionary(index_type, _) => match &**index_type { |
| DataType::Int8 => create_hashes_dictionary::<Int8Type, _>(array, hashes_buffer, h)?, |
| DataType::Int16 => create_hashes_dictionary::<Int16Type, _>(array, hashes_buffer, h)?, |
| DataType::Int32 => create_hashes_dictionary::<Int32Type, _>(array, hashes_buffer, h)?, |
| DataType::Int64 => create_hashes_dictionary::<Int64Type, _>(array, hashes_buffer, h)?, |
| other => df_execution_err!("Unsupported dictionary type in hasher hashing: {other}")?, |
| }, |
| _ => { |
| for idx in 0..array.len() { |
| hash_one(array, idx, &mut hashes_buffer[idx], h)?; |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
/// Folds the fixed-width value at row `$idx` of `$column` (downcast to
/// `$array_type`) into `*$hash`. The value is first converted to `$ty`, and its
/// little-endian bytes are passed to `$h(bytes, seed)`. The caller is
/// responsible for skipping null rows.
macro_rules! hash_one_primitive {
    ($array_type:ident, $column:ident, $ty:ident, $hash:ident, $idx:ident, $h:expr) => {
        let typed = $column.as_any().downcast_ref::<$array_type>().unwrap();
        let native = typed.value($idx as usize) as $ty;
        *$hash = $h(native.to_le_bytes().as_ref(), *$hash);
    };
}
| |
/// Folds the variable-width (binary/string) value at row `$idx` of `$column`
/// (downcast to `$array_type`) into `*$hash` via `$h(bytes, seed)`. The caller
/// is responsible for skipping null rows.
macro_rules! hash_one_binary {
    ($array_type:ident, $column:ident, $hash:ident, $idx:ident, $h:expr) => {
        let typed = $column.as_any().downcast_ref::<$array_type>().unwrap();
        let seed = *$hash;
        *$hash = $h(&typed.value($idx as usize).as_ref(), seed);
    };
}
| |
/// Folds the decimal at row `$idx` of `$column` (downcast to `$array_type`,
/// expected: `Decimal128Array`) into `*$hash`. The caller skips null rows.
///
/// The byte encoding follows Spark's decimal hashing. For precision <= 18 the
/// unscaled value is hashed as an 8-byte little-endian long. Otherwise it is
/// hashed as `BigInteger.toByteArray()`: minimal big-endian two's complement.
macro_rules! hash_one_decimal {
    ($array_type:ident, $column:ident, $hash:ident, $idx:ident, $h:expr) => {
        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
        let unscaled: i128 = array.value($idx as usize);
        *$hash = if array.precision() <= 18 {
            // Within precision 18 the unscaled value always fits in an i64.
            $h((unscaled as i64).to_le_bytes().as_ref(), *$hash)
        } else {
            // Drop leading 0x00/0xff bytes that merely repeat the sign bit of the
            // following byte, keeping at least one byte.
            let bytes = unscaled.to_be_bytes();
            let mut start = 0;
            while start + 1 < bytes.len()
                && ((bytes[start] == 0x00 && (bytes[start + 1] & 0x80) == 0)
                    || (bytes[start] == 0xff && (bytes[start + 1] & 0x80) != 0))
            {
                start += 1;
            }
            $h(&bytes[start..], *$hash)
        };
    };
}
| |
| fn hash_one<T: num::PrimInt>( |
| col: &ArrayRef, |
| idx: usize, |
| hash: &mut T, |
| h: impl Fn(&[u8], T) -> T + Copy, |
| ) -> Result<()> { |
| if col.is_valid(idx) { |
| match col.data_type() { |
| DataType::Null => {} |
| DataType::Boolean => { |
| let array = col.as_any().downcast_ref::<BooleanArray>().unwrap(); |
| *hash = h( |
| (if array.value(idx) { 1u32 } else { 0u32 }) |
| .to_le_bytes() |
| .as_ref(), |
| *hash, |
| ); |
| } |
| DataType::Int8 => { |
| hash_one_primitive!(Int8Array, col, i32, hash, idx, h); |
| } |
| DataType::Int16 => { |
| hash_one_primitive!(Int16Array, col, i32, hash, idx, h); |
| } |
| DataType::Int32 => { |
| hash_one_primitive!(Int32Array, col, i32, hash, idx, h); |
| } |
| DataType::Int64 => { |
| hash_one_primitive!(Int64Array, col, i64, hash, idx, h); |
| } |
| DataType::Float32 => { |
| hash_one_primitive!(Float32Array, col, f32, hash, idx, h); |
| } |
| DataType::Float64 => { |
| hash_one_primitive!(Float64Array, col, f64, hash, idx, h); |
| } |
| DataType::Timestamp(TimeUnit::Second, None) => { |
| hash_one_primitive!(TimestampSecondArray, col, i64, hash, idx, h); |
| } |
| DataType::Timestamp(TimeUnit::Millisecond, None) => { |
| hash_one_primitive!(TimestampMillisecondArray, col, i64, hash, idx, h); |
| } |
| DataType::Timestamp(TimeUnit::Microsecond, None) => { |
| hash_one_primitive!(TimestampMicrosecondArray, col, i64, hash, idx, h); |
| } |
| DataType::Timestamp(TimeUnit::Nanosecond, _) => { |
| hash_one_primitive!(TimestampNanosecondArray, col, i64, hash, idx, h); |
| } |
| DataType::Date32 => { |
| hash_one_primitive!(Date32Array, col, i32, hash, idx, h); |
| } |
| DataType::Date64 => { |
| hash_one_primitive!(Date64Array, col, i64, hash, idx, h); |
| } |
| DataType::Binary => { |
| hash_one_binary!(BinaryArray, col, hash, idx, h); |
| } |
| DataType::LargeBinary => { |
| hash_one_binary!(LargeBinaryArray, col, hash, idx, h); |
| } |
| DataType::Utf8 => { |
| hash_one_binary!(StringArray, col, hash, idx, h); |
| } |
| DataType::LargeUtf8 => { |
| hash_one_binary!(LargeStringArray, col, hash, idx, h); |
| } |
| DataType::Decimal128(..) => { |
| hash_one_decimal!(Decimal128Array, col, hash, idx, h); |
| } |
| DataType::List(..) => { |
| let list_array = col.as_any().downcast_ref::<ListArray>().unwrap(); |
| let value_array = list_array.value(idx); |
| for i in 0..value_array.len() { |
| hash_one(&value_array, i, hash, h)?; |
| } |
| } |
| DataType::Map(..) => { |
| let map_array = col.as_any().downcast_ref::<MapArray>().unwrap(); |
| let kv_array = map_array.value(idx); |
| let key_array = kv_array.column(0); |
| let value_array = kv_array.column(1); |
| for i in 0..kv_array.len() { |
| hash_one(key_array, i, hash, h)?; |
| hash_one(value_array, i, hash, h)?; |
| } |
| } |
| DataType::Struct(_) => { |
| let struct_array = col.as_any().downcast_ref::<StructArray>().unwrap(); |
| for col in struct_array.columns() { |
| hash_one(col, idx, hash, h)?; |
| } |
| } |
| other => df_execution_err!("Unsupported data type in hasher: {other}")?, |
| } |
| } |
| Ok(()) |
| } |
| |
/// Spark-style positive modulo: the non-negative remainder of `hash` divided by
/// `n`, i.e. the partition id in `0..n` for a row with hash `hash`.
///
/// # Panics
/// Panics if `n` is zero or does not fit in an `i32`, rather than silently
/// truncating `n`.
pub fn pmod(hash: i32, n: usize) -> usize {
    let divisor = i32::try_from(n)
        .ok()
        .filter(|&d| d > 0)
        .unwrap_or_else(|| panic!("pmod: partition count must be in 1..=i32::MAX, got {n}"));
    // For a positive divisor `rem_euclid` equals
    // `r = hash % n; if r < 0 { (r + n) % n } else { r }`.
    hash.rem_euclid(divisor) as usize
}
| |
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::{
        array::{
            make_array, Array, ArrayData, ArrayRef, Int32Array, Int64Array, Int8Array, MapArray,
            StringArray, StructArray, UInt32Array,
        },
        buffer::Buffer,
        datatypes::{DataType, Field, ToByteSlice},
    };

    use super::*;

    #[test]
    fn test_list() {
        // A list is hashed by chaining the hashes of its non-null elements in
        // order; empty and null lists leave the seed unchanged.
        let list = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            Some(vec![Some(4), None, Some(5)]),
            Some(vec![]),
            None,
        ])) as ArrayRef;
        let mut hashes = vec![42; 4];
        create_murmur3_hashes(&[list], &mut hashes).unwrap();

        let rows: [&[i32]; 4] = [&[1, 2, 3], &[4, 5], &[], &[]];
        let expected: Vec<i32> = rows
            .iter()
            .map(|row| {
                row.iter()
                    .fold(42, |seed, v| spark_compatible_murmur3_hash(v.to_le_bytes(), seed))
            })
            .collect();
        assert_eq!(hashes, expected);
    }

    #[test]
    fn test_i8() {
        let i = Arc::new(Int8Array::from(vec![
            Some(1),
            Some(0),
            Some(-1),
            Some(i8::MAX),
            Some(i8::MIN),
        ])) as ArrayRef;
        let mut hashes = vec![42; 5];
        create_murmur3_hashes(&[i], &mut hashes).unwrap();

        // generated with Spark Murmur3_x86_32
        let expected: Vec<i32> = [
            0xdea578e3_u32,
            0x379fae8f,
            0xa0590e3d,
            0x43b4d8ed,
            0x422a1365,
        ]
        .into_iter()
        .map(|v| v as i32)
        .collect();
        assert_eq!(hashes, expected);
    }

    #[test]
    fn test_i32() {
        let columns: Vec<ArrayRef> = (1..=4)
            .map(|v| Arc::new(Int32Array::from(vec![Some(v)])) as ArrayRef)
            .collect();

        // Same value as the Int8 `1` in `test_i8`: both are hashed as 4-byte ints.
        let mut hashes = vec![42; 1];
        create_murmur3_hashes(&columns[..1], &mut hashes).unwrap();
        assert_eq!(hashes, vec![0xdea578e3_u32 as i32]);

        // Hashing the remaining columns one call at a time chains through the
        // buffer and must agree with hashing all columns in a single call.
        for column in &columns[1..] {
            create_murmur3_hashes(std::slice::from_ref(column), &mut hashes).unwrap();
        }
        let mut all_at_once = vec![42; 1];
        create_murmur3_hashes(&columns, &mut all_at_once).unwrap();
        assert_eq!(hashes, all_at_once);
    }

    #[test]
    fn test_i64() {
        let i = Arc::new(Int64Array::from(vec![
            Some(1),
            Some(0),
            Some(-1),
            Some(i64::MAX),
            Some(i64::MIN),
        ])) as ArrayRef;

        // generated with Murmur3Hash(Seq(Literal(1L)), 42).eval() since Spark is tested
        let mut hashes = vec![42; 5];
        create_murmur3_hashes(&[i.clone()], &mut hashes).unwrap();
        let expected: Vec<i32> = [
            0x99f0149d_u32,
            0x9c67b85d,
            0xc8008529,
            0xa05b5d7b,
            0xcd1e64fb,
        ]
        .into_iter()
        .map(|v| v as i32)
        .collect();
        assert_eq!(hashes, expected);

        // generated with XxHash64(Seq(Literal(1L)), 42).eval() since Spark is tested
        // against this as well
        let mut hashes = vec![42; 5];
        create_xxhash64_hashes(&[i.clone()], &mut hashes).unwrap();
        let expected = vec![
            -7001672635703045582,
            -5252525462095825812,
            3858142552250413010,
            -3246596055638297850,
            -8619748838626508300,
        ];
        assert_eq!(hashes, expected);
    }

    #[test]
    fn test_str() {
        let i = Arc::new(StringArray::from(vec!["hello", "bar", "", "😁", "天地"]));

        // generated with Murmur3Hash(Seq(Literal("")), 42).eval() since Spark is tested
        // against this as well
        let mut hashes = vec![42; 5];
        create_murmur3_hashes(&[i.clone()], &mut hashes).unwrap();
        let expected: Vec<i32> = [3286402344_u32, 2486176763, 142593372, 885025535, 2395000894]
            .into_iter()
            .map(|v| v as i32)
            .collect();
        assert_eq!(hashes, expected);

        // generated with XxHash64(Seq(Literal("")), 42).eval() since Spark is tested
        // against this as well
        let mut hashes = vec![42; 5];
        create_xxhash64_hashes(&[i.clone()], &mut hashes).unwrap();
        let expected = vec![
            -4367754540140381902,
            -1798770879548125814,
            -7444071767201028348,
            -6337236088984028203,
            -235771157374669727,
        ];
        assert_eq!(hashes, expected);
    }

    #[test]
    fn test_pmod() {
        let i: Vec<i32> = [
            0x99f0149d_u32,
            0x9c67b85d,
            0xc8008529,
            0xa05b5d7b,
            0xcd1e64fb,
        ]
        .into_iter()
        .map(|v| v as i32)
        .collect();
        let result = i.into_iter().map(|i| pmod(i, 200)).collect::<Vec<usize>>();

        // expected partition from Spark with n=200
        let expected = vec![69, 5, 193, 171, 115];
        assert_eq!(result, expected);
    }

    #[test]
    fn test_map_array() {
        // Construct key and values
        let key_data = ArrayData::builder(DataType::Int32)
            .len(8)
            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
            .build()
            .unwrap();
        let value_data = ArrayData::builder(DataType::UInt32)
            .len(8)
            .add_buffer(Buffer::from(
                &[0u32, 10, 20, 0, 40, 0, 60, 70].to_byte_slice(),
            ))
            .null_bit_buffer(Some(Buffer::from(&[0b11010110])))
            .build()
            .unwrap();

        // Construct a buffer for value offsets, for the nested array:
        //  [[0, 1, 2], [3, 4, 5], [6, 7]]
        let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());

        let keys_field = Arc::new(Field::new("keys", DataType::Int32, false));
        let values_field = Arc::new(Field::new("values", DataType::UInt32, true));
        let entry_struct = StructArray::from(vec![
            (keys_field.clone(), make_array(key_data)),
            (values_field.clone(), make_array(value_data.clone())),
        ]);

        // Construct a map array from the above two
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                true,
            )),
            false,
        );
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        assert_eq!(&value_data, &map_array.values().to_data());
        assert_eq!(&DataType::UInt32, map_array.value_type());
        assert_eq!(3, map_array.len());
        assert_eq!(0, map_array.null_count());
        assert_eq!(6, map_array.value_offsets()[2]);
        assert_eq!(2, map_array.value_length(2));

        let key_array = Arc::new(Int32Array::from(vec![0, 1, 2])) as ArrayRef;
        let value_array =
            Arc::new(UInt32Array::from(vec![None, Some(10u32), Some(20)])) as ArrayRef;
        let struct_array = StructArray::from(vec![
            (keys_field.clone(), key_array),
            (values_field.clone(), value_array),
        ]);
        assert_eq!(
            struct_array,
            StructArray::from(map_array.value(0).into_data())
        );
        assert_eq!(
            &struct_array,
            unsafe { map_array.value_unchecked(0) }
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap()
        );
        for i in 0..3 {
            assert!(map_array.is_valid(i));
            assert!(!map_array.is_null(i));
        }

        // Now test with a non-zero offset
        let map_data = ArrayData::builder(map_array.data_type().clone())
            .len(2)
            .offset(1)
            .add_buffer(map_array.to_data().buffers()[0].clone())
            .add_child_data(map_array.to_data().child_data()[0].clone())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        assert_eq!(&value_data, &map_array.values().to_data());
        assert_eq!(&DataType::UInt32, map_array.value_type());
        assert_eq!(2, map_array.len());
        assert_eq!(0, map_array.null_count());
        assert_eq!(6, map_array.value_offsets()[1]);
        assert_eq!(2, map_array.value_length(1));

        let key_array = Arc::new(Int32Array::from(vec![3, 4, 5])) as ArrayRef;
        let value_array = Arc::new(UInt32Array::from(vec![None, Some(40), None])) as ArrayRef;
        let struct_array =
            StructArray::from(vec![(keys_field, key_array), (values_field, value_array)]);
        assert_eq!(
            &struct_array,
            map_array
                .value(0)
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap()
        );
        assert_eq!(
            &struct_array,
            unsafe { map_array.value_unchecked(0) }
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap()
        );
    }
}