blob: d25389bcb4ea85d26b3f1574e21812fcbc356624 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::config::HudiConfigs;
use crate::file_group::FileGroup;
use crate::storage::Storage;
use crate::file_group::file_slice::FileSlice;
use crate::table::listing::FileLister;
use crate::table::partition::PartitionPruner;
use crate::Result;
use dashmap::DashMap;
/// A view of the Hudi table's data files (files stored outside the `.hoodie/`
/// directory) in the file system. It provides APIs to load and access the file
/// groups and file slices.
///
/// File groups are kept per partition path in a [`DashMap`] behind an [`Arc`], so
/// clones of the view (via the derived `Clone`) share the same map.
#[derive(Clone, Debug)]
// NOTE(review): every field below is read somewhere in this file, so this allow
// may be stale — confirm nothing else relies on it before removing.
#[allow(dead_code)]
pub struct FileSystemView {
    /// Table configuration; handed to the file lister when loading file groups.
    pub(crate) hudi_configs: Arc<HudiConfigs>,
    /// Storage handle used for listing file groups and loading file-slice metadata.
    pub(crate) storage: Arc<Storage>,
    /// Partition path → file groups in that partition. Starts empty and is filled
    /// by listing storage on demand.
    partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
}
impl FileSystemView {
    /// Creates a view with an empty partition → file-group map.
    ///
    /// A [`Storage`] handle is built from `storage_options` and `hudi_configs`. No
    /// listing happens here; file groups are listed by
    /// [`FileSystemView::get_file_slices_as_of`].
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`Storage::new`].
    pub async fn new(
        hudi_configs: Arc<HudiConfigs>,
        storage_options: Arc<HashMap<String, String>>,
    ) -> Result<Self> {
        // `storage_options` is not needed after this call, so move the Arc in
        // rather than bumping its refcount.
        let storage = Storage::new(storage_options, hudi_configs.clone())?;
        Ok(FileSystemView {
            hudi_configs,
            storage,
            partition_to_file_groups: Arc::new(DashMap::new()),
        })
    }

    /// Lists the file groups of the partitions selected by `partition_pruner` and
    /// stores them in `partition_to_file_groups`.
    ///
    /// Each listed partition's entry is replaced wholesale, so whatever state the
    /// previously stored file groups of that partition carried is dropped.
    /// Partitions not returned by the lister keep their existing entries.
    ///
    /// # Errors
    ///
    /// Propagates any error from the file lister.
    async fn load_file_groups(&self, partition_pruner: &PartitionPruner) -> Result<()> {
        let lister = FileLister::new(
            self.hudi_configs.clone(),
            self.storage.clone(),
            partition_pruner.to_owned(),
        );
        let file_groups_map = lister.list_file_groups_for_relevant_partitions().await?;
        for (partition_path, file_groups) in file_groups_map {
            self.partition_to_file_groups
                .insert(partition_path, file_groups);
        }
        Ok(())
    }

    /// Collects, from every stored partition accepted by `partition_pruner`, the
    /// file slice that `FileGroup::get_file_slice_mut_as_of(timestamp)` selects
    /// for each file group not in `excluding_file_groups`, with its metadata loaded.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while loading file-slice metadata.
    async fn collect_file_slices_as_of(
        &self,
        timestamp: &str,
        partition_pruner: &PartitionPruner,
        excluding_file_groups: &HashSet<FileGroup>,
    ) -> Result<Vec<FileSlice>> {
        // Phase 1: snapshot the selected slices while the shard guards are held,
        // with no `.await` in between. DashMap guards are synchronous locks;
        // holding one across an await point would block other threads touching
        // the same shard for the whole storage round-trip, and can deadlock
        // tasks that share a thread (e.g. a concurrent `insert` into the map).
        let mut file_slices = Vec::new();
        for mut partition_entry in self.partition_to_file_groups.iter_mut() {
            if !partition_pruner.should_include(partition_entry.key()) {
                continue;
            }
            for fg in partition_entry.value_mut().iter_mut() {
                if excluding_file_groups.contains(fg) {
                    continue;
                }
                if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
                    file_slices.push(fsl.clone());
                }
            }
        }

        // Phase 2: load metadata on the owned copies with no map guard held.
        // Metadata is not written back into the map: `load_file_groups` replaces
        // these partitions' entries at the start of every public call anyway.
        for fsl in file_slices.iter_mut() {
            fsl.load_metadata_if_needed(&self.storage).await?;
        }
        Ok(file_slices)
    }

    /// Returns, for each file group not in `excluding_file_groups` within the
    /// partitions accepted by `partition_pruner`, the file slice selected for
    /// `timestamp`, with its metadata loaded.
    ///
    /// The relevant partitions' file groups are re-listed from storage on every
    /// call before the slices are collected.
    ///
    /// # Errors
    ///
    /// Propagates errors from listing file groups or loading slice metadata.
    pub async fn get_file_slices_as_of(
        &self,
        timestamp: &str,
        partition_pruner: &PartitionPruner,
        excluding_file_groups: &HashSet<FileGroup>,
    ) -> Result<Vec<FileSlice>> {
        self.load_file_groups(partition_pruner).await?;
        self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups)
            .await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expr::filter::Filter;
    use crate::table::Table;
    use hudi_test::SampleTable;
    use std::collections::HashSet;

    /// Opens the copy-on-write variant of `sample` as a [`Table`].
    async fn open_cow_table(sample: SampleTable) -> Table {
        let table_url = sample.url_to_cow();
        Table::new(table_url.path()).await.unwrap()
    }

    /// Timestamp of the last entry in the table's completed-commit timeline.
    fn latest_completed_timestamp(table: &Table) -> String {
        let last_commit = table.timeline.completed_commits.iter().next_back();
        last_commit.map(|instant| instant.timestamp.clone()).unwrap()
    }

    #[tokio::test]
    async fn fs_view_get_latest_file_slices() {
        let table = open_cow_table(SampleTable::V6Nonpartitioned).await;
        let as_of = latest_completed_timestamp(&table);
        let view = &table.file_system_view;
        assert!(view.partition_to_file_groups.is_empty());

        let slices = view
            .get_file_slices_as_of(&as_of, &PartitionPruner::empty(), &HashSet::new())
            .await
            .unwrap();

        assert_eq!(view.partition_to_file_groups.len(), 1);
        assert_eq!(slices.len(), 1);
        let ids: Vec<_> = slices.iter().map(|slice| slice.file_id()).collect();
        assert_eq!(ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
        for slice in &slices {
            let metadata = slice.base_file.file_metadata.as_ref().unwrap();
            assert_eq!(metadata.num_records, 4);
        }
    }

    #[tokio::test]
    async fn fs_view_get_latest_file_slices_with_replace_commit() {
        let table = open_cow_table(SampleTable::V6SimplekeygenNonhivestyleOverwritetable).await;
        let as_of = latest_completed_timestamp(&table);
        let view = &table.file_system_view;
        assert_eq!(view.partition_to_file_groups.len(), 0);

        let replaced = table
            .timeline
            .get_replaced_file_groups_as_of(&as_of)
            .await
            .unwrap();
        let slices = view
            .get_file_slices_as_of(&as_of, &PartitionPruner::empty(), &replaced)
            .await
            .unwrap();

        assert_eq!(view.partition_to_file_groups.len(), 3);
        assert_eq!(slices.len(), 1);
        let ids: Vec<_> = slices.iter().map(|slice| slice.file_id()).collect();
        assert_eq!(ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
        for slice in &slices {
            let metadata = slice.base_file.file_metadata.as_ref().unwrap();
            assert_eq!(metadata.num_records, 1);
        }
    }

    #[tokio::test]
    async fn fs_view_get_latest_file_slices_with_partition_filters() {
        let table = open_cow_table(SampleTable::V6ComplexkeygenHivestyle).await;
        let as_of = latest_completed_timestamp(&table);
        let view = &table.file_system_view;
        assert_eq!(view.partition_to_file_groups.len(), 0);

        let replaced = table
            .timeline
            .get_replaced_file_groups_as_of(&as_of)
            .await
            .unwrap();
        let schema = table.get_partition_schema().await.unwrap();
        let byte_field_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
        let short_field_eq_300 = Filter::try_from(("shortField", "=", "300")).unwrap();
        let pruner = PartitionPruner::new(
            &[byte_field_lt_20, short_field_eq_300],
            &schema,
            table.hudi_configs.as_ref(),
        )
        .unwrap();

        let slices = view
            .get_file_slices_as_of(&as_of, &pruner, &replaced)
            .await
            .unwrap();

        assert_eq!(view.partition_to_file_groups.len(), 1);
        assert_eq!(slices.len(), 1);
        let ids: Vec<_> = slices.iter().map(|slice| slice.file_id()).collect();
        assert_eq!(ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
        for slice in &slices {
            let metadata = slice.base_file.file_metadata.as_ref().unwrap();
            assert_eq!(metadata.num_records, 2);
        }
    }
}