| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| //! This module provides the Hudi table APIs. |
| //! |
| //! It offers a quick entry point for reading Hudi table metadata and data, |
| //! facilitating adaptation and compatibility across various query engines. |
| //! |
| //! **Examples** |
| //! 1. Create a Hudi table |
| //! ```rust |
| //! use url::Url; |
| //! use hudi_core::table::Table; |
| //! |
| //! pub async fn test() { |
| //! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); |
| //! let hudi_table = Table::new(base_uri.path()).await.unwrap(); |
| //! } |
| //! ``` |
| //! 2. Get the Hudi table schema (`arrow_schema::Schema`) |
| //! ```rust |
| //! use url::Url; |
| //! use hudi_core::table::Table; |
| //! |
| //! pub async fn test() { |
| //! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); |
| //! let hudi_table = Table::new(base_uri.path()).await.unwrap(); |
| //! let schema = hudi_table.get_schema().await.unwrap(); |
| //! } |
| //! ``` |
| //! 3. Read a Hudi table |
| //! ```rust |
| //! use url::Url; |
| //! use hudi_core::config::util::empty_filters; |
| //! use hudi_core::table::Table; |
| //! |
| //! pub async fn test() { |
| //! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); |
| //! let hudi_table = Table::new(base_uri.path()).await.unwrap(); |
| //! let records = hudi_table.read_snapshot(empty_filters()).await.unwrap(); |
| //! } |
| //! ``` |
| //! 4. Get file slices |
| //! Users can obtain file slice metadata to customize how data is read, e.g., reading in batches or in parallel. |
| //! ```rust |
| //! use url::Url; |
| //! use hudi_core::config::util::empty_filters; |
| //! use hudi_core::table::Table; |
| //! use hudi_core::storage::util::join_url_segments; |
| //! |
| //! pub async fn test() { |
| //! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); |
| //! let hudi_table = Table::new(base_uri.path()).await.unwrap(); |
| //! let file_slices = hudi_table |
| //! .get_file_slices_splits(2, empty_filters()) |
| //! .await.unwrap(); |
| //! // Assign each Parquet reader task its own group of file slices. |
| //! let mut parquet_file_groups: Vec<Vec<String>> = Vec::new(); |
| //! for file_slice_vec in file_slices { |
| //! let file_group_vec = file_slice_vec |
| //! .iter() |
| //! .map(|f| { |
| //! let relative_path = f.base_file_relative_path().unwrap(); |
| //! let url = join_url_segments(&base_uri, &[relative_path.as_str()]).unwrap(); |
| //! url.path().to_string() |
| //! }) |
| //! .collect(); |
| //! parquet_file_groups.push(file_group_vec) |
| //! } |
| //! } |
| //! ``` |
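| //! 5. Use blocking variants |
| //! Every async API above has a `*_blocking` counterpart for use outside of an async runtime. |
| //! A minimal sketch: |
| //! ```rust |
| //! use hudi_core::config::util::empty_filters; |
| //! use hudi_core::table::Table; |
| //! |
| //! pub fn test() { |
| //! let hudi_table = Table::new_blocking("/tmp/hudi_data").unwrap(); |
| //! let records = hudi_table.read_snapshot_blocking(empty_filters()).unwrap(); |
| //! } |
| //! ``` |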
| |
| pub mod builder; |
| mod fs_view; |
| mod listing; |
| pub mod partition; |
| mod validation; |
| |
| use crate::config::read::HudiReadConfig; |
| use crate::config::table::HudiTableConfig::PartitionFields; |
| use crate::config::table::{HudiTableConfig, TableTypeValue}; |
| use crate::config::HudiConfigs; |
| use crate::expr::filter::{from_str_tuples, Filter}; |
| use crate::file_group::file_slice::FileSlice; |
| use crate::file_group::reader::FileGroupReader; |
| use crate::schema::resolver::{resolve_avro_schema, resolve_schema}; |
| use crate::table::builder::TableBuilder; |
| use crate::table::fs_view::FileSystemView; |
| use crate::table::partition::PartitionPruner; |
| use crate::timeline::util::format_timestamp; |
| use crate::timeline::{Timeline, EARLIEST_START_TIMESTAMP}; |
| use crate::util::collection::split_into_chunks; |
| use crate::Result; |
| use arrow::record_batch::RecordBatch; |
| use arrow_schema::{Field, Schema}; |
| use std::collections::{HashMap, HashSet}; |
| use std::sync::Arc; |
| use url::Url; |
| |
| /// The main struct that provides table APIs for interacting with a Hudi table. |
| #[derive(Clone, Debug)] |
| pub struct Table { |
| pub hudi_configs: Arc<HudiConfigs>, |
| pub storage_options: Arc<HashMap<String, String>>, |
| pub timeline: Timeline, |
| pub file_system_view: FileSystemView, |
| } |
| |
| impl Table { |
| /// Create a Hudi table from the given base URI. |
| pub async fn new(base_uri: &str) -> Result<Self> { |
| TableBuilder::from_base_uri(base_uri).build().await |
| } |
| |
| /// Same as [Table::new], but blocking. |
| pub fn new_blocking(base_uri: &str) -> Result<Self> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { Table::new(base_uri).await }) |
| } |
| |
| /// Create a Hudi table with the given options. |
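| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data`; the option shown is |
| /// one of this crate's read configs: |
| /// |
| /// ```rust |
| /// use hudi_core::config::read::HudiReadConfig; |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new_with_options( |
| /// "/tmp/hudi_data", |
| /// [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")], |
| /// ) |
| /// .await |
| /// .unwrap(); |
| /// } |
| /// ``` |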
| pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Result<Self> |
| where |
| I: IntoIterator<Item = (K, V)>, |
| K: AsRef<str>, |
| V: Into<String>, |
| { |
| TableBuilder::from_base_uri(base_uri) |
| .with_options(options) |
| .build() |
| .await |
| } |
| |
| /// Same as [Table::new_with_options], but blocking. |
| pub fn new_with_options_blocking<I, K, V>(base_uri: &str, options: I) -> Result<Self> |
| where |
| I: IntoIterator<Item = (K, V)>, |
| K: AsRef<str>, |
| V: Into<String>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { Table::new_with_options(base_uri, options).await }) |
| } |
| |
| pub fn hudi_options(&self) -> HashMap<String, String> { |
| self.hudi_configs.as_options() |
| } |
| |
| pub fn storage_options(&self) -> HashMap<String, String> { |
| self.storage_options.as_ref().clone() |
| } |
| |
| #[cfg(feature = "datafusion")] |
| pub fn register_storage( |
| &self, |
| runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>, |
| ) { |
| self.timeline |
| .storage |
| .register_object_store(runtime_env.clone()); |
| self.file_system_view |
| .storage |
| .register_object_store(runtime_env.clone()); |
| } |
| |
| pub fn base_url(&self) -> Url { |
| let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath); |
| self.hudi_configs |
| .get(HudiTableConfig::BasePath) |
| .expect(&err_msg) |
| .to_url() |
| .expect(&err_msg) |
| } |
| |
| pub fn table_name(&self) -> String { |
| let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableName); |
| self.hudi_configs |
| .get(HudiTableConfig::TableName) |
| .expect(&err_msg) |
| .into() |
| } |
| |
| pub fn table_type(&self) -> String { |
| let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType); |
| self.hudi_configs |
| .get(HudiTableConfig::TableType) |
| .expect(&err_msg) |
| .into() |
| } |
| |
| pub fn is_mor(&self) -> bool { |
| self.table_type() == TableTypeValue::MergeOnRead.as_ref() |
| } |
| |
| pub fn timezone(&self) -> String { |
| self.hudi_configs |
| .get_or_default(HudiTableConfig::TimelineTimezone) |
| .into() |
| } |
| |
| /// Get the latest Avro schema string of the table. |
| /// |
| /// The implementation looks for the schema in the following order: |
| /// 1. Timeline commit metadata. |
| /// 2. `hoodie.properties` file's [HudiTableConfig::CreateSchema]. |
| /// |
| /// ### Note |
| /// |
| /// The returned schema does not contain Hudi's [MetaField]s, |
| /// unlike the schema returned by [Table::get_schema]. |
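| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data`: |
| /// |
| /// ```rust |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new("/tmp/hudi_data").await.unwrap(); |
| /// let avro_schema: String = hudi_table.get_avro_schema().await.unwrap(); |
| /// } |
| /// ``` |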
| pub async fn get_avro_schema(&self) -> Result<String> { |
| resolve_avro_schema(self).await |
| } |
| |
| /// Same as [Table::get_avro_schema], but blocking. |
| pub fn get_avro_schema_blocking(&self) -> Result<String> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { self.get_avro_schema().await }) |
| } |
| |
| /// Get the latest [arrow_schema::Schema] of the table. |
| /// |
| /// The implementation looks for the schema in the following order: |
| /// 1. Timeline commit metadata. |
| /// 2. Base file schema. |
| /// 3. `hoodie.properties` file's [HudiTableConfig::CreateSchema]. |
| pub async fn get_schema(&self) -> Result<Schema> { |
| resolve_schema(self).await |
| } |
| |
| /// Same as [Table::get_schema], but blocking. |
| pub fn get_schema_blocking(&self) -> Result<Schema> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { self.get_schema().await }) |
| } |
| |
| /// Get the latest partition [arrow_schema::Schema] of the table. |
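| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data` partitioned by a `city` |
| /// field (the field name is illustrative): |
| /// |
| /// ```rust |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new("/tmp/hudi_data").await.unwrap(); |
| /// // The returned schema contains only the partition fields, e.g. `city`. |
| /// let partition_schema = hudi_table.get_partition_schema().await.unwrap(); |
| /// } |
| /// ``` |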
| pub async fn get_partition_schema(&self) -> Result<Schema> { |
| let partition_fields: HashSet<String> = { |
| let fields: Vec<String> = self.hudi_configs.get_or_default(PartitionFields).into(); |
| fields.into_iter().collect() |
| }; |
| |
| let schema = self.get_schema().await?; |
| let partition_fields: Vec<Arc<Field>> = schema |
| .fields() |
| .iter() |
| .filter(|field| partition_fields.contains(field.name())) |
| .cloned() |
| .collect(); |
| |
| Ok(Schema::new(partition_fields)) |
| } |
| |
| /// Same as [Table::get_partition_schema], but blocking. |
| pub fn get_partition_schema_blocking(&self) -> Result<Schema> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { self.get_partition_schema().await }) |
| } |
| |
| /// Get the [Timeline] of the table. |
| pub fn get_timeline(&self) -> &Timeline { |
| &self.timeline |
| } |
| |
| /// Get all the [FileSlice]s in splits from the table. |
| /// |
| /// # Arguments |
| /// * `num_splits` - The number of chunks to split the file slices into. |
| /// * `filters` - Partition filters to apply. |
| pub async fn get_file_slices_splits<I, S>( |
| &self, |
| num_splits: usize, |
| filters: I, |
| ) -> Result<Vec<Vec<FileSlice>>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| if let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() { |
| let filters = from_str_tuples(filters)?; |
| self.get_file_slices_splits_internal(num_splits, timestamp, &filters) |
| .await |
| } else { |
| Ok(Vec::new()) |
| } |
| } |
| |
| /// Same as [Table::get_file_slices_splits], but blocking. |
| pub fn get_file_slices_splits_blocking<I, S>( |
| &self, |
| num_splits: usize, |
| filters: I, |
| ) -> Result<Vec<Vec<FileSlice>>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { self.get_file_slices_splits(num_splits, filters).await }) |
| } |
| |
| /// Get all the [FileSlice]s in splits from the table at a given timestamp. |
| /// |
| /// # Arguments |
| /// * `num_splits` - The number of chunks to split the file slices into. |
| /// * `timestamp` - The timestamp the file slices are associated with. |
| /// * `filters` - Partition filters to apply. |
| pub async fn get_file_slices_splits_as_of<I, S>( |
| &self, |
| num_splits: usize, |
| timestamp: &str, |
| filters: I, |
| ) -> Result<Vec<Vec<FileSlice>>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| let timestamp = format_timestamp(timestamp, &self.timezone())?; |
| let filters = from_str_tuples(filters)?; |
| self.get_file_slices_splits_internal(num_splits, ×tamp, &filters) |
| .await |
| } |
| |
| /// Same as [Table::get_file_slices_splits_as_of], but blocking. |
| pub fn get_file_slices_splits_as_of_blocking<I, S>( |
| &self, |
| num_splits: usize, |
| timestamp: &str, |
| filters: I, |
| ) -> Result<Vec<Vec<FileSlice>>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { |
| self.get_file_slices_splits_as_of(num_splits, timestamp, filters) |
| .await |
| }) |
| } |
| |
| async fn get_file_slices_splits_internal( |
| &self, |
| num_splits: usize, |
| timestamp: &str, |
| filters: &[Filter], |
| ) -> Result<Vec<Vec<FileSlice>>> { |
| let file_slices = self.get_file_slices_internal(timestamp, filters).await?; |
| Ok(split_into_chunks(file_slices, num_splits)) |
| } |
| |
| /// Get all the [FileSlice]s in the table. |
| /// |
| /// # Arguments |
| /// * `filters` - Partition filters to apply. |
| /// |
| /// # Notes |
| /// * This API is useful for implementing snapshot queries. |
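| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data` partitioned by `byteField` |
| /// (the field name and values are illustrative): |
| /// |
| /// ```rust |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new("/tmp/hudi_data").await.unwrap(); |
| /// let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")]; |
| /// let file_slices = hudi_table.get_file_slices(filters).await.unwrap(); |
| /// } |
| /// ``` |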
| pub async fn get_file_slices<I, S>(&self, filters: I) -> Result<Vec<FileSlice>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| if let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() { |
| let filters = from_str_tuples(filters)?; |
| self.get_file_slices_internal(timestamp, &filters).await |
| } else { |
| Ok(Vec::new()) |
| } |
| } |
| |
| /// Same as [Table::get_file_slices], but blocking. |
| pub fn get_file_slices_blocking<I, S>(&self, filters: I) -> Result<Vec<FileSlice>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { self.get_file_slices(filters).await }) |
| } |
| |
| /// Get all the [FileSlice]s in the table at a given timestamp. |
| /// |
| /// # Arguments |
| /// * `timestamp` - The timestamp the file slices are associated with. |
| /// * `filters` - Partition filters to apply. |
| /// |
| /// # Notes |
| /// * This API is useful for implementing time travel queries. |
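| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data` (the timestamp is illustrative): |
| /// |
| /// ```rust |
| /// use hudi_core::config::util::empty_filters; |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new("/tmp/hudi_data").await.unwrap(); |
| /// let file_slices = hudi_table |
| /// .get_file_slices_as_of("20240418173551906", empty_filters()) |
| /// .await |
| /// .unwrap(); |
| /// } |
| /// ``` |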
| pub async fn get_file_slices_as_of<I, S>( |
| &self, |
| timestamp: &str, |
| filters: I, |
| ) -> Result<Vec<FileSlice>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| let timestamp = format_timestamp(timestamp, &self.timezone())?; |
| let filters = from_str_tuples(filters)?; |
| self.get_file_slices_internal(×tamp, &filters).await |
| } |
| |
| /// Same as [Table::get_file_slices_as_of], but blocking. |
| pub fn get_file_slices_as_of_blocking<I, S>( |
| &self, |
| timestamp: &str, |
| filters: I, |
| ) -> Result<Vec<FileSlice>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { self.get_file_slices_as_of(timestamp, filters).await }) |
| } |
| |
| async fn get_file_slices_internal( |
| &self, |
| timestamp: &str, |
| filters: &[Filter], |
| ) -> Result<Vec<FileSlice>> { |
| let excludes = self |
| .timeline |
| .get_replaced_file_groups_as_of(timestamp) |
| .await?; |
| let partition_schema = self.get_partition_schema().await?; |
| let partition_pruner = |
| PartitionPruner::new(filters, &partition_schema, self.hudi_configs.as_ref())?; |
| self.file_system_view |
| .get_file_slices_as_of(timestamp, &partition_pruner, &excludes) |
| .await |
| } |
| |
| /// Get all the changed [FileSlice]s in the table between the given timestamps. |
| /// |
| /// # Arguments |
| /// * `start_timestamp` - If provided, only file slices that were changed after this timestamp will be returned. |
| /// * `end_timestamp` - If provided, only file slices that were changed before or at this timestamp will be returned. |
| /// |
| /// # Notes |
| /// * This API is useful for implementing incremental queries. |
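| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data`; passing `None` for the end |
| /// timestamp means "up to the latest commit" (the start timestamp is illustrative): |
| /// |
| /// ```rust |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new("/tmp/hudi_data").await.unwrap(); |
| /// let file_slices = hudi_table |
| /// .get_file_slices_between(Some("20240418172802262"), None) |
| /// .await |
| /// .unwrap(); |
| /// } |
| /// ``` |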
| pub async fn get_file_slices_between( |
| &self, |
| start_timestamp: Option<&str>, |
| end_timestamp: Option<&str>, |
| ) -> Result<Vec<FileSlice>> { |
| // If the end timestamp is not provided, use the latest commit timestamp. |
| let Some(end) = |
| end_timestamp.or_else(|| self.timeline.get_latest_commit_timestamp_as_option()) |
| else { |
| // No latest commit timestamp means the table is empty. |
| return Ok(Vec::new()); |
| }; |
| |
| let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP); |
| |
| self.get_file_slices_between_internal(start, end).await |
| } |
| |
| /// Same as [Table::get_file_slices_between], but blocking. |
| pub fn get_file_slices_between_blocking( |
| &self, |
| start_timestamp: Option<&str>, |
| end_timestamp: Option<&str>, |
| ) -> Result<Vec<FileSlice>> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { |
| self.get_file_slices_between(start_timestamp, end_timestamp) |
| .await |
| }) |
| } |
| |
| /// Get all the changed [FileSlice]s in splits from the table between the given timestamps. |
| /// |
| /// # Arguments |
| /// * `num_splits` - The number of chunks to split the file slices into. |
| /// * `start_timestamp` - If provided, only file slices that were changed after this timestamp will be returned. |
| /// * `end_timestamp` - If provided, only file slices that were changed before or at this timestamp will be returned. |
| /// |
| /// # Notes |
| /// * This API is useful for implementing incremental queries with read parallelism. |
| /// * It uses the same splitting flow as the time-travel API to respect the read parallelism config. |
| pub async fn get_file_slices_splits_between( |
| &self, |
| num_splits: usize, |
| start_timestamp: Option<&str>, |
| end_timestamp: Option<&str>, |
| ) -> Result<Vec<Vec<FileSlice>>> { |
| // If the end timestamp is not provided, use the latest commit timestamp. |
| let Some(end) = |
| end_timestamp.or_else(|| self.timeline.get_latest_commit_timestamp_as_option()) |
| else { |
| // No latest commit timestamp means the table is empty. |
| return Ok(Vec::new()); |
| }; |
| |
| let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP); |
| |
| self.get_file_slices_splits_between_internal(num_splits, start, end) |
| .await |
| } |
| |
| /// Same as [Table::get_file_slices_splits_between], but blocking. |
| pub fn get_file_slices_splits_between_blocking( |
| &self, |
| num_splits: usize, |
| start_timestamp: Option<&str>, |
| end_timestamp: Option<&str>, |
| ) -> Result<Vec<Vec<FileSlice>>> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { |
| self.get_file_slices_splits_between(num_splits, start_timestamp, end_timestamp) |
| .await |
| }) |
| } |
| |
| async fn get_file_slices_splits_between_internal( |
| &self, |
| num_splits: usize, |
| start_timestamp: &str, |
| end_timestamp: &str, |
| ) -> Result<Vec<Vec<FileSlice>>> { |
| let file_slices = self |
| .get_file_slices_between_internal(start_timestamp, end_timestamp) |
| .await?; |
| Ok(split_into_chunks(file_slices, num_splits)) |
| } |
| |
| async fn get_file_slices_between_internal( |
| &self, |
| start_timestamp: &str, |
| end_timestamp: &str, |
| ) -> Result<Vec<FileSlice>> { |
| let mut file_slices: Vec<FileSlice> = Vec::new(); |
| let file_groups = self |
| .timeline |
| .get_file_groups_between(Some(start_timestamp), Some(end_timestamp)) |
| .await?; |
| for file_group in file_groups { |
| if let Some(file_slice) = file_group.get_file_slice_as_of(end_timestamp) { |
| file_slices.push(file_slice.clone()); |
| } |
| } |
| |
| Ok(file_slices) |
| } |
| |
| /// Create a [FileGroupReader] using the [Table]'s Hudi configs and storage options, with the given options taking precedence. |
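| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data` (the timestamp is illustrative): |
| /// |
| /// ```rust |
| /// use hudi_core::config::read::HudiReadConfig; |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new("/tmp/hudi_data").await.unwrap(); |
| /// let fg_reader = hudi_table |
| /// .create_file_group_reader_with_options([( |
| /// HudiReadConfig::FileGroupEndTimestamp.as_ref(), |
| /// "20240418173551906", |
| /// )]) |
| /// .unwrap(); |
| /// } |
| /// ``` |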
| pub fn create_file_group_reader_with_options<I, K, V>( |
| &self, |
| options: I, |
| ) -> Result<FileGroupReader> |
| where |
| I: IntoIterator<Item = (K, V)>, |
| K: AsRef<str>, |
| V: Into<String>, |
| { |
| let mut overwriting_options = HashMap::with_capacity(self.storage_options.len()); |
| for (k, v) in self.storage_options.iter() { |
| overwriting_options.insert(k.clone(), v.clone()); |
| } |
| for (k, v) in options { |
| overwriting_options.insert(k.as_ref().to_string(), v.into()); |
| } |
| FileGroupReader::new_with_configs_and_overwriting_options( |
| self.hudi_configs.clone(), |
| overwriting_options, |
| ) |
| } |
| |
| /// Get all the latest records in the table. |
| /// |
| /// # Arguments |
| /// * `filters` - Partition filters to apply. |
| pub async fn read_snapshot<I, S>(&self, filters: I) -> Result<Vec<RecordBatch>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| if let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() { |
| let filters = from_str_tuples(filters)?; |
| self.read_snapshot_internal(timestamp, &filters).await |
| } else { |
| Ok(Vec::new()) |
| } |
| } |
| |
| /// Same as [Table::read_snapshot], but blocking. |
| pub fn read_snapshot_blocking<I, S>(&self, filters: I) -> Result<Vec<RecordBatch>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { self.read_snapshot(filters).await }) |
| } |
| |
| /// Get all the records in the table at a given timestamp. |
| /// |
| /// # Arguments |
| /// * `timestamp` - The timestamp the records are associated with. |
| /// * `filters` - Partition filters to apply. |
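| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data` (the timestamp is illustrative): |
| /// |
| /// ```rust |
| /// use hudi_core::config::util::empty_filters; |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new("/tmp/hudi_data").await.unwrap(); |
| /// let records = hudi_table |
| /// .read_snapshot_as_of("20240418173551906", empty_filters()) |
| /// .await |
| /// .unwrap(); |
| /// } |
| /// ``` |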
| pub async fn read_snapshot_as_of<I, S>( |
| &self, |
| timestamp: &str, |
| filters: I, |
| ) -> Result<Vec<RecordBatch>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| let timestamp = format_timestamp(timestamp, &self.timezone())?; |
| let filters = from_str_tuples(filters)?; |
| self.read_snapshot_internal(×tamp, &filters).await |
| } |
| |
| /// Same as [Table::read_snapshot_as_of], but blocking. |
| pub fn read_snapshot_as_of_blocking<I, S>( |
| &self, |
| timestamp: &str, |
| filters: I, |
| ) -> Result<Vec<RecordBatch>> |
| where |
| I: IntoIterator<Item = (S, S, S)>, |
| S: AsRef<str>, |
| { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { self.read_snapshot_as_of(timestamp, filters).await }) |
| } |
| |
| async fn read_snapshot_internal( |
| &self, |
| timestamp: &str, |
| filters: &[Filter], |
| ) -> Result<Vec<RecordBatch>> { |
| let file_slices = self.get_file_slices_internal(timestamp, filters).await?; |
| let fg_reader = self.create_file_group_reader_with_options([( |
| HudiReadConfig::FileGroupEndTimestamp, |
| timestamp, |
| )])?; |
| let batches = |
| futures::future::try_join_all(file_slices.iter().map(|f| fg_reader.read_file_slice(f))) |
| .await?; |
| Ok(batches) |
| } |
| |
| /// Get records that were inserted or updated between the given timestamps. |
| /// |
| /// Records that were updated multiple times within the time span are returned |
| /// in their latest state as of the end of the span. |
| /// |
| /// # Arguments |
| /// * `start_timestamp` - Only records that were inserted or updated after this timestamp will be returned. |
| /// * `end_timestamp` - If provided, only records that were inserted or updated before or at this timestamp will be returned. |
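| /// |
| /// A minimal sketch, assuming a table at `/tmp/hudi_data` (the timestamp is illustrative): |
| /// |
| /// ```rust |
| /// use hudi_core::table::Table; |
| /// |
| /// pub async fn test() { |
| /// let hudi_table = Table::new("/tmp/hudi_data").await.unwrap(); |
| /// // Read everything changed after the given commit, up to the latest commit. |
| /// let records = hudi_table |
| /// .read_incremental_records("20240418172802262", None) |
| /// .await |
| /// .unwrap(); |
| /// } |
| /// ``` |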
| pub async fn read_incremental_records( |
| &self, |
| start_timestamp: &str, |
| end_timestamp: Option<&str>, |
| ) -> Result<Vec<RecordBatch>> { |
| // If the end timestamp is not provided, use the latest commit timestamp. |
| let Some(end_timestamp) = |
| end_timestamp.or_else(|| self.timeline.get_latest_commit_timestamp_as_option()) |
| else { |
| return Ok(Vec::new()); |
| }; |
| |
| let timezone = self.timezone(); |
| let start_timestamp = format_timestamp(start_timestamp, &timezone)?; |
| let end_timestamp = format_timestamp(end_timestamp, &timezone)?; |
| |
| let file_slices = self |
| .get_file_slices_between_internal(&start_timestamp, &end_timestamp) |
| .await?; |
| |
| let fg_reader = self.create_file_group_reader_with_options([ |
| (HudiReadConfig::FileGroupStartTimestamp, start_timestamp), |
| (HudiReadConfig::FileGroupEndTimestamp, end_timestamp), |
| ])?; |
| |
| let batches = |
| futures::future::try_join_all(file_slices.iter().map(|f| fg_reader.read_file_slice(f))) |
| .await?; |
| Ok(batches) |
| } |
| |
| /// Same as [Table::read_incremental_records], but blocking. |
| pub fn read_incremental_records_blocking( |
| &self, |
| start_timestamp: &str, |
| end_timestamp: Option<&str>, |
| ) -> Result<Vec<RecordBatch>> { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build()? |
| .block_on(async { |
| self.read_incremental_records(start_timestamp, end_timestamp) |
| .await |
| }) |
| } |
| |
| /// Get the change-data-capture (CDC) records between the given timestamps. |
| /// |
| /// The CDC records should reflect the records that were inserted, updated, and deleted |
| /// between the timestamps. |
| #[allow(dead_code)] |
| async fn read_incremental_changes( |
| &self, |
| _start_timestamp: &str, |
| _end_timestamp: Option<&str>, |
| ) -> Result<Vec<RecordBatch>> { |
| todo!("read_incremental_changes") |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::config::table::HudiTableConfig::{ |
| BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields, IsHiveStylePartitioning, |
| IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields, PopulatesMetaFields, |
| PrecombineField, RecordKeyFields, TableName, TableType, TableVersion, |
| TimelineLayoutVersion, TimelineTimezone, |
| }; |
| use crate::config::util::{empty_filters, empty_options}; |
| use crate::config::HUDI_CONF_DIR; |
| use crate::error::CoreError; |
| use crate::metadata::meta_field::MetaField; |
| use crate::storage::util::join_url_segments; |
| use crate::storage::Storage; |
| use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, SampleTable}; |
| use std::collections::HashSet; |
| use std::fs::canonicalize; |
| use std::path::PathBuf; |
| use std::{env, panic}; |
| |
| /// Test helper to create a new `Table` instance without validating the configuration. |
| /// |
| /// # Arguments |
| /// |
| /// * `table_dir_name` - Name of the table root directory; all under `crates/core/tests/data/`. |
| fn get_test_table_without_validation(table_dir_name: &str) -> Table { |
| let base_url = Url::from_file_path( |
| canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(), |
| ) |
| .unwrap(); |
| Table::new_with_options_blocking( |
| base_url.as_str(), |
| [("hoodie.internal.skip.config.validation", "true")], |
| ) |
| .unwrap() |
| } |
| |
| /// Test helper to get relative file paths from the table with filters. |
| fn get_file_paths_with_filters( |
| table: &Table, |
| filters: &[(&str, &str, &str)], |
| ) -> Result<Vec<String>> { |
| let mut file_paths = Vec::new(); |
| let base_url = table.base_url(); |
| for f in table.get_file_slices_blocking(filters.to_vec())? { |
| let relative_path = f.base_file_relative_path()?; |
| let file_url = join_url_segments(&base_url, &[relative_path.as_str()])?; |
| file_paths.push(file_url.to_string()); |
| } |
| Ok(file_paths) |
| } |
| |
| #[test] |
| fn test_hudi_table_get_hudi_options() { |
| let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let hudi_options = hudi_table.hudi_options(); |
| for (k, v) in hudi_options.iter() { |
| assert!(k.starts_with("hoodie.")); |
| assert!(!v.is_empty()); |
| } |
| } |
| |
| #[test] |
| fn test_hudi_table_get_storage_options() { |
| let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| |
| let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES |
| .iter() |
| .map(|prefix| prefix.to_lowercase()) |
| .collect(); |
| |
| for (key, value) in hudi_table.storage_options.iter() { |
| let key_lower = key.to_lowercase(); |
| assert!( |
| cloud_prefixes |
| .iter() |
| .any(|prefix| key_lower.starts_with(prefix)), |
| "Storage option key '{}' should start with a cloud storage prefix", |
| key |
| ); |
| assert!( |
| !value.is_empty(), |
| "Storage option value for key '{}' should not be empty", |
| key |
| ); |
| } |
| } |
| |
| #[test] |
| fn hudi_table_get_schema_from_empty_table_without_create_schema() { |
| let table = get_test_table_without_validation("table_props_no_create_schema"); |
| |
| let schema = table.get_schema_blocking(); |
| assert!(schema.is_err()); |
| assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_))); |
| |
| let schema = table.get_avro_schema_blocking(); |
| assert!(schema.is_err()); |
| assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_))); |
| } |
| |
| #[test] |
| fn hudi_table_get_schema_from_empty_table_resolves_to_table_create_schema() { |
| for base_url in SampleTable::V6Empty.urls() { |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| |
| // Validate the Arrow schema |
| let schema = hudi_table.get_schema_blocking(); |
| assert!(schema.is_ok()); |
| let schema = schema.unwrap(); |
| assert_arrow_field_names_eq!( |
| schema, |
| [MetaField::field_names(), vec!["id", "name", "isActive"]].concat() |
| ); |
| |
| // Validate the Avro schema |
| let avro_schema = hudi_table.get_avro_schema_blocking(); |
| assert!(avro_schema.is_ok()); |
| let avro_schema = avro_schema.unwrap(); |
| assert_avro_field_names_eq!(&avro_schema, ["id", "name", "isActive"]) |
| } |
| } |
| |
| #[test] |
| fn hudi_table_get_schema() { |
| let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let original_field_names = [ |
| "id", |
| "name", |
| "isActive", |
| "byteField", |
| "shortField", |
| "intField", |
| "longField", |
| "floatField", |
| "doubleField", |
| "decimalField", |
| "dateField", |
| "timestampField", |
| "binaryField", |
| "arrayField", |
| "mapField", |
| "structField", |
| ]; |
| |
| // Check Arrow schema |
| let arrow_schema = hudi_table.get_schema_blocking(); |
| assert!(arrow_schema.is_ok()); |
| let arrow_schema = arrow_schema.unwrap(); |
| assert_arrow_field_names_eq!( |
| arrow_schema, |
| [MetaField::field_names(), original_field_names.to_vec()].concat() |
| ); |
| |
| // Check Avro schema |
| let avro_schema = hudi_table.get_avro_schema_blocking(); |
| assert!(avro_schema.is_ok()); |
| let avro_schema = avro_schema.unwrap(); |
| assert_avro_field_names_eq!(&avro_schema, original_field_names); |
| } |
| |
| #[test] |
| fn hudi_table_get_partition_schema() { |
| let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let schema = hudi_table.get_partition_schema_blocking(); |
| assert!(schema.is_ok()); |
| let schema = schema.unwrap(); |
| assert_arrow_field_names_eq!(schema, ["ts_str"]); |
| } |
| |
| #[test] |
| fn validate_invalid_table_props() { |
| let table = get_test_table_without_validation("table_props_invalid"); |
| let configs = table.hudi_configs; |
| assert!( |
| configs.validate(BaseFileFormat).is_err(), |
| "required config is missing" |
| ); |
| assert!(configs.validate(Checksum).is_err()); |
| assert!( |
| configs.validate(DatabaseName).is_ok(), |
| "non-required config is missing" |
| ); |
| assert!(configs.validate(DropsPartitionFields).is_err()); |
| assert!(configs.validate(IsHiveStylePartitioning).is_err()); |
| assert!(configs.validate(IsPartitionPathUrlencoded).is_err()); |
| assert!( |
| configs.validate(KeyGeneratorClass).is_ok(), |
| "non-required config is missing" |
| ); |
| assert!( |
| configs.validate(PartitionFields).is_ok(), |
| "non-required config is missing" |
| ); |
| assert!( |
| configs.validate(PrecombineField).is_ok(), |
| "non-required config is missing" |
| ); |
| assert!( |
| configs.validate(PopulatesMetaFields).is_ok(), |
| "non-required config is missing" |
| ); |
| assert!( |
| configs.validate(RecordKeyFields).is_ok(), |
| "non-required config is missing" |
| ); |
| assert!( |
| configs.validate(TableName).is_err(), |
| "required config is missing" |
| ); |
| assert!( |
| configs.validate(TableType).is_ok(), |
| "Valid table type value" |
| ); |
| assert!(configs.validate(TableVersion).is_err()); |
| assert!(configs.validate(TimelineLayoutVersion).is_err()); |
| assert!( |
| configs.validate(TimelineTimezone).is_ok(), |
| "non-required config is missing" |
| ); |
| } |
| |
| #[test] |
| fn get_invalid_table_props() { |
| let table = get_test_table_without_validation("table_props_invalid"); |
| let configs = table.hudi_configs; |
| assert!(configs.get(BaseFileFormat).is_err()); |
| assert!(configs.get(Checksum).is_err()); |
| assert!(configs.get(DatabaseName).is_err()); |
| assert!(configs.get(DropsPartitionFields).is_err()); |
| assert!(configs.get(IsHiveStylePartitioning).is_err()); |
| assert!(configs.get(IsPartitionPathUrlencoded).is_err()); |
| assert!(configs.get(KeyGeneratorClass).is_err()); |
| assert!(configs.get(PartitionFields).is_err()); |
| assert!(configs.get(PrecombineField).is_err()); |
| assert!(configs.get(PopulatesMetaFields).is_err()); |
| assert!(configs.get(RecordKeyFields).is_err()); |
| assert!(configs.get(TableName).is_err()); |
| assert!(configs.get(TableType).is_ok(), "Valid table type value"); |
| assert!(configs.get(TableVersion).is_err()); |
| assert!(configs.get(TimelineLayoutVersion).is_err()); |
| assert!(configs.get(TimelineTimezone).is_err()); |
| } |
| |
| #[test] |
| fn get_default_for_invalid_table_props() { |
| let table = get_test_table_without_validation("table_props_invalid"); |
| let configs = table.hudi_configs; |
| let actual: String = configs.get_or_default(BaseFileFormat).into(); |
| assert_eq!(actual, "parquet"); |
| assert!(panic::catch_unwind(|| configs.get_or_default(Checksum)).is_err()); |
| let actual: String = configs.get_or_default(DatabaseName).into(); |
| assert_eq!(actual, "default"); |
| let actual: bool = configs.get_or_default(DropsPartitionFields).into(); |
| assert!(!actual); |
| assert!(panic::catch_unwind(|| configs.get_or_default(IsHiveStylePartitioning)).is_err()); |
| assert!(panic::catch_unwind(|| configs.get_or_default(IsPartitionPathUrlencoded)).is_err()); |
| assert!(panic::catch_unwind(|| configs.get_or_default(KeyGeneratorClass)).is_err()); |
| let actual: Vec<String> = configs.get_or_default(PartitionFields).into(); |
| assert!(actual.is_empty()); |
| assert!(panic::catch_unwind(|| configs.get_or_default(PrecombineField)).is_err()); |
| let actual: bool = configs.get_or_default(PopulatesMetaFields).into(); |
| assert!(actual); |
| assert!(panic::catch_unwind(|| configs.get_or_default(RecordKeyFields)).is_err()); |
| assert!(panic::catch_unwind(|| configs.get_or_default(TableName)).is_err()); |
| let actual: String = configs.get_or_default(TableType).into(); |
| assert_eq!(actual, "COPY_ON_WRITE"); |
| assert!(panic::catch_unwind(|| configs.get_or_default(TableVersion)).is_err()); |
| assert!(panic::catch_unwind(|| configs.get_or_default(TimelineLayoutVersion)).is_err()); |
| let actual: String = configs.get_or_default(TimelineTimezone).into(); |
| assert_eq!(actual, "utc"); |
| } |
| |
| #[test] |
| fn get_valid_table_props() { |
| let table = get_test_table_without_validation("table_props_valid"); |
| let configs = table.hudi_configs; |
| let actual: String = configs.get(BaseFileFormat).unwrap().into(); |
| assert_eq!(actual, "parquet"); |
| let actual: isize = configs.get(Checksum).unwrap().into(); |
| assert_eq!(actual, 3761586722); |
| let actual: String = configs.get(DatabaseName).unwrap().into(); |
| assert_eq!(actual, "db"); |
| let actual: bool = configs.get(DropsPartitionFields).unwrap().into(); |
| assert!(!actual); |
| let actual: bool = configs.get(IsHiveStylePartitioning).unwrap().into(); |
| assert!(!actual); |
| let actual: bool = configs.get(IsPartitionPathUrlencoded).unwrap().into(); |
| assert!(!actual); |
| let actual: String = configs.get(KeyGeneratorClass).unwrap().into(); |
| assert_eq!(actual, "org.apache.hudi.keygen.SimpleKeyGenerator"); |
| let actual: Vec<String> = configs.get(PartitionFields).unwrap().into(); |
| assert_eq!(actual, vec!["city"]); |
| let actual: String = configs.get(PrecombineField).unwrap().into(); |
| assert_eq!(actual, "ts"); |
| let actual: bool = configs.get(PopulatesMetaFields).unwrap().into(); |
| assert!(actual); |
| let actual: Vec<String> = configs.get(RecordKeyFields).unwrap().into(); |
| assert_eq!(actual, vec!["uuid"]); |
| let actual: String = configs.get(TableName).unwrap().into(); |
| assert_eq!(actual, "trips"); |
| let actual: String = configs.get(TableType).unwrap().into(); |
| assert_eq!(actual, "COPY_ON_WRITE"); |
| let actual: isize = configs.get(TableVersion).unwrap().into(); |
| assert_eq!(actual, 6); |
| let actual: isize = configs.get(TimelineLayoutVersion).unwrap().into(); |
| assert_eq!(actual, 1); |
| let actual: String = configs.get(TimelineTimezone).unwrap().into(); |
| assert_eq!(actual, "local"); |
| } |
| |
| #[test] |
| fn get_global_table_props() { |
| // Without the environment variable HUDI_CONF_DIR |
| let table = get_test_table_without_validation("table_props_partial"); |
| let configs = table.hudi_configs; |
| assert!(configs.get(DatabaseName).is_err()); |
| assert!(configs.get(TableType).is_err()); |
| let actual: String = configs.get(TableName).unwrap().into(); |
| assert_eq!(actual, "trips"); |
| |
| // Environment variable HUDI_CONF_DIR points to a non-existent directory |
| let base_path = env::current_dir().unwrap(); |
| let hudi_conf_dir = base_path.join("random/wrong/dir"); |
| env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str()); |
| let table = get_test_table_without_validation("table_props_partial"); |
| let configs = table.hudi_configs; |
| assert!(configs.get(DatabaseName).is_err()); |
| assert!(configs.get(TableType).is_err()); |
| let actual: String = configs.get(TableName).unwrap().into(); |
| assert_eq!(actual, "trips"); |
| |
| // With global config |
| let base_path = env::current_dir().unwrap(); |
| let hudi_conf_dir = base_path.join("tests/data/hudi_conf_dir"); |
| env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str()); |
| let table = get_test_table_without_validation("table_props_partial"); |
| let configs = table.hudi_configs; |
| let actual: String = configs.get(DatabaseName).unwrap().into(); |
| assert_eq!(actual, "tmpdb"); |
| let actual: String = configs.get(TableType).unwrap().into(); |
| assert_eq!(actual, "MERGE_ON_READ"); |
| let actual: String = configs.get(TableName).unwrap().into(); |
| assert_eq!(actual, "trips"); |
| env::remove_var(HUDI_CONF_DIR) |
| } |
| |
| #[test] |
| fn hudi_table_read_file_slice() { |
| let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let batches = hudi_table |
| .create_file_group_reader_with_options(empty_options()) |
| .unwrap() |
| .read_file_slice_by_base_file_path_blocking( |
| "a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet", |
| ) |
| .unwrap(); |
| assert_eq!(batches.num_rows(), 4); |
| assert_eq!(batches.num_columns(), 21); |
| } |
| |
| #[test] |
| fn empty_hudi_table_get_file_slices_splits() { |
| let base_url = SampleTable::V6Empty.url_to_cow(); |
| |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices_splits = hudi_table |
| .get_file_slices_splits_blocking(2, empty_filters()) |
| .unwrap(); |
| assert!(file_slices_splits.is_empty()); |
| } |
| |
| #[test] |
| fn hudi_table_get_file_slices_splits() { |
| let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow(); |
| |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices_splits = hudi_table |
| .get_file_slices_splits_blocking(2, empty_filters()) |
| .unwrap(); |
| assert_eq!(file_slices_splits.len(), 2); |
| assert_eq!(file_slices_splits[0].len(), 2); |
| assert_eq!(file_slices_splits[1].len(), 1); |
| } |
| |
| #[test] |
| fn hudi_table_get_file_slices_splits_as_of_timestamps() { |
| let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| |
| // before replacecommit (insert overwrite table) |
| let second_latest_timestamp = "20250121000656060"; |
| let file_slices_splits = hudi_table |
| .get_file_slices_splits_as_of_blocking(2, second_latest_timestamp, empty_filters()) |
| .unwrap(); |
| assert_eq!(file_slices_splits.len(), 2); |
| assert_eq!(file_slices_splits[0].len(), 2); |
| assert_eq!(file_slices_splits[1].len(), 1); |
| let file_slices = file_slices_splits |
| .iter() |
| .flatten() |
| .filter(|f| f.partition_path == "10") |
| .collect::<Vec<_>>(); |
| assert_eq!( |
| file_slices.len(), |
| 1, |
| "Partition 10 should have 1 file slice" |
| ); |
| let file_slice = file_slices[0]; |
| assert_eq!( |
| file_slice.base_file.file_name(), |
| "92e64357-e4d1-4639-a9d3-c3535829d0aa-0_1-53-79_20250121000647668.parquet" |
| ); |
| assert_eq!( |
| file_slice.log_files.len(), |
| 1, |
| "File slice should have 1 log file" |
| ); |
| assert_eq!( |
| file_slice.log_files.iter().next().unwrap().file_name(), |
| ".92e64357-e4d1-4639-a9d3-c3535829d0aa-0_20250121000647668.log.1_0-73-101" |
| ); |
| |
| // as of replacecommit (insert overwrite table) |
| let latest_timestamp = "20250121000702475"; |
| let file_slices_splits = hudi_table |
| .get_file_slices_splits_as_of_blocking(2, latest_timestamp, empty_filters()) |
| .unwrap(); |
| assert_eq!(file_slices_splits.len(), 1); |
| assert_eq!(file_slices_splits[0].len(), 1); |
| } |
| |
| #[test] |
| fn hudi_table_get_file_slices_as_of_timestamps() { |
| let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); |
| |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices = hudi_table |
| .get_file_slices_blocking(empty_filters()) |
| .unwrap(); |
| assert_eq!( |
| file_slices |
| .iter() |
| .map(|f| f.base_file_relative_path().unwrap()) |
| .collect::<Vec<_>>(), |
| vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",] |
| ); |
| |
| // as of the latest timestamp |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices = hudi_table |
| .get_file_slices_as_of_blocking("20240418173551906", empty_filters()) |
| .unwrap(); |
| assert_eq!( |
| file_slices |
| .iter() |
| .map(|f| f.base_file_relative_path().unwrap()) |
| .collect::<Vec<_>>(), |
| vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",] |
| ); |
| |
| // as of just smaller than the latest timestamp |
| let hudi_table = |
| Table::new_with_options_blocking(base_url.path(), empty_options()).unwrap(); |
| let file_slices = hudi_table |
| .get_file_slices_as_of_blocking("20240418173551905", empty_filters()) |
| .unwrap(); |
| assert_eq!( |
| file_slices |
| .iter() |
| .map(|f| f.base_file_relative_path().unwrap()) |
| .collect::<Vec<_>>(), |
| vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",] |
| ); |
| |
| // as of non-exist old timestamp |
| let hudi_table = |
| Table::new_with_options_blocking(base_url.path(), empty_options()).unwrap(); |
| let file_slices = hudi_table |
| .get_file_slices_as_of_blocking("19700101000000", empty_filters()) |
| .unwrap(); |
| assert_eq!( |
| file_slices |
| .iter() |
| .map(|f| f.base_file_relative_path().unwrap()) |
| .collect::<Vec<_>>(), |
| Vec::<String>::new() |
| ); |
| } |
| |
| #[test] |
| fn empty_hudi_table_get_file_slices_between_timestamps() { |
| let base_url = SampleTable::V6Empty.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices = hudi_table |
| .get_file_slices_between_blocking(Some(EARLIEST_START_TIMESTAMP), None) |
| .unwrap(); |
| assert!(file_slices.is_empty()) |
| } |
| |
| #[test] |
| fn hudi_table_get_file_slices_between_timestamps() { |
| let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let mut file_slices = hudi_table |
| .get_file_slices_between_blocking(None, Some("20250121000656060")) |
| .unwrap(); |
| assert_eq!(file_slices.len(), 3); |
| |
| file_slices.sort_unstable_by_key(|f| f.partition_path.clone()); |
| |
| let file_slice_0 = &file_slices[0]; |
| assert_eq!(file_slice_0.partition_path, "10"); |
| assert_eq!( |
| file_slice_0.file_id(), |
| "92e64357-e4d1-4639-a9d3-c3535829d0aa-0" |
| ); |
| assert_eq!(file_slice_0.log_files.len(), 1); |
| |
| let file_slice_1 = &file_slices[1]; |
| assert_eq!(file_slice_1.partition_path, "20"); |
| assert_eq!( |
| file_slice_1.file_id(), |
| "d49ae379-4f20-4549-8e23-a5f9604412c0-0" |
| ); |
| assert!(file_slice_1.log_files.is_empty()); |
| |
| let file_slice_2 = &file_slices[2]; |
| assert_eq!(file_slice_2.partition_path, "30"); |
| assert_eq!( |
| file_slice_2.file_id(), |
| "de3550df-e12c-4591-9335-92ff992258a2-0" |
| ); |
| assert!(file_slice_2.log_files.is_empty()); |
| } |
| |
| #[test] |
| fn empty_hudi_table_get_file_slices_splits_between() { |
| let base_url = SampleTable::V6Empty.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices_splits = hudi_table |
| .get_file_slices_splits_between_blocking(2, Some(EARLIEST_START_TIMESTAMP), None) |
| .unwrap(); |
| assert!(file_slices_splits.is_empty()) |
| } |
| |
| #[test] |
| fn hudi_table_get_file_slices_splits_between() { |
| let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices_splits = hudi_table |
| .get_file_slices_splits_between_blocking(2, None, Some("20250121000656060")) |
| .unwrap(); |
| |
| assert_eq!(file_slices_splits.len(), 2); |
| let total_file_slices: usize = file_slices_splits.iter().map(|split| split.len()).sum(); |
| assert_eq!(total_file_slices, 3); |
| assert_eq!(file_slices_splits[0].len(), 2); |
| assert_eq!(file_slices_splits[1].len(), 1); |
| } |
| |
| #[test] |
| fn hudi_table_get_file_slices_splits_between_with_single_split() { |
| let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices_splits = hudi_table |
| .get_file_slices_splits_between_blocking(1, None, Some("20250121000656060")) |
| .unwrap(); |
| |
| // Should have 1 split with all 3 file slices |
| assert_eq!(file_slices_splits.len(), 1); |
| assert_eq!(file_slices_splits[0].len(), 3); |
| } |
| |
| #[test] |
| fn hudi_table_get_file_slices_splits_between_with_many_splits() { |
| let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| let file_slices_splits = hudi_table |
| .get_file_slices_splits_between_blocking(10, None, Some("20250121000656060")) |
| .unwrap(); |
| |
| assert_eq!(file_slices_splits.len(), 3); |
| for split in &file_slices_splits { |
| assert_eq!(split.len(), 1); |
| } |
| } |
| |
| #[test] |
| fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() { |
| let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| assert_eq!(hudi_table.timeline.completed_commits.len(), 2); |
| |
| let partition_filters = &[]; |
| let actual = get_file_paths_with_filters(&hudi_table, partition_filters) |
| .unwrap() |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| let expected = [ |
| "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet", |
| "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet", |
| "30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet", |
| ] |
| .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string()) |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| assert_eq!(actual, expected); |
| |
| let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")]; |
| let actual = get_file_paths_with_filters(&hudi_table, &filters) |
| .unwrap() |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| let expected = [ |
| "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet", |
| "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet", |
| ] |
| .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string()) |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| assert_eq!(actual, expected); |
| |
| let actual = get_file_paths_with_filters(&hudi_table, &[("byteField", ">", "30")]) |
| .unwrap() |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| let expected = HashSet::new(); |
| assert_eq!(actual, expected); |
| } |
| |
| #[test] |
| fn hudi_table_get_file_paths_for_complex_keygen_hive_style() { |
| let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow(); |
| let hudi_table = Table::new_blocking(base_url.path()).unwrap(); |
| assert_eq!(hudi_table.timeline.completed_commits.len(), 2); |
| |
| let partition_filters = &[]; |
| let actual = get_file_paths_with_filters(&hudi_table, partition_filters) |
| .unwrap() |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| let expected= [ |
| "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet", |
| "byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet", |
| "byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet", |
| ] |
| .map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() }) |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| assert_eq!(actual, expected); |
| |
| let filters = [ |
| ("byteField", ">=", "10"), |
| ("byteField", "<", "20"), |
| ("shortField", "!=", "100"), |
| ]; |
| let actual = get_file_paths_with_filters(&hudi_table, &filters) |
| .unwrap() |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| let expected = [ |
| "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet", |
| ] |
| .map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() }) |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| assert_eq!(actual, expected); |
| |
| let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")]; |
| let actual = get_file_paths_with_filters(&hudi_table, &filters) |
| .unwrap() |
| .into_iter() |
| .collect::<HashSet<_>>(); |
| let expected = HashSet::new(); |
| assert_eq!(actual, expected); |
| } |
| |
| mod test_snapshot_and_time_travel_queries { |
| use super::super::*; |
| use crate::config::util::empty_filters; |
| use arrow::compute::concat_batches; |
| use hudi_test::{QuickstartTripsTable, SampleTable}; |
| |
| #[test] |
| fn test_empty() -> Result<()> { |
| for base_url in SampleTable::V6Empty.urls() { |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| let records = hudi_table.read_snapshot_blocking(empty_filters())?; |
| assert!(records.is_empty()); |
| } |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_quickstart_trips_table_inserts_updates() -> Result<()> { |
| let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro(); |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| |
| let updated_rider = "rider-D"; |
| |
| // verify updated record as of the latest commit |
| let records = hudi_table.read_snapshot_blocking(empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records) |
| .into_iter() |
| .filter(|(_, rider, _)| rider == updated_rider) |
| .collect::<Vec<_>>(); |
| assert_eq!(uuid_rider_and_fare.len(), 1); |
| assert_eq!( |
| uuid_rider_and_fare[0].0, |
| "9909a8b1-2d15-4d3d-8ec9-efc48c536a00" |
| ); |
| assert_eq!(uuid_rider_and_fare[0].2, 25.0); |
| |
| // verify updated record as of the first commit |
| let commit_timestamps = hudi_table |
| .timeline |
| .completed_commits |
| .iter() |
| .map(|i| i.timestamp.as_str()) |
| .collect::<Vec<_>>(); |
| let first_commit = commit_timestamps[0]; |
| let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records) |
| .into_iter() |
| .filter(|(_, rider, _)| rider == updated_rider) |
| .collect::<Vec<_>>(); |
| assert_eq!(uuid_rider_and_fare.len(), 1); |
| assert_eq!( |
| uuid_rider_and_fare[0].0, |
| "9909a8b1-2d15-4d3d-8ec9-efc48c536a00" |
| ); |
| assert_eq!(uuid_rider_and_fare[0].2, 33.9); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_quickstart_trips_table_inserts_deletes() -> Result<()> { |
| let base_url = QuickstartTripsTable::V6Trips8I3D.url_to_mor_avro(); |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| |
| let deleted_riders = ["rider-A", "rider-C", "rider-D"]; |
| |
| // verify deleted record as of the latest commit |
| let records = hudi_table.read_snapshot_blocking(empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| let riders = QuickstartTripsTable::uuid_rider_and_fare(&records) |
| .into_iter() |
| .map(|(_, rider, _)| rider) |
| .collect::<Vec<_>>(); |
| assert!(riders |
| .iter() |
| .all(|rider| { !deleted_riders.contains(&rider.as_str()) })); |
| |
| // verify deleted record as of the first commit |
| let commit_timestamps = hudi_table |
| .timeline |
| .completed_commits |
| .iter() |
| .map(|i| i.timestamp.as_str()) |
| .collect::<Vec<_>>(); |
| let first_commit = commit_timestamps[0]; |
| let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| let mut uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records) |
| .into_iter() |
| .filter(|(_, rider, _)| deleted_riders.contains(&rider.as_str())) |
| .collect::<Vec<_>>(); |
| uuid_rider_and_fare.sort_unstable_by_key(|(_, rider, _)| rider.to_string()); |
| assert_eq!(uuid_rider_and_fare.len(), 3); |
| assert_eq!(uuid_rider_and_fare[0].1, "rider-A"); |
| assert_eq!(uuid_rider_and_fare[0].2, 19.10); |
| assert_eq!(uuid_rider_and_fare[1].1, "rider-C"); |
| assert_eq!(uuid_rider_and_fare[1].2, 27.70); |
| assert_eq!(uuid_rider_and_fare[2].1, "rider-D"); |
| assert_eq!(uuid_rider_and_fare[2].2, 33.90); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_non_partitioned() -> Result<()> { |
| for base_url in SampleTable::V6Nonpartitioned.urls() { |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| let records = hudi_table.read_snapshot_blocking(empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![ |
| (1, "Alice", false), |
| (2, "Bob", false), |
| (3, "Carol", true), |
| (4, "Diana", true), |
| ] |
| ); |
| } |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_non_partitioned_read_optimized() -> Result<()> { |
| let base_url = SampleTable::V6Nonpartitioned.url_to_mor_parquet(); |
| let hudi_table = Table::new_with_options_blocking( |
| base_url.path(), |
| [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")], |
| )?; |
| let commit_timestamps = hudi_table |
| .timeline |
| .completed_commits |
| .iter() |
| .map(|i| i.timestamp.as_str()) |
| .collect::<Vec<_>>(); |
| let latest_commit = commit_timestamps.last().unwrap(); |
| let records = |
| hudi_table.read_snapshot_as_of_blocking(latest_commit, empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![ |
| (1, "Alice", true), // this was updated to false in a log file, which read-optimized mode skips |
| (2, "Bob", false), |
| (3, "Carol", true), |
| (4, "Diana", true), // this was inserted in a base file and should be read out |
| ] |
| ); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_non_partitioned_rollback() -> Result<()> { |
| let base_url = SampleTable::V6NonpartitionedRollback.url_to_mor_parquet(); |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| let records = hudi_table.read_snapshot_blocking(empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![ |
| (1, "Alice", true), // this was updated to false then rolled back to true |
| (2, "Bob", true), // this was updated to true after rollback |
| (3, "Carol", true), |
| ] |
| ); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_complex_keygen_hive_style_with_filters() -> Result<()> { |
| for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() { |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| |
| let filters = vec![ |
| ("byteField", ">=", "10"), |
| ("byteField", "<", "20"), |
| ("shortField", "!=", "100"), |
| ]; |
| let records = hudi_table.read_snapshot_blocking(filters)?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol", true),]); |
| } |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> { |
| for base_url in SampleTable::V6SimplekeygenNonhivestyle.urls() { |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| let commit_timestamps = hudi_table |
| .timeline |
| .completed_commits |
| .iter() |
| .map(|i| i.timestamp.as_str()) |
| .collect::<Vec<_>>(); |
| let first_commit = commit_timestamps[0]; |
| let records = |
| hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", true),] |
| ); |
| } |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> { |
| for base_url in SampleTable::V6SimplekeygenHivestyleNoMetafields.urls() { |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| let records = hudi_table.read_snapshot_blocking(empty_filters())?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![ |
| (1, "Alice", false), |
| (2, "Bob", false), |
| (3, "Carol", true), |
| (4, "Diana", true), |
| ] |
| ) |
| } |
| Ok(()) |
| } |
| } |
| |
| mod test_incremental_queries { |
| use super::super::*; |
| use arrow_select::concat::concat_batches; |
| use hudi_test::SampleTable; |
| |
| #[test] |
| fn test_empty() -> Result<()> { |
| for base_url in SampleTable::V6Empty.urls() { |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| let records = hudi_table.read_incremental_records_blocking("0", None)?; |
| assert!(records.is_empty()) |
| } |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> { |
| for base_url in SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() { |
| let hudi_table = Table::new_blocking(base_url.path())?; |
| let commit_timestamps = hudi_table |
| .timeline |
| .completed_commits |
| .iter() |
| .map(|i| i.timestamp.as_str()) |
| .collect::<Vec<_>>(); |
| assert_eq!(commit_timestamps.len(), 3); |
| let first_commit = commit_timestamps[0]; |
| let second_commit = commit_timestamps[1]; |
| let third_commit = commit_timestamps[2]; |
| |
| // read records changed from the beginning to the 1st commit |
| let records = hudi_table |
| .read_incremental_records_blocking("19700101000000", Some(first_commit))?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", true),], |
| "Should return 3 records inserted in the 1st commit" |
| ); |
| |
| // read records changed from the 1st to the 2nd commit |
| let records = hudi_table |
| .read_incremental_records_blocking(first_commit, Some(second_commit))?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![(1, "Alice", false), (4, "Diana", true),], |
| "Should return 2 records inserted or updated in the 2nd commit" |
| ); |
| |
| // read records changed from the 2nd to the 3rd commit |
| let records = hudi_table |
| .read_incremental_records_blocking(second_commit, Some(third_commit))?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![(4, "Diana", false),], |
| "Should return 1 record insert-overwritten in the 3rd commit" |
| ); |
| |
| // read records changed from the 1st commit |
| let records = hudi_table.read_incremental_records_blocking(first_commit, None)?; |
| let schema = &records[0].schema(); |
| let records = concat_batches(schema, &records)?; |
| let sample_data = SampleTable::sample_data_order_by_id(&records); |
| assert_eq!( |
| sample_data, |
| vec![(4, "Diana", false),], |
| "Should return 1 record insert-overwritten in the 3rd commit" |
| ); |
| |
| // read records changed from the 3rd commit |
| let records = hudi_table.read_incremental_records_blocking(third_commit, None)?; |
| assert!( |
| records.is_empty(), |
| "Should return 0 record as it's the latest commit" |
| ); |
| } |
| Ok(()) |
| } |
| } |
| } |