| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| use crate::config::read::HudiReadConfig; |
| use crate::config::table::HudiTableConfig; |
| use crate::config::util::split_hudi_options_from_others; |
| use crate::config::HudiConfigs; |
| use crate::error::CoreError; |
| use crate::error::CoreError::ReadFileSliceError; |
| use crate::expr::filter::{Filter, SchemableFilter}; |
| use crate::file_group::file_slice::FileSlice; |
| use crate::file_group::log_file::scanner::{LogFileScanner, ScanResult}; |
| use crate::file_group::record_batches::RecordBatches; |
| use crate::merge::record_merger::RecordMerger; |
| use crate::metadata::meta_field::MetaField; |
| use crate::storage::Storage; |
| use crate::table::builder::OptionResolver; |
| use crate::timeline::selector::InstantRange; |
| use crate::Result; |
| use arrow::compute::and; |
| use arrow::compute::filter_record_batch; |
| use arrow_array::{BooleanArray, RecordBatch}; |
| use futures::TryFutureExt; |
| use std::convert::TryFrom; |
| use std::sync::Arc; |
| |
| /// The reader that handles all read operations against a file group. |
| #[derive(Clone, Debug)] |
| pub struct FileGroupReader { |
| hudi_configs: Arc<HudiConfigs>, |
| storage: Arc<Storage>, |
| } |
| |
| impl FileGroupReader { |
| /// Creates a new reader with the given Hudi configurations and overwriting options. |
| /// |
| /// # Notes |
| /// This API does **not** use [`OptionResolver`], which loads table properties from storage, to resolve options. |
| pub(crate) fn new_with_configs_and_overwriting_options<I, K, V>( |
| hudi_configs: Arc<HudiConfigs>, |
| overwriting_options: I, |
| ) -> Result<Self> |
| where |
| I: IntoIterator<Item = (K, V)>, |
| K: AsRef<str>, |
| V: Into<String>, |
| { |
| let (hudi_opts, others) = split_hudi_options_from_others(overwriting_options); |
| |
| let mut final_opts = hudi_configs.as_options(); |
| final_opts.extend(hudi_opts); |
| let hudi_configs = Arc::new(HudiConfigs::new(final_opts)); |
| let storage = Storage::new(Arc::new(others), hudi_configs.clone())?; |
| |
| Ok(Self { |
| hudi_configs, |
| storage, |
| }) |
| } |
| |
| /// Creates a new reader with the given base URI and options. |
| /// |
| /// # Arguments |
| /// * `base_uri` - The base URI of the table that contains the file group. |
| /// * `options` - Additional options for the reader. |
| /// |
| /// # Notes |
| /// This API uses [`OptionResolver`], which loads table properties from storage, to resolve options. |
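| /// |
| /// # Example |
| /// A minimal sketch mirroring this module's tests; the table URI and import paths are illustrative assumptions: |
| /// ```ignore |
| /// use hudi_core::config::read::HudiReadConfig; |
| /// use hudi_core::file_group::reader::FileGroupReader; |
| /// |
| /// // Table properties are loaded from the base URI to resolve the final options. |
| /// let reader = FileGroupReader::new_with_options( |
| ///     "file:///tmp/hudi_table", |
| ///     [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")], |
| /// ) |
| /// .unwrap(); |
| /// ``` |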
| pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Result<Self> |
| where |
| I: IntoIterator<Item = (K, V)>, |
| K: AsRef<str>, |
| V: Into<String>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { |
| let mut resolver = OptionResolver::new_with_options(base_uri, options); |
| resolver.resolve_options().await?; |
| let hudi_configs = Arc::new(HudiConfigs::new(resolver.hudi_options)); |
| let storage = |
| Storage::new(Arc::new(resolver.storage_options), hudi_configs.clone())?; |
| |
| Ok(Self { |
| hudi_configs, |
| storage, |
| }) |
| }) |
| } |
| |
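| /// Creates a boolean mask for filtering base file `records` by commit time during an |
| /// incremental read of the file group. |
| /// |
| /// Rows are kept when their `_hoodie_commit_time` is strictly greater than the configured |
| /// [`HudiReadConfig::FileGroupStartTimestamp`] and, if set, no greater than the |
| /// [`HudiReadConfig::FileGroupEndTimestamp`]. |
| /// |
| /// Returns `Ok(None)` when filtering is not applicable: meta fields are not populated, or |
| /// no start timestamp is configured (snapshot and time-travel queries). |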
| fn create_filtering_mask_for_base_file_records( |
| &self, |
| records: &RecordBatch, |
| ) -> Result<Option<BooleanArray>> { |
| let populates_meta_fields: bool = self |
| .hudi_configs |
| .get_or_default(HudiTableConfig::PopulatesMetaFields) |
| .into(); |
| if !populates_meta_fields { |
| // If meta fields are not populated, commit time filtering is not applicable. |
| return Ok(None); |
| } |
| |
| let mut and_filters: Vec<SchemableFilter> = Vec::new(); |
| let schema = MetaField::schema(); |
| if let Some(start) = self |
| .hudi_configs |
| .try_get(HudiReadConfig::FileGroupStartTimestamp) |
| .map(|v| -> String { v.into() }) |
| { |
| let filter: Filter = |
| Filter::try_from((MetaField::CommitTime.as_ref(), ">", start.as_str()))?; |
| let filter = SchemableFilter::try_from((filter, schema.as_ref()))?; |
| and_filters.push(filter); |
| } else { |
| // If the start timestamp is not provided, the query is a snapshot or time-travel query, |
| // so commit time filtering is not needed: the base file being read has already been |
| // filtered and selected by the timeline. |
| return Ok(None); |
| } |
| |
| if let Some(end) = self |
| .hudi_configs |
| .try_get(HudiReadConfig::FileGroupEndTimestamp) |
| .map(|v| -> String { v.into() }) |
| { |
| let filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", end.as_str()))?; |
| let filter = SchemableFilter::try_from((filter, schema.as_ref()))?; |
| and_filters.push(filter); |
| } |
| |
| if and_filters.is_empty() { |
| return Ok(None); |
| } |
| |
| let mut mask = BooleanArray::from(vec![true; records.num_rows()]); |
| for filter in &and_filters { |
| let col_name = filter.field.name().as_str(); |
| let col_values = records |
| .column_by_name(col_name) |
| .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?; |
| |
| let comparison = filter.apply_comparsion(col_values)?; |
| mask = and(&mask, &comparison)?; |
| } |
| Ok(Some(mask)) |
| } |
| |
| /// Reads the data from the base file at the given relative path. |
| /// |
| /// # Arguments |
| /// * `relative_path` - The relative path to the base file. |
| /// |
| /// # Returns |
| /// A record batch read from the base file, filtered by commit time when a file group start timestamp is configured. |
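| /// |
| /// # Example |
| /// A minimal sketch; the path is a placeholder for a real base file name relative to the table base path: |
| /// ```ignore |
| /// // `reader`: a FileGroupReader built via `new_with_options` (see the constructor example). |
| /// let batch = reader |
| ///     .read_file_slice_by_base_file_path("partition=a/<file-id>_<write-token>_<commit-time>.parquet") |
| ///     .await?; |
| /// println!("read {} rows", batch.num_rows()); |
| /// ``` |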
| pub async fn read_file_slice_by_base_file_path( |
| &self, |
| relative_path: &str, |
| ) -> Result<RecordBatch> { |
| let records: RecordBatch = self |
| .storage |
| .get_parquet_file_data(relative_path) |
| .map_err(|e| ReadFileSliceError(format!("Failed to read path {relative_path}: {e:?}"))) |
| .await?; |
| |
| if let Some(mask) = self.create_filtering_mask_for_base_file_records(&records)? { |
| filter_record_batch(&records, &mask) |
| .map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}"))) |
| } else { |
| Ok(records) |
| } |
| } |
| |
| /// Same as [FileGroupReader::read_file_slice_by_base_file_path], but blocking. |
| pub fn read_file_slice_by_base_file_path_blocking( |
| &self, |
| relative_path: &str, |
| ) -> Result<RecordBatch> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(self.read_file_slice_by_base_file_path(relative_path)) |
| } |
| |
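| /// Builds the instant range used to constrain a log file scan to the configured |
| /// [`HudiReadConfig::FileGroupStartTimestamp`] and [`HudiReadConfig::FileGroupEndTimestamp`], |
| /// interpreted in the table's timeline timezone. |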
| fn create_instant_range_for_log_file_scan(&self) -> InstantRange { |
| let timezone = self |
| .hudi_configs |
| .get_or_default(HudiTableConfig::TimelineTimezone) |
| .into(); |
| let start_timestamp = self |
| .hudi_configs |
| .try_get(HudiReadConfig::FileGroupStartTimestamp) |
| .map(|v| -> String { v.into() }); |
| let end_timestamp = self |
| .hudi_configs |
| .try_get(HudiReadConfig::FileGroupEndTimestamp) |
| .map(|v| -> String { v.into() }); |
| InstantRange::new(timezone, start_timestamp, end_timestamp, false, true) |
| } |
| |
| /// Reads the data from the given file slice. |
| /// |
| /// # Arguments |
| /// * `file_slice` - The file slice to read. |
| /// |
| /// # Returns |
| /// A record batch of the base file records merged with any log file records. |
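| /// |
| /// # Example |
| /// A minimal sketch mirroring this module's tests; the base file name is a placeholder: |
| /// ```ignore |
| /// use std::str::FromStr; |
| /// |
| /// // `reader`: a FileGroupReader built via `new_with_options` (see the constructor example). |
| /// let base_file = BaseFile::from_str("<file-id>_<write-token>_<commit-time>.parquet")?; |
| /// let file_slice = FileSlice::new(base_file, String::new()); // empty partition path |
| /// let batch = reader.read_file_slice(&file_slice).await?; |
| /// ``` |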
| pub async fn read_file_slice(&self, file_slice: &FileSlice) -> Result<RecordBatch> { |
| let base_file_path = file_slice.base_file_relative_path()?; |
| let log_file_paths = if file_slice.has_log_file() { |
| file_slice |
| .log_files |
| .iter() |
| .map(|log_file| file_slice.log_file_relative_path(log_file)) |
| .collect::<Result<Vec<String>>>()? |
| } else { |
| vec![] |
| }; |
| self.read_file_slice_from_paths(&base_file_path, log_file_paths) |
| .await |
| } |
| |
| /// Same as [FileGroupReader::read_file_slice], but blocking. |
| pub fn read_file_slice_blocking(&self, file_slice: &FileSlice) -> Result<RecordBatch> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(self.read_file_slice(file_slice)) |
| } |
| |
| /// Reads a file slice from a base file and a list of log files. |
| /// |
| /// # Arguments |
| /// * `base_file_path` - The relative path to the base file. |
| /// * `log_file_paths` - An iterator of relative paths to log files. |
| /// |
| /// # Returns |
| /// A record batch read from the base file merged with log files. |
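| /// |
| /// # Example |
| /// A minimal sketch; the paths are placeholders relative to the table base path: |
| /// ```ignore |
| /// // `reader`: a FileGroupReader built via `new_with_options` (see the constructor example). |
| /// let batch = reader |
| ///     .read_file_slice_from_paths("<base-file>.parquet", ["<log-file>.log.1_0-1-0"]) |
| ///     .await?; |
| /// ``` |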
| pub async fn read_file_slice_from_paths<I, S>( |
| &self, |
| base_file_path: &str, |
| log_file_paths: I, |
| ) -> Result<RecordBatch> |
| where |
| I: IntoIterator<Item = S>, |
| S: AsRef<str>, |
| { |
| let log_file_paths: Vec<String> = log_file_paths |
| .into_iter() |
| .map(|s| s.as_ref().to_string()) |
| .collect(); |
| let use_read_optimized: bool = self |
| .hudi_configs |
| .get_or_default(HudiReadConfig::UseReadOptimizedMode) |
| .into(); |
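| // Read-optimized mode ignores log files and serves records from the base file only. |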
| let base_file_only = log_file_paths.is_empty() || use_read_optimized; |
| |
| if base_file_only { |
| self.read_file_slice_by_base_file_path(base_file_path).await |
| } else { |
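| // Merge-on-read path: scan the log files within the configured commit-time range, |
| // then merge the scanned records with the base file records. |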
| let instant_range = self.create_instant_range_for_log_file_scan(); |
| let scan_result = LogFileScanner::new(self.hudi_configs.clone(), self.storage.clone()) |
| .scan(log_file_paths, &instant_range) |
| .await?; |
| |
| let log_batches = match scan_result { |
| ScanResult::RecordBatches(batches) => batches, |
| ScanResult::Empty => RecordBatches::new(), |
| ScanResult::HFileRecords(_) => { |
| return Err(CoreError::LogBlockError( |
| "Unexpected HFile records in regular table log file".to_string(), |
| )); |
| } |
| }; |
| |
| let base_batch = self |
| .read_file_slice_by_base_file_path(base_file_path) |
| .await?; |
| let schema = base_batch.schema(); |
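| // The base file batch counts as one additional data batch alongside the scanned log batches. |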
| let num_data_batches = log_batches.num_data_batches() + 1; |
| let num_delete_batches = log_batches.num_delete_batches(); |
| let mut all_batches = |
| RecordBatches::new_with_capacity(num_data_batches, num_delete_batches); |
| all_batches.push_data_batch(base_batch); |
| all_batches.extend(log_batches); |
| |
| let merger = RecordMerger::new(schema.clone(), self.hudi_configs.clone()); |
| merger.merge_record_batches(all_batches) |
| } |
| } |
| |
| /// Same as [FileGroupReader::read_file_slice_from_paths], but blocking. |
| pub fn read_file_slice_from_paths_blocking<I, S>( |
| &self, |
| base_file_path: &str, |
| log_file_paths: I, |
| ) -> Result<RecordBatch> |
| where |
| I: IntoIterator<Item = S>, |
| S: AsRef<str>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(self.read_file_slice_from_paths(base_file_path, log_file_paths)) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::config::util::empty_options; |
| use crate::error::CoreError; |
| use crate::file_group::base_file::BaseFile; |
| use crate::file_group::file_slice::FileSlice; |
| use crate::Result; |
| use arrow::array::{ArrayRef, Int64Array, StringArray}; |
| use arrow::record_batch::RecordBatch; |
| use arrow_schema::{DataType, Field, Schema}; |
| use std::fs::canonicalize; |
| use std::path::PathBuf; |
| use std::str::FromStr; |
| use std::sync::Arc; |
| use url::Url; |
| |
| const TEST_SAMPLE_BASE_FILE: &str = |
| "a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet"; |
| const TEST_SAMPLE_LOG_FILE: &str = |
| ".a079bdb3-731c-4894-b855-abfcd6921007-0_20240418173551906.log.1_0-204-275"; |
| |
| fn get_non_existent_base_uri() -> String { |
| "file:///non-existent-path/table".to_string() |
| } |
| |
| fn get_base_uri_with_valid_props() -> String { |
| let url = Url::from_file_path( |
| canonicalize( |
| PathBuf::from("tests") |
| .join("data") |
| .join("table_props_valid"), |
| ) |
| .unwrap(), |
| ) |
| .unwrap(); |
| url.as_ref().to_string() |
| } |
| |
| fn get_base_uri_with_valid_props_minimum() -> String { |
| let url = Url::from_file_path( |
| canonicalize( |
| PathBuf::from("tests") |
| .join("data") |
| .join("table_props_valid_minimum"), |
| ) |
| .unwrap(), |
| ) |
| .unwrap(); |
| url.as_ref().to_string() |
| } |
| |
| fn get_base_uri_with_invalid_props() -> String { |
| let url = Url::from_file_path( |
| canonicalize( |
| PathBuf::from("tests") |
| .join("data") |
| .join("table_props_invalid"), |
| ) |
| .unwrap(), |
| ) |
| .unwrap(); |
| url.as_ref().to_string() |
| } |
| |
| #[test] |
| fn test_new_with_options() { |
| let options = vec![("key1", "value1"), ("key2", "value2")]; |
| let base_uri = get_base_uri_with_valid_props(); |
| let reader = FileGroupReader::new_with_options(&base_uri, options).unwrap(); |
| assert!(!reader.storage.options.is_empty()); |
| assert!(reader |
| .storage |
| .hudi_configs |
| .contains(HudiTableConfig::BasePath)); |
| } |
| |
| #[test] |
| fn test_new_with_options_invalid_base_uri_or_invalid_props() { |
| let base_uri = get_non_existent_base_uri(); |
| let result = FileGroupReader::new_with_options(&base_uri, empty_options()); |
| assert!(result.is_err()); |
| |
| let base_uri = get_base_uri_with_invalid_props(); |
| let result = FileGroupReader::new_with_options(&base_uri, empty_options()); |
| assert!(result.is_err()) |
| } |
| |
| fn create_test_record_batch() -> Result<RecordBatch> { |
| let schema = Schema::new(vec![ |
| Field::new("_hoodie_commit_time", DataType::Utf8, false), |
| Field::new("name", DataType::Utf8, false), |
| Field::new("age", DataType::Int64, false), |
| ]); |
| let schema = Arc::new(schema); |
| |
| let commit_times: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5"])); |
| let names: ArrayRef = Arc::new(StringArray::from(vec![ |
| "Alice", "Bob", "Charlie", "David", "Eve", |
| ])); |
| let ages: ArrayRef = Arc::new(Int64Array::from(vec![25, 30, 35, 40, 45])); |
| |
| RecordBatch::try_new(schema, vec![commit_times, names, ages]).map_err(CoreError::ArrowError) |
| } |
| |
| #[test] |
| fn test_create_filtering_mask_for_base_file_records() -> Result<()> { |
| let base_uri = get_base_uri_with_valid_props_minimum(); |
| let records = create_test_record_batch()?; |
| |
| // Test case 1: Disable populating the meta fields |
| let reader = FileGroupReader::new_with_options( |
| &base_uri, |
| [ |
| (HudiTableConfig::PopulatesMetaFields.as_ref(), "false"), |
| (HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"), |
| ], |
| )?; |
| let mask = reader.create_filtering_mask_for_base_file_records(&records)?; |
| assert_eq!(mask, None, "Commit time filtering is not applicable without meta fields"); |
| |
| // Test case 2: No commit time filtering options |
| let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?; |
| let mask = reader.create_filtering_mask_for_base_file_records(&records)?; |
| assert_eq!(mask, None); |
| |
| // Test case 3: Filtering commit time > '2' |
| let reader = FileGroupReader::new_with_options( |
| &base_uri, |
| [(HudiReadConfig::FileGroupStartTimestamp, "2")], |
| )?; |
| let mask = reader.create_filtering_mask_for_base_file_records(&records)?; |
| assert_eq!( |
| mask, |
| Some(BooleanArray::from(vec![false, false, true, true, true])), |
| "Expected only records with commit_time > '2'" |
| ); |
| |
| // Test case 4: Filtering commit time <= '4' |
| let reader = FileGroupReader::new_with_options( |
| &base_uri, |
| [(HudiReadConfig::FileGroupEndTimestamp, "4")], |
| )?; |
| let mask = reader.create_filtering_mask_for_base_file_records(&records)?; |
| assert_eq!(mask, None, "Commit time filtering should not be needed"); |
| |
| // Test case 5: Filtering commit time > '2' and <= '4' |
| let reader = FileGroupReader::new_with_options( |
| &base_uri, |
| [ |
| (HudiReadConfig::FileGroupStartTimestamp, "2"), |
| (HudiReadConfig::FileGroupEndTimestamp, "4"), |
| ], |
| )?; |
| let mask = reader.create_filtering_mask_for_base_file_records(&records)?; |
| assert_eq!( |
| mask, |
| Some(BooleanArray::from(vec![false, false, true, true, false])), |
| "Expected only records with commit_time > '2' and <= '4'" |
| ); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_read_file_slice_from_paths_with_base_file_only() -> Result<()> { |
| let base_uri = get_base_uri_with_valid_props_minimum(); |
| let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?; |
| |
| // Test with actual test files and empty log files - should trigger base_file_only logic |
| let base_file_path = TEST_SAMPLE_BASE_FILE; |
| let log_file_paths: Vec<&str> = vec![]; |
| |
| let result = reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths); |
| |
| match result { |
| Ok(batch) => { |
| assert!( |
| batch.num_rows() > 0, |
| "Should have read some records from base file" |
| ); |
| } |
| Err(_) => { |
| // This might fail if the test data doesn't exist, which is acceptable for a unit test |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_read_file_slice_from_paths_read_optimized_mode() -> Result<()> { |
| let base_uri = get_base_uri_with_valid_props_minimum(); |
| let reader = FileGroupReader::new_with_options( |
| &base_uri, |
| [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")], |
| )?; |
| |
| let base_file_path = TEST_SAMPLE_BASE_FILE; |
| let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()]; |
| |
| let result = reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths); |
| |
| // In read-optimized mode, log files should be ignored |
| // This should behave the same as read_file_slice_by_base_file_path |
| match result { |
| Ok(_) => { |
| // Test passes if we get a result - the method correctly ignored log files |
| } |
| Err(e) => { |
| // Expected for missing test data |
| let error_msg = e.to_string(); |
| assert!( |
| error_msg.contains("not found") || error_msg.contains("No such file"), |
| "Expected file not found error, got: {}", |
| error_msg |
| ); |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_read_file_slice_from_paths_with_log_files() -> Result<()> { |
| let base_uri = get_base_uri_with_valid_props_minimum(); |
| let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?; |
| |
| let base_file_path = TEST_SAMPLE_BASE_FILE; |
| let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()]; |
| |
| let result = reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths); |
| |
| // The actual file reading might fail due to missing test data, which is expected |
| match result { |
| Ok(_batch) => { |
| // Test passes if we get a valid batch |
| } |
| Err(e) => { |
| // Expected for missing test data - verify it's a storage/file not found error |
| let error_msg = e.to_string(); |
| assert!( |
| error_msg.contains("not found") || error_msg.contains("No such file"), |
| "Expected file not found error, got: {}", |
| error_msg |
| ); |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_read_file_slice_from_paths_error_handling() -> Result<()> { |
| let base_uri = get_base_uri_with_valid_props_minimum(); |
| let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?; |
| |
| // Test with non-existent base file |
| let base_file_path = "non_existent_file.parquet"; |
| let log_file_paths: Vec<&str> = vec![]; |
| |
| let result = reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths); |
| |
| assert!(result.is_err(), "Should return error for non-existent file"); |
| |
| let error_msg = result.unwrap_err().to_string(); |
| assert!( |
| error_msg.contains("not found") || error_msg.contains("Failed to read path"), |
| "Should contain appropriate error message, got: {}", |
| error_msg |
| ); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_read_file_slice_blocking() -> Result<()> { |
| let base_uri = get_base_uri_with_valid_props_minimum(); |
| let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?; |
| |
| // Create a FileSlice from the test sample base file |
| let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?; |
| let file_slice = FileSlice::new(base_file, String::new()); // empty partition path |
| |
| // Call read_file_slice_blocking |
| let result = reader.read_file_slice_blocking(&file_slice); |
| |
| match result { |
| Ok(batch) => { |
| assert!( |
| batch.num_rows() > 0, |
| "Should have read some records from base file" |
| ); |
| } |
| Err(e) => { |
| // Expected for missing test data - verify it's a file not found error |
| let error_msg = e.to_string(); |
| assert!( |
| error_msg.contains("Failed to read path") |
| || error_msg.contains("not found") |
| || error_msg.contains("No such file"), |
| "Expected file not found error, got: {}", |
| error_msg |
| ); |
| } |
| } |
| |
| Ok(()) |
| } |
| } |