| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| use std::collections::{HashMap, HashSet}; |
| use std::sync::Arc; |
| |
| use crate::config::HudiConfigs; |
| use crate::file_group::FileGroup; |
| use crate::storage::Storage; |
| |
| use crate::file_group::file_slice::FileSlice; |
| use crate::table::listing::FileLister; |
| use crate::table::partition::PartitionPruner; |
| use crate::Result; |
| use dashmap::DashMap; |
| |
| /// A view of the Hudi table's data files (files stored outside the `.hoodie/` directory) in the file system. It provides APIs to load and |
| /// access the file groups and file slices. |
| #[derive(Clone, Debug)] |
| #[allow(dead_code)] |
| pub struct FileSystemView { |
| pub(crate) hudi_configs: Arc<HudiConfigs>, |
| pub(crate) storage: Arc<Storage>, |
| partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>, |
| } |
| |
| impl FileSystemView { |
| pub async fn new( |
| hudi_configs: Arc<HudiConfigs>, |
| storage_options: Arc<HashMap<String, String>>, |
| ) -> Result<Self> { |
| let storage = Storage::new(storage_options.clone(), hudi_configs.clone())?; |
| let partition_to_file_groups = Arc::new(DashMap::new()); |
| Ok(FileSystemView { |
| hudi_configs, |
| storage, |
| partition_to_file_groups, |
| }) |
| } |
| |
| async fn load_file_groups(&self, partition_pruner: &PartitionPruner) -> Result<()> { |
| let lister = FileLister::new( |
| self.hudi_configs.clone(), |
| self.storage.clone(), |
| partition_pruner.to_owned(), |
| ); |
| let file_groups_map = lister.list_file_groups_for_relevant_partitions().await?; |
| for (partition_path, file_groups) in file_groups_map { |
| self.partition_to_file_groups |
| .insert(partition_path, file_groups); |
| } |
| Ok(()) |
| } |
| |
| async fn collect_file_slices_as_of( |
| &self, |
| timestamp: &str, |
| partition_pruner: &PartitionPruner, |
| excluding_file_groups: &HashSet<FileGroup>, |
| ) -> Result<Vec<FileSlice>> { |
| let mut file_slices = Vec::new(); |
| for mut partition_entry in self.partition_to_file_groups.iter_mut() { |
| if !partition_pruner.should_include(partition_entry.key()) { |
| continue; |
| } |
| let file_groups = partition_entry.value_mut(); |
| for fg in file_groups.iter_mut() { |
| if excluding_file_groups.contains(fg) { |
| continue; |
| } |
| if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) { |
| fsl.load_metadata_if_needed(&self.storage).await?; |
| file_slices.push(fsl.clone()); |
| } |
| } |
| } |
| Ok(file_slices) |
| } |
| |
| pub async fn get_file_slices_as_of( |
| &self, |
| timestamp: &str, |
| partition_pruner: &PartitionPruner, |
| excluding_file_groups: &HashSet<FileGroup>, |
| ) -> Result<Vec<FileSlice>> { |
| self.load_file_groups(partition_pruner).await?; |
| self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups) |
| .await |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::expr::filter::Filter; |
| use crate::table::Table; |
| |
| use hudi_test::SampleTable; |
| use std::collections::HashSet; |
| |
| #[tokio::test] |
| async fn fs_view_get_latest_file_slices() { |
| let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); |
| let hudi_table = Table::new(base_url.path()).await.unwrap(); |
| let latest_timestamp = hudi_table |
| .timeline |
| .completed_commits |
| .iter() |
| .next_back() |
| .map(|i| i.timestamp.clone()) |
| .unwrap(); |
| let fs_view = &hudi_table.file_system_view; |
| |
| assert!(fs_view.partition_to_file_groups.is_empty()); |
| let partition_pruner = PartitionPruner::empty(); |
| let excludes = HashSet::new(); |
| let file_slices = fs_view |
| .get_file_slices_as_of(&latest_timestamp, &partition_pruner, &excludes) |
| .await |
| .unwrap(); |
| assert_eq!(fs_view.partition_to_file_groups.len(), 1); |
| assert_eq!(file_slices.len(), 1); |
| let file_ids = file_slices |
| .iter() |
| .map(|fsl| fsl.file_id()) |
| .collect::<Vec<_>>(); |
| assert_eq!(file_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]); |
| for fsl in file_slices.iter() { |
| assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 4); |
| } |
| } |
| |
| #[tokio::test] |
| async fn fs_view_get_latest_file_slices_with_replace_commit() { |
| let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow(); |
| let hudi_table = Table::new(base_url.path()).await.unwrap(); |
| let latest_timestamp = hudi_table |
| .timeline |
| .completed_commits |
| .iter() |
| .next_back() |
| .map(|i| i.timestamp.clone()) |
| .unwrap(); |
| let fs_view = &hudi_table.file_system_view; |
| |
| assert_eq!(fs_view.partition_to_file_groups.len(), 0); |
| let partition_pruner = PartitionPruner::empty(); |
| let excludes = &hudi_table |
| .timeline |
| .get_replaced_file_groups_as_of(&latest_timestamp) |
| .await |
| .unwrap(); |
| let file_slices = fs_view |
| .get_file_slices_as_of(&latest_timestamp, &partition_pruner, excludes) |
| .await |
| .unwrap(); |
| assert_eq!(fs_view.partition_to_file_groups.len(), 3); |
| assert_eq!(file_slices.len(), 1); |
| let file_ids = file_slices |
| .iter() |
| .map(|fsl| fsl.file_id()) |
| .collect::<Vec<_>>(); |
| assert_eq!(file_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]); |
| for fsl in file_slices.iter() { |
| assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 1); |
| } |
| } |
| |
| #[tokio::test] |
| async fn fs_view_get_latest_file_slices_with_partition_filters() { |
| let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow(); |
| let hudi_table = Table::new(base_url.path()).await.unwrap(); |
| let latest_timestamp = hudi_table |
| .timeline |
| .completed_commits |
| .iter() |
| .next_back() |
| .map(|i| i.timestamp.clone()) |
| .unwrap(); |
| let fs_view = &hudi_table.file_system_view; |
| |
| assert_eq!(fs_view.partition_to_file_groups.len(), 0); |
| |
| let excludes = &hudi_table |
| .timeline |
| .get_replaced_file_groups_as_of(&latest_timestamp) |
| .await |
| .unwrap(); |
| let partition_schema = hudi_table.get_partition_schema().await.unwrap(); |
| |
| let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap(); |
| let filter_eq_300 = Filter::try_from(("shortField", "=", "300")).unwrap(); |
| let partition_pruner = PartitionPruner::new( |
| &[filter_lt_20, filter_eq_300], |
| &partition_schema, |
| hudi_table.hudi_configs.as_ref(), |
| ) |
| .unwrap(); |
| |
| let file_slices = fs_view |
| .get_file_slices_as_of(&latest_timestamp, &partition_pruner, excludes) |
| .await |
| .unwrap(); |
| assert_eq!(fs_view.partition_to_file_groups.len(), 1); |
| assert_eq!(file_slices.len(), 1); |
| |
| let file_ids = file_slices |
| .iter() |
| .map(|fsl| fsl.file_id()) |
| .collect::<Vec<_>>(); |
| assert_eq!(file_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]); |
| for fsl in file_slices.iter() { |
| assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 2); |
| } |
| } |
| } |