// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Functionality used by both logical and physical plans
use std::sync::Arc;
use datafusion::arrow::array::*;
use datafusion::arrow::datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type,
Int8Type, TimeUnit,
};
use datafusion::error::{DataFusionError, Result};
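/// Murmur3 x86_32 hash, bit-compatible with Spark's `Murmur3_x86_32`.
///
/// The input is consumed as 4-byte little-endian words; any trailing bytes
/// are mixed in one at a time as sign-extended `i32`s, the way Spark hashes
/// the tail (unlike the reference murmur3, which packs the remaining bytes
/// into a single final word).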
#[inline]
fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 {
#[inline]
    fn mix_k1(mut k1: i32) -> i32 {
        // murmur3 depends on wrapping two's-complement arithmetic; use the
        // wrapping intrinsics so debug builds do not panic on overflow
        k1 = k1.wrapping_mul(0xcc9e2d51u32 as i32);
        k1 = k1.rotate_left(15);
        k1 = k1.wrapping_mul(0x1b873593u32 as i32);
        k1
    }
#[inline]
fn mix_h1(mut h1: i32, k1: i32) -> i32 {
h1 ^= k1;
h1 = h1.rotate_left(13);
        h1 = h1.wrapping_mul(5).wrapping_add(0xe6546b64u32 as i32);
h1
}
#[inline]
fn fmix(mut h1: i32, len: i32) -> i32 {
h1 ^= len;
h1 ^= (h1 as u32 >> 16) as i32;
        h1 = h1.wrapping_mul(0x85ebca6bu32 as i32);
        h1 ^= (h1 as u32 >> 13) as i32;
        h1 = h1.wrapping_mul(0xc2b2ae35u32 as i32);
h1 ^= (h1 as u32 >> 16) as i32;
h1
}
#[inline]
    // safety contract: `data.len()` must be a multiple of 4 bytes
    unsafe fn hash_bytes_by_int(data: &[u8], seed: u32) -> i32 {
        let mut h1 = seed as i32;
        for i in (0..data.len()).step_by(4) {
            // the slice is not necessarily 4-byte aligned, so read each word
            // with `read_unaligned` instead of dereferencing the raw pointer
            let mut half_word =
                std::ptr::read_unaligned(data.as_ptr().add(i) as *const i32);
            if cfg!(target_endian = "big") {
                // spark hashes 4-byte words in little-endian order; converting
                // endianness means swapping the bytes, not reversing the bits
                half_word = half_word.swap_bytes();
            }
            h1 = mix_h1(h1, mix_k1(half_word));
        }
        h1
    }
let data = data.as_ref();
let len = data.len();
let len_aligned = len - len % 4;
    // safety:
    // avoid bounds checking in performance-critical code;
    // all accesses below are guaranteed to be in range
    unsafe {
        let mut h1 = hash_bytes_by_int(
            // `as_ptr` is valid even when `data` is empty, unlike `get_unchecked(0)`
            std::slice::from_raw_parts(data.as_ptr(), len_aligned),
            seed,
        );
for i in len_aligned..len {
let half_word = *data.get_unchecked(i) as i8 as i32;
h1 = mix_h1(h1, mix_k1(half_word));
}
fmix(h1, len as i32) as u32
}
}
#[test]
fn test_murmur3() {
    // expected values match Spark's Murmur3_x86_32 with seed 42
    let hashes = ["", "a", "ab", "abc", "abcd", "abcde"]
        .into_iter()
        .map(|s| spark_compatible_murmur3_hash(s.as_bytes(), 42) as i32)
        .collect::<Vec<_>>();
    let expected = vec![
        142593372, 1485273170, -97053317, 1322437556, -396302900, 814637928,
    ];
    assert_eq!(hashes, expected);
}
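// Each of the following macros folds one column into the running per-row
// hashes: the previous hash value is passed as the murmur seed, so hashing
// several columns chains together the way Spark's Murmur3Hash over multiple
// children does. The i32/i64 variants widen smaller integer types first,
// matching Spark's promotion of byte/short values to int before hashing.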
macro_rules! hash_array {
($array_type:ident, $column: ident, $ty: ident, $hashes: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
if array.null_count() == 0 {
for (i, hash) in $hashes.iter_mut().enumerate() {
*hash = spark_compatible_murmur3_hash(&array.value(i), *hash);
}
} else {
for (i, hash) in $hashes.iter_mut().enumerate() {
if !array.is_null(i) {
*hash = spark_compatible_murmur3_hash(&array.value(i), *hash);
}
}
}
};
}
macro_rules! hash_array_primitive_i32 {
($array_type:ident, $column: ident, $ty: ident, $hashes: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
let values = array.values();
if array.null_count() == 0 {
for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
*hash =
spark_compatible_murmur3_hash((*value as i32).to_le_bytes(), *hash);
}
} else {
for (i, (hash, value)) in $hashes.iter_mut().zip(values.iter()).enumerate() {
if !array.is_null(i) {
*hash = spark_compatible_murmur3_hash(
(*value as i32).to_le_bytes(),
*hash,
);
}
}
}
};
}
macro_rules! hash_array_primitive_i64 {
($array_type:ident, $column: ident, $ty: ident, $hashes: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
let values = array.values();
if array.null_count() == 0 {
for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
*hash =
spark_compatible_murmur3_hash((*value as i64).to_le_bytes(), *hash);
}
} else {
for (i, (hash, value)) in $hashes.iter_mut().zip(values.iter()).enumerate() {
if !array.is_null(i) {
*hash = spark_compatible_murmur3_hash(
(*value as i64).to_le_bytes(),
*hash,
);
}
}
}
};
}
macro_rules! hash_array_decimal {
($array_type:ident, $column: ident, $hashes: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
if array.null_count() == 0 {
for (i, hash) in $hashes.iter_mut().enumerate() {
*hash = spark_compatible_murmur3_hash(
array.value(i).as_i128().to_le_bytes(),
*hash,
);
}
} else {
for (i, hash) in $hashes.iter_mut().enumerate() {
if !array.is_null(i) {
*hash = spark_compatible_murmur3_hash(
array.value(i).as_i128().to_le_bytes(),
*hash,
);
}
}
}
};
}
/// Hash the values in a dictionary array
fn create_hashes_dictionary<K: ArrowDictionaryKeyType>(
array: &ArrayRef,
hashes_buffer: &mut [u32],
) -> Result<()> {
let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
// Hash each dictionary value once, and then use that computed
// hash for each key value to avoid a potentially expensive
// redundant hashing for large dictionary elements (e.g. strings)
let dict_values = Arc::clone(dict_array.values());
let mut dict_hashes = vec![0; dict_values.len()];
create_hashes(&[dict_values], &mut dict_hashes)?;
for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
if let Some(key) = key {
let idx = key.to_usize().ok_or_else(|| {
DataFusionError::Internal(format!(
"Can not convert key value {:?} to usize in dictionary of type {:?}",
key,
dict_array.data_type()
))
})?;
            *hash = dict_hashes[idx];
        } // no update for null keys, consistent with the other hash paths
}
Ok(())
}
/// Creates hash values for every row, based on the values in the
/// columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`;
/// the buffer must be pre-sized and pre-filled with the initial seed for
/// each row (Spark's default seed is 42).
pub fn create_hashes<'a>(
arrays: &[ArrayRef],
hashes_buffer: &'a mut Vec<u32>,
) -> Result<&'a mut Vec<u32>> {
for col in arrays {
match col.data_type() {
            DataType::Boolean => {
                let array = col.as_any().downcast_ref::<BooleanArray>().unwrap();
                if array.null_count() == 0 {
                    for (i, hash) in hashes_buffer.iter_mut().enumerate() {
                        // spark hashes a boolean as an i32 0 or 1
                        *hash = spark_compatible_murmur3_hash(
                            (array.value(i) as i32).to_le_bytes(),
                            *hash,
                        );
                    }
                } else {
                    for (i, hash) in hashes_buffer.iter_mut().enumerate() {
                        if !array.is_null(i) {
                            *hash = spark_compatible_murmur3_hash(
                                (array.value(i) as i32).to_le_bytes(),
                                *hash,
                            );
                        }
                    }
                }
            }
DataType::Int8 => {
hash_array_primitive_i32!(Int8Array, col, i8, hashes_buffer);
}
DataType::Int16 => {
hash_array_primitive_i32!(Int16Array, col, i16, hashes_buffer);
}
DataType::Int32 => {
hash_array_primitive_i32!(Int32Array, col, i32, hashes_buffer);
}
DataType::Int64 => {
hash_array_primitive_i64!(Int64Array, col, i64, hashes_buffer);
}
DataType::Timestamp(TimeUnit::Second, None) => {
hash_array_primitive_i64!(TimestampSecondArray, col, i64, hashes_buffer);
}
DataType::Timestamp(TimeUnit::Millisecond, None) => {
hash_array_primitive_i64!(
TimestampMillisecondArray,
col,
i64,
hashes_buffer
);
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
hash_array_primitive_i64!(
TimestampMicrosecondArray,
col,
i64,
hashes_buffer
);
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
hash_array_primitive_i64!(
TimestampNanosecondArray,
col,
i64,
hashes_buffer
);
}
DataType::Date32 => {
hash_array_primitive_i32!(Date32Array, col, i32, hashes_buffer);
}
DataType::Date64 => {
hash_array_primitive_i64!(Date64Array, col, i64, hashes_buffer);
}
DataType::Utf8 => {
hash_array!(StringArray, col, str, hashes_buffer);
}
DataType::LargeUtf8 => {
hash_array!(LargeStringArray, col, str, hashes_buffer);
}
DataType::Decimal(_, _) => {
hash_array_decimal!(DecimalArray, col, hashes_buffer);
}
DataType::Dictionary(index_type, _) => match **index_type {
DataType::Int8 => {
create_hashes_dictionary::<Int8Type>(col, hashes_buffer)?;
}
DataType::Int16 => {
create_hashes_dictionary::<Int16Type>(col, hashes_buffer)?;
}
DataType::Int32 => {
create_hashes_dictionary::<Int32Type>(col, hashes_buffer)?;
}
DataType::Int64 => {
create_hashes_dictionary::<Int64Type>(col, hashes_buffer)?;
}
_ => {
                    return Err(DataFusionError::Internal(format!(
                        "Unsupported dictionary key type in hasher: {}",
                        col.data_type(),
                    )))
}
},
_ => {
                // This is an internal error because unsupported types should
                // have been caught earlier.
return Err(DataFusionError::Internal(format!(
"Unsupported data type in hasher: {}",
col.data_type()
)));
}
}
}
Ok(hashes_buffer)
}
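/// Spark-compatible positive modulo: maps a 32-bit hash to a partition index
/// in `[0, n)`, performing the reduction on `i32` so the result matches
/// Spark's `Pmod(hash, numPartitions)` partition routing.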
pub(crate) fn pmod(hash: u32, n: usize) -> usize {
let hash = hash as i32;
let n = n as i32;
let r = hash % n;
let result = if r < 0 { (r + n) % n } else { r };
result as usize
}
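// Example (a sketch; `batch`, `key_columns`, and `num_partitions` are
// placeholders for the caller's data): routing rows to shuffle partitions
// the way Spark's hash partitioning does, by reducing each row hash with
// `pmod`:
//
//     let mut hashes = vec![42u32; batch.num_rows()];
//     create_hashes(&key_columns, &mut hashes)?;
//     let partition_ids: Vec<usize> =
//         hashes.iter().map(|h| pmod(*h, num_partitions)).collect();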
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datafusion::arrow::array::{
ArrayRef, Int32Array, Int64Array, Int8Array, StringArray,
};
use datafusion::from_slice::FromSlice;
use crate::spark_hash::{create_hashes, pmod};
#[test]
fn test_i8() {
let i = Arc::new(Int8Array::from(vec![
Some(1),
Some(0),
Some(-1),
Some(i8::MAX),
Some(i8::MIN),
])) as ArrayRef;
let mut hashes = vec![42; 5];
create_hashes(&[i], &mut hashes).unwrap();
// generated with Spark Murmur3_x86_32
let expected = vec![0xdea578e3, 0x379fae8f, 0xa0590e3d, 0x43b4d8ed, 0x422a1365];
assert_eq!(hashes, expected);
}
#[test]
fn test_i32() {
let i = Arc::new(Int32Array::from(vec![
Some(1),
Some(0),
Some(-1),
Some(i32::MAX),
Some(i32::MIN),
])) as ArrayRef;
let mut hashes = vec![42; 5];
create_hashes(&[i], &mut hashes).unwrap();
// generated with Spark Murmur3_x86_32
let expected = vec![0xdea578e3, 0x379fae8f, 0xa0590e3d, 0x07fb67e7, 0x2b1f0fc6];
assert_eq!(hashes, expected);
}
#[test]
fn test_i64() {
let i = Arc::new(Int64Array::from(vec![
Some(1),
Some(0),
Some(-1),
Some(i64::MAX),
Some(i64::MIN),
])) as ArrayRef;
let mut hashes = vec![42; 5];
create_hashes(&[i], &mut hashes).unwrap();
// generated with Spark Murmur3_x86_32
let expected = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb];
assert_eq!(hashes, expected);
}
#[test]
fn test_str() {
let i = Arc::new(StringArray::from_slice(&["hello", "bar", "", "😁", "天地"]));
let mut hashes = vec![42; 5];
create_hashes(&[i], &mut hashes).unwrap();
        // generated with Spark's Murmur3Hash(Seq(Literal(s)), 42).eval(),
        // which Spark itself is tested against
let expected = vec![3286402344, 2486176763, 142593372, 885025535, 2395000894];
assert_eq!(hashes, expected);
}
#[test]
fn test_pmod() {
let i: Vec<u32> =
vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb];
let result = i.into_iter().map(|i| pmod(i, 200)).collect::<Vec<usize>>();
// expected partition from Spark with n=200
let expected = vec![69, 5, 193, 171, 115];
assert_eq!(result, expected);
}
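    #[test]
    fn test_null_keeps_seed() {
        // a minimal sketch of the null behavior: null rows are skipped, so
        // their hash keeps the initial seed (42), while non-null rows match
        // the values verified against Spark in `test_i32`
        let i = Arc::new(Int32Array::from(vec![Some(1), None])) as ArrayRef;
        let mut hashes = vec![42; 2];
        create_hashes(&[i], &mut hashes).unwrap();
        assert_eq!(hashes, vec![0xdea578e3, 42]);
    }
    #[test]
    fn test_dict_same_key_same_hash() {
        use datafusion::arrow::array::DictionaryArray;
        use datafusion::arrow::datatypes::Int32Type;
        // a sketch of the dictionary fast path: rows that share a key reuse
        // the hash computed once for that dictionary value, so equal keys
        // must yield equal hashes
        let dict: DictionaryArray<Int32Type> =
            vec!["a", "b", "a"].into_iter().collect();
        let i = Arc::new(dict) as ArrayRef;
        let mut hashes = vec![42; 3];
        create_hashes(&[i], &mut hashes).unwrap();
        assert_eq!(hashes[0], hashes[2]);
    }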
}