/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
//! Integration tests for reading Hudi tables.
//!
//! This module contains tests for snapshot, time-travel, incremental, and
//! MOR log-file queries, organized by table version (v6, v8+) and query type.
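//!
//! A minimal sketch of the blocking read APIs exercised below (the table
//! path is a placeholder):
//!
//! ```no_run
//! use hudi_core::config::util::empty_filters;
//! use hudi_core::error::Result;
//! use hudi_core::table::Table;
//!
//! fn read_example() -> Result<()> {
//!     let table = Table::new_blocking("/path/to/hudi_table")?;
//!     // Latest-snapshot read with no filters.
//!     let _snapshot = table.read_snapshot_blocking(empty_filters())?;
//!     // Records changed after the given commit timestamp, up to the latest.
//!     let _changes = table.read_incremental_records_blocking("0", None)?;
//!     Ok(())
//! }
//! ```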

use arrow::compute::concat_batches;
use hudi_core::config::read::HudiReadConfig;
use hudi_core::config::util::empty_filters;
use hudi_core::error::Result;
use hudi_core::table::Table;
use hudi_test::{QuickstartTripsTable, SampleTable};

/// Tests for v6 tables (pre-1.0 spec)
mod v6_tables {
    use super::*;

    mod snapshot_queries {
        use super::*;

        #[test]
        fn test_empty_table() -> Result<()> {
            for base_url in SampleTable::V6Empty.urls() {
                let hudi_table = Table::new_blocking(base_url.path())?;
                let records = hudi_table.read_snapshot_blocking(empty_filters())?;
                assert!(records.is_empty());
            }
            Ok(())
        }

        #[test]
        fn test_non_partitioned() -> Result<()> {
            for base_url in SampleTable::V6Nonpartitioned.urls() {
                let hudi_table = Table::new_blocking(base_url.path())?;
                let records = hudi_table.read_snapshot_blocking(empty_filters())?;
                let schema = &records[0].schema();
                let records = concat_batches(schema, &records)?;

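                // sample_data_order_by_id yields (id, name, bool) tuples sorted
                // by id, so the comparison below is deterministic.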
                let sample_data = SampleTable::sample_data_order_by_id(&records);
                assert_eq!(
                    sample_data,
                    vec![
                        (1, "Alice", false),
                        (2, "Bob", false),
                        (3, "Carol", true),
                        (4, "Diana", true),
                    ]
                );
            }
            Ok(())
        }

        #[test]
        fn test_non_partitioned_read_optimized() -> Result<()> {
            let base_url = SampleTable::V6Nonpartitioned.url_to_mor_parquet();
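            // Read-optimized mode reads only base (parquet) files and skips log
            // files, so MOR updates that exist only in log files are not applied.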
            let hudi_table = Table::new_with_options_blocking(
                base_url.path(),
                [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
            )?;
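            // Completed commits are listed in timeline order, so the last
            // timestamp is the latest commit.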
            let commit_timestamps = hudi_table
                .timeline
                .completed_commits
                .iter()
                .map(|i| i.timestamp.as_str())
                .collect::<Vec<_>>();
            let latest_commit = commit_timestamps.last().unwrap();
            let records =
                hudi_table.read_snapshot_as_of_blocking(latest_commit, empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;

            let sample_data = SampleTable::sample_data_order_by_id(&records);
            assert_eq!(
                sample_data,
                vec![
                    (1, "Alice", true), // updated to false in a log file; skipped in read-optimized mode
                    (2, "Bob", false),
                    (3, "Carol", true),
                    (4, "Diana", true), // inserted in a base file, so it is read out
                ]
            );
            Ok(())
        }

        #[test]
        fn test_non_partitioned_rollback() -> Result<()> {
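            // The table's timeline includes a rollback: an update commit was
            // undone, restoring the pre-update values asserted below.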
            let base_url = SampleTable::V6NonpartitionedRollback.url_to_mor_parquet();
            let hudi_table = Table::new_blocking(base_url.path())?;
            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;

            let sample_data = SampleTable::sample_data_order_by_id(&records);
            assert_eq!(
                sample_data,
                vec![
                    (1, "Alice", true), // updated to false, then rolled back, restoring true
                    (2, "Bob", true),   // updated to true by a commit after the rollback
                    (3, "Carol", true),
                ]
            );
            Ok(())
        }

        #[test]
        fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
            for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
                let hudi_table = Table::new_blocking(base_url.path())?;

                let filters = vec![
                    ("byteField", ">=", "10"),
                    ("byteField", "<", "20"),
                    ("shortField", "!=", "100"),
                ];
                let records = hudi_table.read_snapshot_blocking(filters)?;
                let schema = &records[0].schema();
                let records = concat_batches(schema, &records)?;

                let sample_data = SampleTable::sample_data_order_by_id(&records);
                assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol", true)]);
            }
            Ok(())
        }

        #[test]
        fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
            for base_url in SampleTable::V6SimplekeygenHivestyleNoMetafields.urls() {
                let hudi_table = Table::new_blocking(base_url.path())?;
                let records = hudi_table.read_snapshot_blocking(empty_filters())?;
                let schema = &records[0].schema();
                let records = concat_batches(schema, &records)?;

                let sample_data = SampleTable::sample_data_order_by_id(&records);
                assert_eq!(
                    sample_data,
                    vec![
                        (1, "Alice", false),
                        (2, "Bob", false),
                        (3, "Carol", true),
                        (4, "Diana", true),
                    ]
                );
            }
            Ok(())
        }
    }

    mod time_travel_queries {
        use super::*;

        #[test]
        fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
            for base_url in SampleTable::V6SimplekeygenNonhivestyle.urls() {
                let hudi_table = Table::new_blocking(base_url.path())?;
                let commit_timestamps = hudi_table
                    .timeline
                    .completed_commits
                    .iter()
                    .map(|i| i.timestamp.as_str())
                    .collect::<Vec<_>>();
                let first_commit = commit_timestamps[0];
                let records =
                    hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
                let schema = &records[0].schema();
                let records = concat_batches(schema, &records)?;

                let sample_data = SampleTable::sample_data_order_by_id(&records);
                assert_eq!(
                    sample_data,
                    vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", true)]
                );
            }
            Ok(())
        }
    }

    mod mor_log_file_queries {
        use super::*;

        #[test]
        fn test_quickstart_trips_inserts_updates() -> Result<()> {
            let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
            let hudi_table = Table::new_blocking(base_url.path())?;

            let updated_rider = "rider-D";

            // verify the updated record as of the latest commit
            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;
            let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records)
                .into_iter()
                .filter(|(_, rider, _)| rider == updated_rider)
                .collect::<Vec<_>>();
            assert_eq!(uuid_rider_and_fare.len(), 1);
            assert_eq!(
                uuid_rider_and_fare[0].0,
                "9909a8b1-2d15-4d3d-8ec9-efc48c536a00"
            );
            assert_eq!(uuid_rider_and_fare[0].2, 25.0);

            // verify the record's pre-update fare as of the first commit
            let commit_timestamps = hudi_table
                .timeline
                .completed_commits
                .iter()
                .map(|i| i.timestamp.as_str())
                .collect::<Vec<_>>();
            let first_commit = commit_timestamps[0];
            let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;
            let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records)
                .into_iter()
                .filter(|(_, rider, _)| rider == updated_rider)
                .collect::<Vec<_>>();
            assert_eq!(uuid_rider_and_fare.len(), 1);
            assert_eq!(
                uuid_rider_and_fare[0].0,
                "9909a8b1-2d15-4d3d-8ec9-efc48c536a00"
            );
            assert_eq!(uuid_rider_and_fare[0].2, 33.9);

            Ok(())
        }

        #[test]
        fn test_quickstart_trips_inserts_deletes() -> Result<()> {
            let base_url = QuickstartTripsTable::V6Trips8I3D.url_to_mor_avro();
            let hudi_table = Table::new_blocking(base_url.path())?;

            let deleted_riders = ["rider-A", "rider-C", "rider-D"];

            // verify the deleted records are absent as of the latest commit
            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;
            let riders = QuickstartTripsTable::uuid_rider_and_fare(&records)
                .into_iter()
                .map(|(_, rider, _)| rider)
                .collect::<Vec<_>>();
            assert!(riders
                .iter()
                .all(|rider| !deleted_riders.contains(&rider.as_str())));

            // verify the deleted records were still present as of the first commit
            let commit_timestamps = hudi_table
                .timeline
                .completed_commits
                .iter()
                .map(|i| i.timestamp.as_str())
                .collect::<Vec<_>>();
            let first_commit = commit_timestamps[0];
            let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;
            let mut uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records)
                .into_iter()
                .filter(|(_, rider, _)| deleted_riders.contains(&rider.as_str()))
                .collect::<Vec<_>>();
            uuid_rider_and_fare.sort_unstable_by_key(|(_, rider, _)| rider.to_string());
            assert_eq!(uuid_rider_and_fare.len(), 3);
            assert_eq!(uuid_rider_and_fare[0].1, "rider-A");
            assert_eq!(uuid_rider_and_fare[0].2, 19.10);
            assert_eq!(uuid_rider_and_fare[1].1, "rider-C");
            assert_eq!(uuid_rider_and_fare[1].2, 27.70);
            assert_eq!(uuid_rider_and_fare[2].1, "rider-D");
            assert_eq!(uuid_rider_and_fare[2].2, 33.90);

            Ok(())
        }
    }

    mod incremental_queries {
        use super::*;

        #[test]
        fn test_empty_table() -> Result<()> {
            for base_url in SampleTable::V6Empty.urls() {
                let hudi_table = Table::new_blocking(base_url.path())?;
                let records = hudi_table.read_incremental_records_blocking("0", None)?;
                assert!(records.is_empty());
            }
            Ok(())
        }

        #[test]
        fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
            for base_url in SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
                let hudi_table = Table::new_blocking(base_url.path())?;
                let commit_timestamps = hudi_table
                    .timeline
                    .completed_commits
                    .iter()
                    .map(|i| i.timestamp.as_str())
                    .collect::<Vec<_>>();
                assert_eq!(commit_timestamps.len(), 3);
                let first_commit = commit_timestamps[0];
                let second_commit = commit_timestamps[1];
                let third_commit = commit_timestamps[2];

                // read records changed from the beginning to the 1st commit
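                // ("19700101000000" is the Unix epoch rendered in Hudi's
                // commit-timestamp format, i.e., before any commit.)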
                let records = hudi_table
                    .read_incremental_records_blocking("19700101000000", Some(first_commit))?;
                let schema = &records[0].schema();
                let records = concat_batches(schema, &records)?;
                let sample_data = SampleTable::sample_data_order_by_id(&records);
                assert_eq!(
                    sample_data,
                    vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", true)],
                    "Should return 3 records inserted in the 1st commit"
                );

                // read records changed from the 1st to the 2nd commit
                let records = hudi_table
                    .read_incremental_records_blocking(first_commit, Some(second_commit))?;
                let schema = &records[0].schema();
                let records = concat_batches(schema, &records)?;
                let sample_data = SampleTable::sample_data_order_by_id(&records);
                assert_eq!(
                    sample_data,
                    vec![(1, "Alice", false), (4, "Diana", true)],
                    "Should return 2 records inserted or updated in the 2nd commit"
                );

                // read records changed from the 2nd to the 3rd commit
                let records = hudi_table
                    .read_incremental_records_blocking(second_commit, Some(third_commit))?;
                let schema = &records[0].schema();
                let records = concat_batches(schema, &records)?;
                let sample_data = SampleTable::sample_data_order_by_id(&records);
                assert_eq!(
                    sample_data,
                    vec![(4, "Diana", false)],
                    "Should return 1 record insert-overwritten in the 3rd commit"
                );

                // read records changed from the 1st commit onwards
                let records = hudi_table.read_incremental_records_blocking(first_commit, None)?;
                let schema = &records[0].schema();
                let records = concat_batches(schema, &records)?;
                let sample_data = SampleTable::sample_data_order_by_id(&records);
                assert_eq!(
                    sample_data,
                    vec![(4, "Diana", false)],
                    "Should return 1 record insert-overwritten in the 3rd commit"
                );

                // read records changed from the 3rd commit onwards
                let records = hudi_table.read_incremental_records_blocking(third_commit, None)?;
                assert!(
                    records.is_empty(),
                    "Should return 0 records as it's the latest commit"
                );
            }
            Ok(())
        }
    }
}

/// Tests for v8 tables (1.0 spec)
mod v8_tables {
    use super::*;

    mod snapshot_queries {
        use super::*;

        #[test]
        fn test_empty_table() -> Result<()> {
            let base_url = SampleTable::V8Empty.url_to_cow();
            let hudi_table = Table::new_blocking(base_url.path())?;
            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            assert!(records.is_empty());
            Ok(())
        }

        #[test]
        fn test_non_partitioned() -> Result<()> {
            let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
            let hudi_table = Table::new_blocking(base_url.path())?;
            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;

            let sample_data = SampleTable::sample_data_order_by_id(&records);
            assert_eq!(
                sample_data,
                vec![
                    (1, "Alice", false),
                    (2, "Bob", false),
                    (3, "Carol", true),
                    (4, "Diana", true),
                ]
            );
            Ok(())
        }

        #[test]
        fn test_complex_keygen_hive_style() -> Result<()> {
            let base_url = SampleTable::V8ComplexkeygenHivestyle.url_to_cow();
            let hudi_table = Table::new_blocking(base_url.path())?;

            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;

            let sample_data = SampleTable::sample_data_order_by_id(&records);
            assert_eq!(
                sample_data,
                vec![
                    (1, "Alice", false),
                    (2, "Bob", false),
                    (3, "Carol", true),
                    (4, "Diana", true),
                ]
            );
            Ok(())
        }

        #[test]
        fn test_simple_keygen_nonhivestyle() -> Result<()> {
            let base_url = SampleTable::V8SimplekeygenNonhivestyle.url_to_cow();
            let hudi_table = Table::new_blocking(base_url.path())?;

            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;

            let sample_data = SampleTable::sample_data_order_by_id(&records);
            assert_eq!(
                sample_data,
                vec![
                    (1, "Alice", false),
                    (2, "Bob", false),
                    (3, "Carol", true),
                    (4, "Diana", true),
                ]
            );
            Ok(())
        }

        #[test]
        fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
            let base_url = SampleTable::V8SimplekeygenHivestyleNoMetafields.url_to_cow();
            let hudi_table = Table::new_blocking(base_url.path())?;

            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;

            let sample_data = SampleTable::sample_data_order_by_id(&records);
            assert_eq!(
                sample_data,
                vec![
                    (1, "Alice", false),
                    (2, "Bob", false),
                    (3, "Carol", true),
                    (4, "Diana", true),
                ]
            );
            Ok(())
        }
    }

    /// MOR log file tests for v8 tables
    mod mor_log_file_queries {
        use super::*;

        #[test]
        fn test_quickstart_trips_inserts_updates_deletes() -> Result<()> {
            // V8Trips8I3U1D: 8 inserts, 3 updates (fare set to 0 for riders A, J,
            // and G), and deletes of riders F and J
            let base_url = QuickstartTripsTable::V8Trips8I3U1D.url_to_mor_avro();
            let hudi_table = Table::new_blocking(base_url.path())?;

            let deleted_riders = ["rider-F", "rider-J"];

            // verify deleted records are not present in the latest snapshot
            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;
            let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records);
            let riders: Vec<_> = uuid_rider_and_fare
                .iter()
                .map(|(_, rider, _)| rider.as_str())
                .collect();

            // Deleted riders should not be present
            assert!(riders
                .iter()
                .all(|rider| !deleted_riders.contains(rider)));

            // Should have 6 active riders (8 inserted - 2 deleted)
            assert_eq!(riders.len(), 6);

            // Verify updated fares (rider-A and rider-G have fare=0)
            let rider_a = uuid_rider_and_fare
                .iter()
                .find(|(_, r, _)| r == "rider-A")
                .expect("rider-A should exist");
            assert_eq!(rider_a.2, 0.0, "rider-A fare should be updated to 0");

            let rider_g = uuid_rider_and_fare
                .iter()
                .find(|(_, r, _)| r == "rider-G")
                .expect("rider-G should exist");
            assert_eq!(rider_g.2, 0.0, "rider-G fare should be updated to 0");

            // verify deleted records were present in the first commit (before the
            // updates and deletes)
            let commit_timestamps = hudi_table
                .timeline
                .completed_commits
                .iter()
                .map(|i| i.timestamp.as_str())
                .collect::<Vec<_>>();
            let first_commit = commit_timestamps[0];
            let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
            let schema = &records[0].schema();
            let records = concat_batches(schema, &records)?;
            let mut uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records)
                .into_iter()
                .filter(|(_, rider, _)| deleted_riders.contains(&rider.as_str()))
                .collect::<Vec<_>>();
            uuid_rider_and_fare.sort_unstable_by_key(|(_, rider, _)| rider.to_string());

            // Both deleted riders should be present before the delete
            assert_eq!(uuid_rider_and_fare.len(), 2);
            assert_eq!(uuid_rider_and_fare[0].1, "rider-F");
            assert_eq!(uuid_rider_and_fare[0].2, 34.15);
            assert_eq!(uuid_rider_and_fare[1].1, "rider-J");
            assert_eq!(uuid_rider_and_fare[1].2, 17.85);

            Ok(())
        }
    }
}