blob: 46efacdbfbbb3aeb547f69cd102ae03356f843f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
use crate::error::CoreError::InvalidPartitionPath;
use crate::expr::filter::{Filter, SchemableFilter};
use crate::Result;
use arrow_array::{ArrayRef, Scalar};
use arrow_schema::Schema;
use crate::config::table::HudiTableConfig::{KeyGeneratorClass, PartitionFields};
use std::collections::HashMap;
use std::sync::Arc;
/// Prefix of the Hudi partition metadata file name (`.hoodie_partition_metadata`)
/// stored inside each partition directory.
pub const PARTITION_METAFIELD_PREFIX: &str = ".hoodie_partition_metadata";
/// Partition path value used when a table is non-partitioned.
pub const EMPTY_PARTITION_PATH: &str = "";
/// Returns `true` when the table is partitioned.
///
/// A table counts as partitioned when it declares at least one partition field
/// and its key generator class is not the non-partitioned key generator.
pub fn is_table_partitioned(hudi_configs: &HudiConfigs) -> bool {
    // Guard clause: without partition fields the table cannot be partitioned.
    let partition_fields: Vec<String> = hudi_configs.get_or_default(PartitionFields).into();
    if partition_fields.is_empty() {
        return false;
    }
    // An explicitly configured NonpartitionedKeyGenerator overrides the field list.
    let non_partitioned_key_gen = hudi_configs
        .try_get(KeyGeneratorClass)
        .map_or(false, |key_gen| {
            let class_name: String = key_gen.into();
            class_name == "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
        });
    !non_partitioned_key_gen
}
/// A partition pruner that filters partitions based on the partition path and its filters.
#[derive(Debug, Clone)]
pub struct PartitionPruner {
    /// Arrow schema of the partition fields; one field per `/`-separated path segment.
    schema: Arc<Schema>,
    /// Whether partition paths are hive-style, i.e. `field=value` segments.
    is_hive_style: bool,
    /// Whether partition paths are percent (URL) encoded and need decoding first.
    is_url_encoded: bool,
    /// Conjunctive filters; a partition is included only if all of them pass.
    and_filters: Vec<SchemableFilter>,
}
impl PartitionPruner {
    /// Creates a pruner from conjunctive (AND-ed) `and_filters` bound to `partition_schema`.
    ///
    /// Hive-style and URL-encoding behavior is read from `hudi_configs`.
    ///
    /// # Notes
    /// Filters that fail to bind against the partition schema (unknown field name or a
    /// value that cannot be cast to the field's type) are silently skipped, so an
    /// unbindable filter never excludes a partition.
    pub fn new(
        and_filters: &[Filter],
        partition_schema: &Schema,
        hudi_configs: &HudiConfigs,
    ) -> Result<Self> {
        let and_filters: Vec<SchemableFilter> = and_filters
            .iter()
            .filter_map(|filter| SchemableFilter::try_from((filter.clone(), partition_schema)).ok())
            .collect();
        let schema = Arc::new(partition_schema.clone());
        let is_hive_style: bool = hudi_configs
            .get_or_default(HudiTableConfig::IsHiveStylePartitioning)
            .into();
        let is_url_encoded: bool = hudi_configs
            .get_or_default(HudiTableConfig::IsPartitionPathUrlencoded)
            .into();
        Ok(PartitionPruner {
            schema,
            is_hive_style,
            is_url_encoded,
            and_filters,
        })
    }
    /// Creates an empty partition pruner that does not filter any partitions.
    pub fn empty() -> Self {
        PartitionPruner {
            schema: Arc::new(Schema::empty()),
            is_hive_style: false,
            is_url_encoded: false,
            and_filters: Vec::new(),
        }
    }
    /// Returns `true` if the partition pruner does not have any filters.
    pub fn is_empty(&self) -> bool {
        self.and_filters.is_empty()
    }
    /// Returns `true` if the partition path should be included based on the filters.
    ///
    /// Deliberately permissive: any parse or comparison error, or a filter field that
    /// does not appear in the path, results in inclusion so pruning never drops data.
    pub fn should_include(&self, partition_path: &str) -> bool {
        let segments = match self.parse_segments(partition_path) {
            Ok(s) => s,
            Err(_) => return true, // Include the partition regardless of parsing error
        };
        self.and_filters.iter().all(|filter| {
            match segments.get(filter.field.name()) {
                Some(segment_value) => {
                    match filter.apply_comparsion(segment_value) {
                        Ok(scalar) => scalar.value(0),
                        Err(_) => true, // Include the partition when comparison error occurs
                    }
                }
                None => true, // Include the partition when filtering field does not match any field in the partition
            }
        })
    }
    /// Splits `partition_path` into per-field scalar values keyed by partition field name.
    ///
    /// Percent-decodes the path first when the table stores URL-encoded paths, then
    /// expects exactly one `/`-separated segment per partition field, in schema order.
    /// For hive-style paths each segment must be `field=value`.
    ///
    /// # Errors
    /// Returns [`InvalidPartitionPath`] when the segment count does not match the
    /// schema, a hive-style segment lacks `=`, or a segment's field name mismatches.
    fn parse_segments(&self, partition_path: &str) -> Result<HashMap<String, Scalar<ArrayRef>>> {
        let partition_path = if self.is_url_encoded {
            percent_encoding::percent_decode(partition_path.as_bytes())
                .decode_utf8()?
                .into_owned()
        } else {
            partition_path.to_string()
        };
        let parts: Vec<&str> = partition_path.split('/').collect();
        if parts.len() != self.schema.fields().len() {
            return Err(InvalidPartitionPath(format!(
                "Partition path should have {} part(s) but got {}",
                self.schema.fields().len(),
                parts.len()
            )));
        }
        self.schema
            .fields()
            .iter()
            .zip(parts)
            .map(|(field, part)| {
                let value = if self.is_hive_style {
                    // ok_or_else defers building the error message until a segment
                    // actually fails to split, avoiding an allocation per segment
                    // on the happy path (clippy::or_fun_call).
                    let (name, value) = part.split_once('=').ok_or_else(|| {
                        InvalidPartitionPath(format!(
                            "Partition path should be hive-style but got {}",
                            part
                        ))
                    })?;
                    if name != field.name() {
                        return Err(InvalidPartitionPath(format!(
                            "Partition path should contain {} but got {}",
                            field.name(),
                            name
                        )));
                    }
                    value
                } else {
                    part
                };
                let scalar = SchemableFilter::cast_value(&[value], field.data_type())?;
                Ok((field.name().to_string(), scalar))
            })
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::table::HudiTableConfig::{
        IsHiveStylePartitioning, IsPartitionPathUrlencoded,
    };
    use crate::expr::ExprOperator;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::Date32Array;
    use std::str::FromStr;
    // Three-field partition schema used across all tests: date / category / count.
    fn create_test_schema() -> Schema {
        Schema::new(vec![
            Field::new("date", DataType::Date32, false),
            Field::new("category", DataType::Utf8, false),
            Field::new("count", DataType::Int32, false),
        ])
    }
    // Builds configs with just the two partition-path layout flags set.
    fn create_hudi_configs(is_hive_style: bool, is_url_encoded: bool) -> HudiConfigs {
        HudiConfigs::new([
            (IsHiveStylePartitioning, is_hive_style.to_string()),
            (IsPartitionPathUrlencoded, is_url_encoded.to_string()),
        ])
    }
    // Construction binds both filters and picks up the layout flags from configs.
    #[test]
    fn test_partition_pruner_new() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, false);
        let filter_gt_date = Filter::try_from(("date", ">", "2023-01-01")).unwrap();
        let filter_eq_a = Filter::try_from(("category", "=", "A")).unwrap();
        let pruner = PartitionPruner::new(&[filter_gt_date, filter_eq_a], &schema, &configs);
        assert!(pruner.is_ok());
        let pruner = pruner.unwrap();
        assert_eq!(pruner.and_filters.len(), 2);
        assert!(pruner.is_hive_style);
        assert!(!pruner.is_url_encoded);
    }
    // empty() yields a pruner with no filters and both layout flags off.
    #[test]
    fn test_partition_pruner_empty() {
        let pruner = PartitionPruner::empty();
        assert!(pruner.is_empty());
        assert!(!pruner.is_hive_style);
        assert!(!pruner.is_url_encoded);
    }
    // is_empty() reflects whether any filters were bound at construction.
    #[test]
    fn test_partition_pruner_is_empty() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(false, false);
        let pruner_empty = PartitionPruner::new(&[], &schema, &configs).unwrap();
        assert!(pruner_empty.is_empty());
        let filter_gt_date = Filter::try_from(("date", ">", "2023-01-01")).unwrap();
        let pruner_non_empty = PartitionPruner::new(&[filter_gt_date], &schema, &configs).unwrap();
        assert!(!pruner_non_empty.is_empty());
    }
    // All AND-ed filters must pass for a hive-style path to be included.
    #[test]
    fn test_partition_pruner_should_include() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, false);
        let filter_gt_date = Filter::try_from(("date", ">", "2023-01-01")).unwrap();
        let filter_eq_a = Filter::try_from(("category", "=", "A")).unwrap();
        let filter_lte_100 = Filter::try_from(("count", "<=", "100")).unwrap();
        let pruner = PartitionPruner::new(
            &[filter_gt_date, filter_eq_a, filter_lte_100],
            &schema,
            &configs,
        )
        .unwrap();
        assert!(pruner.should_include("date=2023-02-01/category=A/count=10"));
        assert!(pruner.should_include("date=2023-02-01/category=A/count=100"));
        assert!(!pruner.should_include("date=2022-12-31/category=A/count=10"));
        assert!(!pruner.should_include("date=2023-02-01/category=B/count=10"));
    }
    // parse_segments maps each hive-style segment to its field name.
    #[test]
    fn test_partition_pruner_parse_segments() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, false);
        let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
        let segments = pruner
            .parse_segments("date=2023-02-01/category=A/count=10")
            .unwrap();
        assert_eq!(segments.len(), 3);
        assert!(segments.contains_key("date"));
        assert!(segments.contains_key("category"));
        assert!(segments.contains_key("count"));
    }
    // Percent-encoded paths (%3D is '=', %2F is '/') are decoded before parsing.
    #[test]
    fn test_partition_pruner_url_encoded() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, true);
        let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
        let segments = pruner
            .parse_segments("date%3D2023-02-01%2Fcategory%3DA%2Fcount%3D10")
            .unwrap();
        assert_eq!(segments.len(), 3);
        assert!(segments.contains_key("date"));
        assert!(segments.contains_key("category"));
        assert!(segments.contains_key("count"));
    }
    // Wrong segment count, a non-hive segment, and a mismatched field name all
    // surface as InvalidPartitionPath.
    #[test]
    fn test_partition_pruner_invalid_path() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, false);
        let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
        let result = pruner.parse_segments("date=2023-02-01/category=A/count=10/extra");
        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
        let result = pruner.parse_segments("date=2023-02-01/category=A/10");
        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
        let result = pruner.parse_segments("date=2023-02-01/category=A/non_exist_field=10");
        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
    }
    // Binding a filter against the schema casts the value to the field's type
    // (here: a string date becomes a Date32 scalar).
    #[test]
    fn test_partition_filter_try_from_valid() {
        let schema = create_test_schema();
        let filter = Filter {
            field_name: "date".to_string(),
            operator: ExprOperator::Eq,
            field_value: "2023-01-01".to_string(),
        };
        let partition_filter = SchemableFilter::try_from((filter, &schema)).unwrap();
        assert_eq!(partition_filter.field.name(), "date");
        assert_eq!(partition_filter.operator, ExprOperator::Eq);
        let value_inner = partition_filter.value.into_inner();
        let date_array = value_inner.as_any().downcast_ref::<Date32Array>().unwrap();
        let date_value = date_array.value_as_date(0).unwrap();
        assert_eq!(date_value.to_string(), "2023-01-01");
    }
    // A filter on a field absent from the schema fails to bind with a clear message.
    #[test]
    fn test_partition_filter_try_from_invalid_field() {
        let schema = create_test_schema();
        let filter = Filter {
            field_name: "invalid_field".to_string(),
            operator: ExprOperator::Eq,
            field_value: "2023-01-01".to_string(),
        };
        let result = SchemableFilter::try_from((filter, &schema));
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Field invalid_field not found in schema"));
    }
    // A value that cannot be cast to the field's type (Int32) fails to bind.
    #[test]
    fn test_partition_filter_try_from_invalid_value() {
        let schema = create_test_schema();
        let filter = Filter {
            field_name: "count".to_string(),
            operator: ExprOperator::Eq,
            field_value: "not_a_number".to_string(),
        };
        let result = SchemableFilter::try_from((filter, &schema));
        assert!(result.is_err());
    }
    // Every supported operator token round-trips through binding unchanged.
    #[test]
    fn test_partition_filter_try_from_all_operators() {
        let schema = create_test_schema();
        for (op, _) in ExprOperator::TOKEN_OP_PAIRS {
            let filter = Filter {
                field_name: "count".to_string(),
                operator: ExprOperator::from_str(op).unwrap(),
                field_value: "5".to_string(),
            };
            let partition_filter = SchemableFilter::try_from((filter, &schema));
            let filter = partition_filter.unwrap();
            assert_eq!(filter.field.name(), "count");
            assert_eq!(filter.operator, ExprOperator::from_str(op).unwrap());
        }
    }
}