blob: 7218188d84a14d5afbbb3acb9fa70671c36549bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::error::CoreError;
use crate::file_group::base_file::BaseFile;
use crate::file_group::log_file::LogFile;
use crate::storage::Storage;
use crate::Result;
use std::collections::BTreeSet;
use std::fmt::Display;
use std::path::PathBuf;
/// Within a [crate::file_group::FileGroup],
/// a [FileSlice] is a logical group of [BaseFile] and [LogFile]s.
#[derive(Clone, Debug)]
pub struct FileSlice {
    /// The base file of this slice; the slice's file id and creation instant time are read
    /// from it.
    pub base_file: BaseFile,
    /// The log files attached to this slice. Being a [BTreeSet], it is deduplicated and
    /// iterated in [LogFile]'s `Ord` order.
    pub log_files: BTreeSet<LogFile>,
    /// Partition path that file names are joined onto to build their relative paths.
    /// May be empty (e.g. for a non-partitioned layout) — NOTE(review): confirm the exact
    /// convention for the empty partition against callers.
    pub partition_path: String,
}
impl Display for FileSlice {
    // Renders the slice as
    // `FileSlice { base_file: <Display of base>, log_files: <Debug of set>, partition_path: <path> }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            base_file,
            log_files,
            partition_path,
        } = self;
        write!(
            f,
            "FileSlice {{ base_file: {base_file}, log_files: {log_files:?}, partition_path: {partition_path} }}"
        )
    }
}
impl PartialEq for FileSlice {
    /// Two [FileSlice]s are equal when their base file and partition path match.
    ///
    /// Log files are not compared, which is what allows [FileSlice::merge] to combine
    /// slices whose log files differ.
    fn eq(&self, other: &Self) -> bool {
        (&self.base_file, &self.partition_path) == (&other.base_file, &other.partition_path)
    }
}
impl Eq for FileSlice {}
impl FileSlice {
    /// Creates a [FileSlice] for `base_file` in `partition_path` with no log files attached.
    pub fn new(base_file: BaseFile, partition_path: String) -> Self {
        Self {
            base_file,
            log_files: BTreeSet::new(),
            partition_path,
        }
    }

    /// Returns `true` if at least one [LogFile] is attached to this [FileSlice].
    #[inline]
    pub fn has_log_file(&self) -> bool {
        !self.log_files.is_empty()
    }

    /// Merges the [LogFile]s of `other` into this [FileSlice].
    ///
    /// Log files already present are not duplicated, because they are stored in a [BTreeSet].
    ///
    /// # Errors
    ///
    /// Returns [CoreError::FileGroup] if `other` is a different slice, i.e. it has a different
    /// base file or partition path (log files are not part of slice equality).
    pub fn merge(&mut self, other: &FileSlice) -> Result<()> {
        if self != other {
            return Err(CoreError::FileGroup(format!(
                "Cannot merge different file slices: {self} and {other}"
            )));
        }
        self.log_files.extend(other.log_files.iter().cloned());
        Ok(())
    }

    /// Joins `file_name` onto this slice's partition path to form the file's relative path.
    ///
    /// The join is done with [PathBuf::join], so the platform's native separator is used.
    /// NOTE(review): on Windows this yields `\`; confirm that is acceptable for storage paths.
    fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
        let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
        // Both inputs are valid UTF-8 strings, so `to_str` is expected to succeed; the error
        // branch is a defensive fallback.
        path.to_str().map(|s| s.to_string()).ok_or_else(|| {
            CoreError::FileGroup(format!("Failed to get relative path for file: {file_name}"))
        })
    }

    /// Returns the relative path of the [BaseFile] in the [FileSlice].
    ///
    /// # Errors
    ///
    /// Returns [CoreError::FileGroup] if the joined path cannot be represented as a string.
    pub fn base_file_relative_path(&self) -> Result<String> {
        self.relative_path_for_file(&self.base_file.file_name())
    }

    /// Returns the relative path of the given [LogFile] in the [FileSlice].
    ///
    /// # Errors
    ///
    /// Returns [CoreError::FileGroup] if the joined path cannot be represented as a string.
    pub fn log_file_relative_path(&self, log_file: &LogFile) -> Result<String> {
        self.relative_path_for_file(&log_file.file_name())
    }

    /// Returns the id of the enclosing [crate::file_group::FileGroup], taken from the
    /// base file.
    #[inline]
    pub fn file_id(&self) -> &str {
        &self.base_file.file_id
    }

    /// Returns the instant time that marks the [FileSlice] creation.
    ///
    /// This is the base file's commit timestamp, which is also an instant time stored in
    /// the table's timeline.
    #[inline]
    pub fn creation_instant_time(&self) -> &str {
        &self.base_file.commit_timestamp
    }

    /// Loads file metadata from the storage layer into the [BaseFile] when its
    /// `file_metadata` is [None] or is not fully populated; otherwise does nothing.
    ///
    /// # Errors
    ///
    /// Propagates errors from building the base file's relative path and from
    /// `Storage::get_file_metadata`.
    pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> Result<()> {
        // Skip the storage round-trip when complete metadata is already cached.
        if matches!(&self.base_file.file_metadata, Some(metadata) if metadata.fully_populated) {
            return Ok(());
        }
        let relative_path = self.base_file_relative_path()?;
        let fetched_metadata = storage.get_file_metadata(&relative_path).await?;
        self.base_file.file_metadata = Some(fetched_metadata);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::table::partition::EMPTY_PARTITION_PATH;
    use std::str::FromStr;

    /// Merging two slices with overlapping log files yields the sorted, deduplicated union.
    #[test]
    fn test_file_slices_merge() -> Result<()> {
        let base = BaseFile::from_str(
            "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_0-7-24_20250109233025121.parquet",
        )?;
        let mut log_set1 = BTreeSet::new();
        log_set1.insert(LogFile::from_str(
            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.4_0-51-115",
        )?);
        log_set1.insert(LogFile::from_str(
            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
        )?);
        let mut log_set2 = BTreeSet::new();
        log_set2.insert(LogFile::from_str(
            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.3_0-51-115",
        )?);
        log_set2.insert(LogFile::from_str(
            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115",
        )?);
        // Overlaps with log_set1 so the merge itself must deduplicate.
        log_set2.insert(LogFile::from_str(
            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
        )?);
        let mut slice1 = FileSlice {
            base_file: base.clone(),
            log_files: log_set1,
            partition_path: EMPTY_PARTITION_PATH.to_string(),
        };
        let slice2 = FileSlice {
            base_file: base,
            log_files: log_set2,
            partition_path: EMPTY_PARTITION_PATH.to_string(),
        };
        slice1.merge(&slice2)?;
        // Verify merged result
        assert_eq!(slice1.log_files.len(), 4);
        let log_file_names = slice1
            .log_files
            .iter()
            .map(|log| log.file_name())
            .collect::<Vec<String>>();
        assert_eq!(
            log_file_names.as_slice(),
            &[
                ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115",
                ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
                ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.3_0-51-115",
                ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.4_0-51-115",
            ]
        );
        Ok(())
    }

    /// Slices with different base files must not be merged.
    #[test]
    fn test_merge_different_base_files() -> Result<()> {
        let mut slice1 = FileSlice {
            base_file: BaseFile::from_str(
                "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_0-7-24_20250109233025121.parquet",
            )?,
            log_files: BTreeSet::new(),
            partition_path: EMPTY_PARTITION_PATH.to_string(),
        };
        let slice2 = FileSlice {
            base_file: BaseFile::from_str(
                "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_1-19-51_20250109233025121.parquet",
            )?,
            log_files: BTreeSet::new(),
            partition_path: EMPTY_PARTITION_PATH.to_string(),
        };
        // Should return error for different base files
        assert!(slice1.merge(&slice2).is_err());
        Ok(())
    }

    /// Slices with the same base file but different partition paths must not be merged.
    #[test]
    fn test_merge_different_partition_paths() -> Result<()> {
        let base = BaseFile::from_str(
            "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_1-19-51_20250109233025121.parquet",
        )?;
        let mut slice1 = FileSlice {
            base_file: base.clone(),
            log_files: BTreeSet::new(),
            partition_path: "path/to/partition1".to_string(),
        };
        let slice2 = FileSlice {
            base_file: base,
            log_files: BTreeSet::new(),
            partition_path: "path/to/partition2".to_string(),
        };
        // Should return error for different partition paths
        assert!(slice1.merge(&slice2).is_err());
        Ok(())
    }

    /// Equality is defined by base file and partition path only; log files are ignored.
    #[test]
    fn test_eq_ignores_log_files() -> Result<()> {
        let base = BaseFile::from_str(
            "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_0-7-24_20250109233025121.parquet",
        )?;
        let without_logs = FileSlice::new(base.clone(), EMPTY_PARTITION_PATH.to_string());
        let mut with_logs = FileSlice::new(base, EMPTY_PARTITION_PATH.to_string());
        with_logs.log_files.insert(LogFile::from_str(
            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115",
        )?);
        assert!(with_logs.has_log_file());
        assert!(!without_logs.has_log_file());
        assert_eq!(without_logs, with_logs);
        Ok(())
    }
}