blob: e96f178d83130fee4a134ef6f514b1839e3685b6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::compute::take;
use twox_hash::XxHash64;
use datafusion::{
arrow::{
array::*,
datatypes::{ArrowDictionaryKeyType, ArrowNativeType},
},
common::{internal_err, ScalarValue},
error::{DataFusionError, Result},
};
use crate::create_hashes_internal;
use arrow_array::{Array, ArrayRef, Int64Array};
use datafusion_expr::ColumnarValue;
use std::sync::Arc;
/// Spark compatible xxhash64 in vectorized execution fashion.
///
/// The last element of `args` must be an `Int64` scalar seed; all preceding
/// elements are the columns (arrays) or scalar values to hash.
///
/// Returns an `Int64` scalar when every input is a scalar (one logical row),
/// otherwise an `Int64Array` with one hash per row. Returns an internal error
/// if the seed argument is not an `Int64` scalar.
pub fn spark_xxhash64(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
    let length = args.len();
    let seed = &args[length - 1];
    match seed {
        ColumnarValue::Scalar(ScalarValue::Int64(Some(seed))) => {
            // Determine the row count from the first array argument; if every
            // argument is a scalar there is a single logical row.
            let num_rows = args[..length - 1]
                .iter()
                .find_map(|arg| match arg {
                    ColumnarValue::Array(array) => Some(array.len()),
                    ColumnarValue::Scalar(_) => None,
                })
                .unwrap_or(1);
            // Every row's hash starts from the caller-provided seed.
            let mut hashes: Vec<u64> = vec![*seed as u64; num_rows];
            // Materialize scalar arguments into arrays of `num_rows` so all
            // inputs are uniform. `to_array_of_size` takes `&self`, so no
            // clone is needed, and conversion errors are propagated instead
            // of panicking.
            let arrays = args[..length - 1]
                .iter()
                .map(|arg| match arg {
                    ColumnarValue::Array(array) => Ok(Arc::clone(array)),
                    ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
                })
                .collect::<Result<Vec<ArrayRef>>>()?;
            create_xxhash64_hashes(&arrays, &mut hashes)?;
            if num_rows == 1 {
                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(
                    hashes[0] as i64,
                ))))
            } else {
                // Reinterpret the u64 hashes as i64, matching Spark's result type.
                let hashes: Vec<i64> = hashes.into_iter().map(|x| x as i64).collect();
                Ok(ColumnarValue::Array(Arc::new(Int64Array::from(hashes))))
            }
        }
        _ => {
            internal_err!(
                "The seed of function xxhash64 must be an Int64 scalar value, but got: {:?}.",
                seed
            )
        }
    }
}
/// One-shot xxhash64 of `data` with the given `seed`; the Spark-compatible
/// hashing primitive plugged into `create_hashes_internal!`.
#[inline]
fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: u64) -> u64 {
    let bytes: &[u8] = data.as_ref();
    XxHash64::oneshot(seed, bytes)
}
// Hash the values in a dictionary array using xxhash64
fn create_xxhash64_hashes_dictionary<K: ArrowDictionaryKeyType>(
    array: &ArrayRef,
    hashes_buffer: &mut [u64],
    first_col: bool,
) -> Result<()> {
    let dict = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
    if first_col {
        // Hash each distinct dictionary value exactly once, then copy the
        // value's hash for every key that references it. This avoids
        // potentially expensive redundant hashing of large dictionary
        // elements (e.g. strings).
        let values = Arc::clone(dict.values());
        // same initial seed as Spark
        let mut value_hashes = vec![42u64; values.len()];
        create_xxhash64_hashes(&[values], &mut value_hashes)?;
        for (out, key) in hashes_buffer.iter_mut().zip(dict.keys().iter()) {
            if let Some(key) = key {
                let idx = key.to_usize().ok_or_else(|| {
                    DataFusionError::Internal(format!(
                        "Can not convert key value {:?} to usize in dictionary of type {:?}",
                        key,
                        dict.data_type()
                    ))
                })?;
                *out = value_hashes[idx];
            }
            // null keys leave the hash untouched, consistent with other hashes
        }
    } else {
        // Not the first column: the buffer already holds partial hashes, so
        // expand the dictionary into a flat array and combine row by row.
        let unpacked = take(dict.values().as_ref(), dict.keys(), None)?;
        create_xxhash64_hashes(&[unpacked], hashes_buffer)?;
    }
    Ok(())
}
/// Creates xxhash64 hash values for every row, based on the values in the
/// columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`.
/// `hashes_buffer` should be pre-sized appropriately (and pre-seeded —
/// the hash of each column is folded into the existing buffer value).
fn create_xxhash64_hashes<'a>(
    arrays: &[ArrayRef],
    hashes_buffer: &'a mut [u64],
) -> Result<&'a mut [u64]> {
    // Delegate to the shared hashing macro, plugging in the xxhash64
    // primitive and the dictionary-array specialization defined above.
    create_hashes_internal!(
        arrays,
        hashes_buffer,
        spark_compatible_xxhash64,
        create_xxhash64_hashes_dictionary
    );
    Ok(hashes_buffer)
}
#[cfg(test)]
mod tests {
    use arrow::array::{Float32Array, Float64Array};
    use std::sync::Arc;

    use super::create_xxhash64_hashes;
    use crate::test_hashes_with_nulls;
    use datafusion::arrow::array::{ArrayRef, Int32Array, Int64Array, Int8Array, StringArray};

    // Helper: builds an array of type `T` from `values` and checks that
    // `create_xxhash64_hashes` produces `expected` (the macro also exercises
    // null handling). Expected values are fixed u64 constants — presumably
    // generated from Spark's xxhash64 for compatibility; do not change them.
    fn test_xxhash64_hash<I: Clone, T: arrow_array::Array + From<Vec<Option<I>>> + 'static>(
        values: Vec<Option<I>>,
        expected: Vec<u64>,
    ) {
        test_hashes_with_nulls!(create_xxhash64_hashes, T, values, expected, u64);
    }

    #[test]
    fn test_i8() {
        // Covers 1, 0, -1 and both extremes of the i8 range.
        test_xxhash64_hash::<i8, Int8Array>(
            vec![Some(1), Some(0), Some(-1), Some(i8::MAX), Some(i8::MIN)],
            vec![
                0xa309b38455455929,
                0x3229fbc4681e48f3,
                0x1bfdda8861c06e45,
                0x77cc15d9f9f2cdc2,
                0x39bc22b9e94d81d0,
            ],
        );
    }

    #[test]
    fn test_i32() {
        // Note the first three hashes match test_i8: small integer types are
        // evidently hashed through a common widened representation.
        test_xxhash64_hash::<i32, Int32Array>(
            vec![Some(1), Some(0), Some(-1), Some(i32::MAX), Some(i32::MIN)],
            vec![
                0xa309b38455455929,
                0x3229fbc4681e48f3,
                0x1bfdda8861c06e45,
                0x14f0ac009c21721c,
                0x1cc7cb8d034769cd,
            ],
        );
    }

    #[test]
    fn test_i64() {
        test_xxhash64_hash::<i64, Int64Array>(
            vec![Some(1), Some(0), Some(-1), Some(i64::MAX), Some(i64::MIN)],
            vec![
                0x9ed50fd59358d232,
                0xb71b47ebda15746c,
                0x358ae035bfb46fd2,
                0xd2f1c616ae7eb306,
                0x88608019c494c1f4,
            ],
        );
    }

    #[test]
    fn test_f32() {
        // 0.0 and -0.0 hash identically (rows 2 and 3), matching Spark's
        // treatment of signed float zero.
        test_xxhash64_hash::<f32, Float32Array>(
            vec![
                Some(1.0),
                Some(0.0),
                Some(-0.0),
                Some(-1.0),
                Some(99999999999.99999999999),
                Some(-99999999999.99999999999),
            ],
            vec![
                0x9b92689757fcdbd,
                0x3229fbc4681e48f3,
                0x3229fbc4681e48f3,
                0xa2becc0e61bb3823,
                0x8f20ab82d4f3687f,
                0xdce4982d97f7ac4,
            ],
        )
    }

    #[test]
    fn test_f64() {
        // Same inputs as test_f32; the f64 bit patterns differ, so the
        // expected hashes differ — except 0.0 == -0.0 still holds.
        test_xxhash64_hash::<f64, Float64Array>(
            vec![
                Some(1.0),
                Some(0.0),
                Some(-0.0),
                Some(-1.0),
                Some(99999999999.99999999999),
                Some(-99999999999.99999999999),
            ],
            vec![
                0xe1fd6e07fee8ad53,
                0xb71b47ebda15746c,
                0xb71b47ebda15746c,
                0x8cdde022746f8f1f,
                0x793c5c88d313eac7,
                0xc5e60e7b75d9b232,
            ],
        )
    }

    #[test]
    fn test_str() {
        // Mix of empty string, multi-byte UTF-8 (emoji, CJK) and ASCII
        // prefixes of increasing length to cover xxhash64's short-input paths.
        let input = [
            "hello", "bar", "", "😁", "天地", "a", "ab", "abc", "abcd", "abcde",
        ]
        .iter()
        .map(|s| Some(s.to_string()))
        .collect::<Vec<Option<String>>>();

        test_xxhash64_hash::<String, StringArray>(
            input,
            vec![
                0xc3629e6318d53932,
                0xe7097b6a54378d8a,
                0x98b1582b0977e704,
                0xa80d9d5a6a523bd5,
                0xfcba5f61ac666c61,
                0x88e4fe59adf7b0cc,
                0x259dd873209a3fe3,
                0x13c1d910702770e6,
                0xa17b5eb5dc364dff,
                0xf241303e4a90f299,
            ],
        )
    }
}