feat: add hfile reader (#490)
diff --git a/Cargo.toml b/Cargo.toml
index bc991d1..5e44b0a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -85,3 +85,9 @@
dashmap = { version = "~6.1" }
futures = { version = "~0.3" }
tokio = { version = "~1.45", features = ["rt-multi-thread"] }
+
+# protobuf
+prost = { version = "~0.13" }
+
+# compression
+flate2 = { version = "^1.1" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index eb958a9..45d423d 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -73,6 +73,12 @@
futures = { workspace = true }
tokio = { workspace = true }
+# protobuf
+prost = { workspace = true }
+
+# compression
+flate2 = { workspace = true }
+
# datafusion
datafusion = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
diff --git a/crates/core/src/hfile/block.rs b/crates/core/src/hfile/block.rs
new file mode 100644
index 0000000..29b308e
--- /dev/null
+++ b/crates/core/src/hfile/block.rs
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile block parsing and handling.
+
+use crate::hfile::block_type::{HFileBlockType, MAGIC_LENGTH};
+use crate::hfile::compression::CompressionCodec;
+use crate::hfile::error::{HFileError, Result};
+use crate::hfile::key::{Key, KeyValue, KEY_VALUE_HEADER_SIZE};
+
+/// Size constants for block header
+const SIZEOF_INT32: usize = 4;
+const SIZEOF_INT64: usize = 8;
+const SIZEOF_BYTE: usize = 1;
+
+/// Block header size without checksum info (HFile v2)
+const HEADER_SIZE_NO_CHECKSUM: usize = MAGIC_LENGTH + 2 * SIZEOF_INT32 + SIZEOF_INT64;
+
+/// Block header size with checksum (HFile v3)
+/// Header fields:
+/// - 8 bytes: magic
+/// - 4 bytes: on-disk size without header
+/// - 4 bytes: uncompressed size without header
+/// - 8 bytes: previous block offset
+/// - 1 byte: checksum type
+/// - 4 bytes: bytes per checksum
+/// - 4 bytes: on-disk data size with header
+pub const BLOCK_HEADER_SIZE: usize = HEADER_SIZE_NO_CHECKSUM + SIZEOF_BYTE + 2 * SIZEOF_INT32;
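+// Putting the numbers together: HEADER_SIZE_NO_CHECKSUM = 8 + 2 * 4 + 8 = 24 bytes,
+// so BLOCK_HEADER_SIZE = 24 + 1 + 2 * 4 = 33 bytes.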
+
+/// Checksum size (each checksum is 4 bytes)
+const CHECKSUM_SIZE: usize = SIZEOF_INT32;
+
+/// Parsed block header information.
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+pub struct BlockHeader {
+ pub block_type: HFileBlockType,
+ pub on_disk_size_without_header: usize,
+ pub uncompressed_size_without_header: usize,
+ pub prev_block_offset: i64,
+ pub checksum_type: u8,
+ pub bytes_per_checksum: usize,
+ pub on_disk_data_size_with_header: usize,
+}
+
+impl BlockHeader {
+ /// Parse a block header from bytes.
+ pub fn parse(bytes: &[u8]) -> Result<Self> {
+ if bytes.len() < BLOCK_HEADER_SIZE {
+ return Err(HFileError::InvalidFormat(format!(
+ "Buffer too small for block header: {} bytes, need {}",
+ bytes.len(),
+ BLOCK_HEADER_SIZE
+ )));
+ }
+
+ let block_type = HFileBlockType::from_magic(bytes)?;
+
+ let on_disk_size_without_header =
+ i32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]) as usize;
+
+ let uncompressed_size_without_header =
+ i32::from_be_bytes([bytes[12], bytes[13], bytes[14], bytes[15]]) as usize;
+
+ let prev_block_offset = i64::from_be_bytes([
+ bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
+ ]);
+
+ let checksum_type = bytes[24];
+
+ let bytes_per_checksum =
+ i32::from_be_bytes([bytes[25], bytes[26], bytes[27], bytes[28]]) as usize;
+
+ let on_disk_data_size_with_header =
+ i32::from_be_bytes([bytes[29], bytes[30], bytes[31], bytes[32]]) as usize;
+
+ Ok(Self {
+ block_type,
+ on_disk_size_without_header,
+ uncompressed_size_without_header,
+ prev_block_offset,
+ checksum_type,
+ bytes_per_checksum,
+ on_disk_data_size_with_header,
+ })
+ }
+
+ /// Calculate the number of checksum bytes.
+ pub fn checksum_bytes_count(&self) -> usize {
+ let on_disk_with_header = BLOCK_HEADER_SIZE + self.on_disk_size_without_header;
+ let num_chunks = on_disk_with_header.div_ceil(self.bytes_per_checksum);
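+        // e.g., any block whose on-disk size with header is at most
+        // bytes_per_checksum is covered by a single chunk, i.e., 4 checksum bytes.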
+ num_chunks * CHECKSUM_SIZE
+ }
+
+ /// Returns the total on-disk size including header.
+ pub fn on_disk_size_with_header(&self) -> usize {
+ BLOCK_HEADER_SIZE + self.on_disk_size_without_header
+ }
+}
+
+/// An HFile block with parsed content.
+#[derive(Debug)]
+pub struct HFileBlock {
+ pub header: BlockHeader,
+ /// Uncompressed block data (after header, before checksum)
+ pub data: Vec<u8>,
+}
+
+impl HFileBlock {
+ /// Parse a block from bytes, decompressing if necessary.
+ pub fn parse(bytes: &[u8], codec: CompressionCodec) -> Result<Self> {
+ let header = BlockHeader::parse(bytes)?;
+
+ // Extract the compressed data (between header and checksum)
+ let data_start = BLOCK_HEADER_SIZE;
+ let data_end =
+ data_start + header.on_disk_size_without_header - header.checksum_bytes_count();
+
+ // For uncompressed blocks, on_disk_size == uncompressed_size
+ // For compressed blocks, we need to decompress
+ let data = if codec == CompressionCodec::None {
+ bytes[data_start..data_start + header.uncompressed_size_without_header].to_vec()
+ } else {
+ let compressed_data = &bytes[data_start..data_end];
+ codec.decompress(compressed_data, header.uncompressed_size_without_header)?
+ };
+
+ Ok(Self { header, data })
+ }
+
+ /// Returns the block type.
+ pub fn block_type(&self) -> HFileBlockType {
+ self.header.block_type
+ }
+}
+
+/// A block index entry pointing to a data or meta block.
+#[derive(Debug, Clone)]
+pub struct BlockIndexEntry {
+ /// First key in the block (may be shortened/fake for optimization)
+ pub first_key: Key,
+ /// First key of the next block (if present)
+ pub next_block_first_key: Option<Key>,
+ /// Offset of the block in the file
+ pub offset: u64,
+ /// On-disk size of the block
+ pub size: u32,
+}
+
+impl BlockIndexEntry {
+ /// Create a new block index entry.
+ pub fn new(first_key: Key, next_block_first_key: Option<Key>, offset: u64, size: u32) -> Self {
+ Self {
+ first_key,
+ next_block_first_key,
+ offset,
+ size,
+ }
+ }
+}
+
+impl PartialEq for BlockIndexEntry {
+ fn eq(&self, other: &Self) -> bool {
+ self.first_key == other.first_key
+ }
+}
+
+impl Eq for BlockIndexEntry {}
+
+impl PartialOrd for BlockIndexEntry {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for BlockIndexEntry {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.first_key.cmp(&other.first_key)
+ }
+}
+
+/// Data block containing key-value pairs.
+pub struct DataBlock {
+ /// Uncompressed block data
+ data: Vec<u8>,
+ /// End offset of content (excluding checksum)
+ content_end: usize,
+}
+
+impl DataBlock {
+ /// Create a data block from an HFileBlock.
+ pub fn from_block(block: HFileBlock) -> Self {
+ let content_end = block.data.len();
+ Self {
+ data: block.data,
+ content_end,
+ }
+ }
+
+ /// Read a key-value at the given offset within the data block.
+ pub fn read_key_value(&self, offset: usize) -> KeyValue {
+ KeyValue::parse(&self.data, offset)
+ }
+
+ /// Check if the offset is within valid content range.
+ pub fn is_valid_offset(&self, offset: usize) -> bool {
+ offset < self.content_end
+ }
+
+ /// Iterate over all key-value pairs in the block.
+ #[allow(dead_code)]
+ pub fn iter(&self) -> DataBlockIterator<'_> {
+ DataBlockIterator {
+ block: self,
+ offset: 0,
+ }
+ }
+
+ /// Returns the raw data.
+ #[allow(dead_code)]
+ pub fn data(&self) -> &[u8] {
+ &self.data
+ }
+}
+
+/// Iterator over key-value pairs in a data block.
+pub struct DataBlockIterator<'a> {
+ block: &'a DataBlock,
+ offset: usize,
+}
+
+impl<'a> Iterator for DataBlockIterator<'a> {
+ type Item = KeyValue;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if !self.block.is_valid_offset(self.offset) {
+ return None;
+ }
+
+ // Need at least 8 bytes for key/value lengths
+ if self.offset + KEY_VALUE_HEADER_SIZE > self.block.content_end {
+ return None;
+ }
+
+ let kv = self.block.read_key_value(self.offset);
+ self.offset += kv.record_size();
+ Some(kv)
+ }
+}
+
+/// Read a Hadoop VLong encoded integer from bytes.
+/// This is the encoding used in HFile root/leaf index blocks.
+/// Returns (value, bytes_consumed).
+///
+/// Hadoop VLong format:
+/// - The first byte determines the total encoded size
+/// - If the first byte >= -112 (signed), the value fits in that single byte
+/// - If -120 <= first byte <= -113, the total size is -111 - first_byte
+/// - If first byte < -120 (a negative value), the total size is -119 - first_byte
+pub fn read_var_long(bytes: &[u8], offset: usize) -> (u64, usize) {
+ let first_byte = bytes[offset];
+ let size = var_long_size_on_disk_single(first_byte);
+
+ if size == 1 {
+ // Single byte encoding: value is the byte itself (as signed, then cast to u64)
+ return (first_byte as i8 as i64 as u64, 1);
+ }
+
+ // Multi-byte encoding: read size-1 bytes as big-endian
+ let mut value: u64 = 0;
+ for i in 0..size - 1 {
+ value = (value << 8) | (bytes[offset + 1 + i] as u64);
+ }
+
+ // Check if negative (first byte < -120 in signed representation)
+ let is_negative = (first_byte as i8) < -120;
+ if is_negative {
+ (!value, size)
+ } else {
+ (value, size)
+ }
+}
+
+/// Calculate the size of a Hadoop VLong encoded integer from the first byte.
+fn var_long_size_on_disk_single(first_byte: u8) -> usize {
+    let signed = first_byte as i8;
+    if signed >= -112 {
+        1
+    } else if signed < -120 {
+        // Negative value: total size is -119 - first_byte
+        (-119 - signed as i32) as usize
+    } else {
+        // Positive multi-byte value: total size is -111 - first_byte
+        (-111 - signed as i32) as usize
+    }
+}
+
+/// Calculate the size of a Hadoop VLong encoded integer.
+pub fn var_long_size_on_disk(bytes: &[u8], offset: usize) -> usize {
+ var_long_size_on_disk_single(bytes[offset])
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_block_header_parse_too_small() {
+ let bytes = vec![0u8; 10]; // Too small for header
+ let err = BlockHeader::parse(&bytes).unwrap_err();
+ assert!(matches!(err, HFileError::InvalidFormat(_)));
+ }
+
+ #[test]
+ fn test_block_header_on_disk_size_with_header() {
+ // Create a minimal valid block header with DATA magic
+ let mut bytes = vec![];
+ bytes.extend_from_slice(b"DATABLK*"); // magic
+ bytes.extend_from_slice(&100i32.to_be_bytes()); // on_disk_size_without_header
+ bytes.extend_from_slice(&200i32.to_be_bytes()); // uncompressed_size_without_header
+ bytes.extend_from_slice(&(-1i64).to_be_bytes()); // prev_block_offset
+ bytes.push(1); // checksum_type
+ bytes.extend_from_slice(&16384i32.to_be_bytes()); // bytes_per_checksum
+ bytes.extend_from_slice(&133i32.to_be_bytes()); // on_disk_data_size_with_header
+
+ let header = BlockHeader::parse(&bytes).unwrap();
+ assert_eq!(header.on_disk_size_with_header(), BLOCK_HEADER_SIZE + 100);
+ assert_eq!(header.block_type, HFileBlockType::Data);
+ }
+
+ #[test]
+ fn test_block_index_entry_ordering() {
+ let key1 = Key::from_bytes(vec![0, 3, b'a', b'a', b'a']);
+ let key2 = Key::from_bytes(vec![0, 3, b'b', b'b', b'b']);
+
+ let entry1 = BlockIndexEntry::new(key1.clone(), None, 0, 100);
+ let entry2 = BlockIndexEntry::new(key2.clone(), None, 1000, 100);
+ let entry3 = BlockIndexEntry::new(key1.clone(), None, 2000, 200);
+
+ assert!(entry1 < entry2);
+ assert_eq!(entry1, entry3); // Same key = equal
+ assert_eq!(entry1.partial_cmp(&entry2), Some(std::cmp::Ordering::Less));
+ }
+
+ #[test]
+ fn test_read_var_long_single_byte() {
+        // Single byte values (first byte >= -112 as signed, i.e., 0-127 or 144-255 when unsigned)
+ let bytes = vec![0u8]; // value 0
+ let (value, size) = read_var_long(&bytes, 0);
+ assert_eq!(value, 0);
+ assert_eq!(size, 1);
+
+ let bytes = vec![100u8]; // value 100
+ let (value, size) = read_var_long(&bytes, 0);
+ assert_eq!(value, 100);
+ assert_eq!(size, 1);
+
+ // 127 as unsigned byte is 127, as signed it's 127 (>= -112), so single byte
+ let bytes = vec![127u8];
+ let (value, size) = read_var_long(&bytes, 0);
+ assert_eq!(value, 127);
+ assert_eq!(size, 1);
+ }
+
+ #[test]
+ fn test_read_var_long_multi_byte() {
+        // Multi-byte encoding: first byte < -112 (signed).
+        // 0x8F = -113 as i8, so total size = -111 - (-113) = 2 (one data byte).
+        let bytes = vec![0x8F, 0x2A];
+        let (value, size) = read_var_long(&bytes, 0);
+        assert_eq!(value, 42);
+        assert_eq!(size, 2);
+
+        // 0x8E = -114 as i8, so total size = 3 (two big-endian data bytes).
+        let bytes = vec![0x8E, 0x03, 0xE8]; // value 1000 = 0x03E8
+        let (value, size) = read_var_long(&bytes, 0);
+        assert_eq!(value, 1000);
+        assert_eq!(size, 3);
+
+        // 0x86 = -122 as i8 (< -120), encoding a negative value:
+        // total size = -119 - (-122) = 3, and the decoded data bytes (999)
+        // are bitwise-negated to yield -1000.
+        let bytes = vec![0x86, 0x03, 0xE7];
+        let (value, size) = read_var_long(&bytes, 0);
+        assert_eq!(value as i64, -1000);
+        assert_eq!(size, 3);
+ }
+
+ #[test]
+ fn test_var_long_size_on_disk() {
+        // Single-byte values: first byte >= -112 as signed (0..=127 unsigned).
+        assert_eq!(var_long_size_on_disk(&[0], 0), 1);
+        assert_eq!(var_long_size_on_disk(&[100], 0), 1);
+        assert_eq!(var_long_size_on_disk(&[127], 0), 1);
+
+        // 143 as u8 is -113 as i8, which is < -112, so multi-byte:
+        // total size = -111 - (-113) = 2.
+        assert_eq!(var_long_size_on_disk(&[143], 0), 2);
+
+        // 144 as u8 is -112 as i8, which is >= -112, so single byte.
+        assert_eq!(var_long_size_on_disk(&[144], 0), 1);
+ }
+
+ #[test]
+ fn test_data_block_is_valid_offset() {
+ // Create a simple HFileBlock
+ let mut header_bytes = vec![];
+ header_bytes.extend_from_slice(b"DATABLK*"); // magic
+ header_bytes.extend_from_slice(&100i32.to_be_bytes()); // on_disk_size
+ header_bytes.extend_from_slice(&50i32.to_be_bytes()); // uncompressed_size
+ header_bytes.extend_from_slice(&(-1i64).to_be_bytes()); // prev_block_offset
+ header_bytes.push(1); // checksum_type
+ header_bytes.extend_from_slice(&16384i32.to_be_bytes()); // bytes_per_checksum
+ header_bytes.extend_from_slice(&133i32.to_be_bytes()); // on_disk_data_size
+
+ // Add data after header
+ header_bytes.extend_from_slice(&[0u8; 50]); // 50 bytes of data
+
+ let block = HFileBlock::parse(&header_bytes, CompressionCodec::None).unwrap();
+ let data_block = DataBlock::from_block(block);
+
+ assert!(data_block.is_valid_offset(0));
+ assert!(data_block.is_valid_offset(49));
+ assert!(!data_block.is_valid_offset(50));
+ assert!(!data_block.is_valid_offset(100));
+ }
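+
+    #[test]
+    fn test_data_block_iterator() {
+        // A sketch exercising DataBlock::iter: build a block containing a
+        // single key-value record (same layout as the KeyValue tests in
+        // key.rs) and iterate over it.
+        let mut kv_bytes = vec![];
+        kv_bytes.extend_from_slice(&11i32.to_be_bytes()); // key length
+        kv_bytes.extend_from_slice(&5i32.to_be_bytes()); // value length
+        kv_bytes.extend_from_slice(&[0, 4]); // key content length
+        kv_bytes.extend_from_slice(b"test"); // key content
+        kv_bytes.extend_from_slice(&[0, 0, 0, 0, 0]); // extra key bytes
+        kv_bytes.extend_from_slice(b"value"); // value
+        kv_bytes.push(0); // MVCC timestamp
+
+        let mut bytes = vec![];
+        bytes.extend_from_slice(b"DATABLK*"); // magic
+        bytes.extend_from_slice(&(kv_bytes.len() as i32).to_be_bytes()); // on_disk_size
+        bytes.extend_from_slice(&(kv_bytes.len() as i32).to_be_bytes()); // uncompressed_size
+        bytes.extend_from_slice(&(-1i64).to_be_bytes()); // prev_block_offset
+        bytes.push(1); // checksum_type
+        bytes.extend_from_slice(&16384i32.to_be_bytes()); // bytes_per_checksum
+        bytes.extend_from_slice(&0i32.to_be_bytes()); // on_disk_data_size_with_header
+        bytes.extend_from_slice(&kv_bytes);
+
+        let block = HFileBlock::parse(&bytes, CompressionCodec::None).unwrap();
+        let data_block = DataBlock::from_block(block);
+
+        let kvs: Vec<KeyValue> = data_block.iter().collect();
+        assert_eq!(kvs.len(), 1);
+        assert_eq!(kvs[0].key().content_as_str().unwrap(), "test");
+        assert_eq!(kvs[0].value(), b"value");
+    }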
+}
diff --git a/crates/core/src/hfile/block_type.rs b/crates/core/src/hfile/block_type.rs
new file mode 100644
index 0000000..620fd81
--- /dev/null
+++ b/crates/core/src/hfile/block_type.rs
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile block type definitions.
+
+use crate::hfile::error::{HFileError, Result};
+
+/// Length of block magic bytes
+pub const MAGIC_LENGTH: usize = 8;
+
+/// HFile block types with their magic byte sequences.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum HFileBlockType {
+ /// Data block containing key-value pairs
+ Data,
+ /// Leaf-level index block (multi-level index)
+ LeafIndex,
+ /// Meta block (e.g., bloom filter data)
+ Meta,
+ /// Intermediate-level index block (multi-level index)
+ IntermediateIndex,
+ /// Root-level index block
+ RootIndex,
+ /// File info block containing metadata key-value pairs
+ FileInfo,
+ /// HFile trailer
+ Trailer,
+}
+
+impl HFileBlockType {
+ /// Returns the magic bytes for this block type.
+ pub fn magic(&self) -> &'static [u8; MAGIC_LENGTH] {
+ match self {
+ HFileBlockType::Data => b"DATABLK*",
+ HFileBlockType::LeafIndex => b"IDXLEAF2",
+ HFileBlockType::Meta => b"METABLKc",
+ HFileBlockType::IntermediateIndex => b"IDXINTE2",
+ HFileBlockType::RootIndex => b"IDXROOT2",
+ HFileBlockType::FileInfo => b"FILEINF2",
+ HFileBlockType::Trailer => b"TRABLK\"$",
+ }
+ }
+
+ /// Parse block type from magic bytes.
+ pub fn from_magic(magic: &[u8]) -> Result<Self> {
+ if magic.len() < MAGIC_LENGTH {
+ return Err(HFileError::InvalidFormat(format!(
+ "Magic bytes too short: {} bytes",
+ magic.len()
+ )));
+ }
+
+ let magic_arr: &[u8; MAGIC_LENGTH] = magic[..MAGIC_LENGTH].try_into().unwrap();
+
+ match magic_arr {
+ b"DATABLK*" | b"DATABLKE" => Ok(HFileBlockType::Data),
+ b"IDXLEAF2" => Ok(HFileBlockType::LeafIndex),
+ b"METABLKc" => Ok(HFileBlockType::Meta),
+ b"IDXINTE2" => Ok(HFileBlockType::IntermediateIndex),
+ b"IDXROOT2" => Ok(HFileBlockType::RootIndex),
+ b"FILEINF2" => Ok(HFileBlockType::FileInfo),
+ b"TRABLK\"$" => Ok(HFileBlockType::Trailer),
+ _ => Err(HFileError::InvalidBlockMagic {
+ expected: "valid block magic".to_string(),
+ actual: String::from_utf8_lossy(magic_arr).to_string(),
+ }),
+ }
+ }
+
+ /// Check if the given bytes start with this block type's magic.
+ pub fn check_magic(&self, bytes: &[u8]) -> Result<()> {
+ if bytes.len() < MAGIC_LENGTH {
+ return Err(HFileError::InvalidFormat(format!(
+ "Buffer too short for magic check: {} bytes",
+ bytes.len()
+ )));
+ }
+
+ let expected = self.magic();
+ let actual = &bytes[..MAGIC_LENGTH];
+
+ if actual != expected {
+ return Err(HFileError::InvalidBlockMagic {
+ expected: String::from_utf8_lossy(expected).to_string(),
+ actual: String::from_utf8_lossy(actual).to_string(),
+ });
+ }
+
+ Ok(())
+ }
+}
+
+impl std::fmt::Display for HFileBlockType {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ HFileBlockType::Data => write!(f, "DATA"),
+ HFileBlockType::LeafIndex => write!(f, "LEAF_INDEX"),
+ HFileBlockType::Meta => write!(f, "META"),
+ HFileBlockType::IntermediateIndex => write!(f, "INTERMEDIATE_INDEX"),
+ HFileBlockType::RootIndex => write!(f, "ROOT_INDEX"),
+ HFileBlockType::FileInfo => write!(f, "FILE_INFO"),
+ HFileBlockType::Trailer => write!(f, "TRAILER"),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_magic_bytes() {
+ assert_eq!(HFileBlockType::Data.magic(), b"DATABLK*");
+ assert_eq!(HFileBlockType::LeafIndex.magic(), b"IDXLEAF2");
+ assert_eq!(HFileBlockType::Meta.magic(), b"METABLKc");
+ assert_eq!(HFileBlockType::IntermediateIndex.magic(), b"IDXINTE2");
+ assert_eq!(HFileBlockType::RootIndex.magic(), b"IDXROOT2");
+ assert_eq!(HFileBlockType::FileInfo.magic(), b"FILEINF2");
+ assert_eq!(HFileBlockType::Trailer.magic(), b"TRABLK\"$");
+ }
+
+ #[test]
+ fn test_from_magic_data_block() {
+ let magic = b"DATABLK*";
+ assert_eq!(
+ HFileBlockType::from_magic(magic).unwrap(),
+ HFileBlockType::Data
+ );
+ }
+
+ #[test]
+ fn test_from_magic_data_block_encoded() {
+ // Encoded data block magic
+ let magic = b"DATABLKE";
+ assert_eq!(
+ HFileBlockType::from_magic(magic).unwrap(),
+ HFileBlockType::Data
+ );
+ }
+
+ #[test]
+ fn test_from_magic_all_types() {
+ assert_eq!(
+ HFileBlockType::from_magic(b"IDXLEAF2").unwrap(),
+ HFileBlockType::LeafIndex
+ );
+ assert_eq!(
+ HFileBlockType::from_magic(b"METABLKc").unwrap(),
+ HFileBlockType::Meta
+ );
+ assert_eq!(
+ HFileBlockType::from_magic(b"IDXINTE2").unwrap(),
+ HFileBlockType::IntermediateIndex
+ );
+ assert_eq!(
+ HFileBlockType::from_magic(b"IDXROOT2").unwrap(),
+ HFileBlockType::RootIndex
+ );
+ assert_eq!(
+ HFileBlockType::from_magic(b"FILEINF2").unwrap(),
+ HFileBlockType::FileInfo
+ );
+ assert_eq!(
+ HFileBlockType::from_magic(b"TRABLK\"$").unwrap(),
+ HFileBlockType::Trailer
+ );
+ }
+
+ #[test]
+ fn test_from_magic_too_short() {
+ let err = HFileBlockType::from_magic(b"DATA").unwrap_err();
+ assert!(matches!(err, HFileError::InvalidFormat(_)));
+ }
+
+ #[test]
+ fn test_from_magic_invalid() {
+ let err = HFileBlockType::from_magic(b"INVALID!").unwrap_err();
+ assert!(matches!(err, HFileError::InvalidBlockMagic { .. }));
+ }
+
+ #[test]
+ fn test_check_magic_success() {
+ let bytes = b"DATABLK*extra_data_here";
+ assert!(HFileBlockType::Data.check_magic(bytes).is_ok());
+ }
+
+ #[test]
+ fn test_check_magic_too_short() {
+ let err = HFileBlockType::Data.check_magic(b"DATA").unwrap_err();
+ assert!(matches!(err, HFileError::InvalidFormat(_)));
+ }
+
+ #[test]
+ fn test_check_magic_mismatch() {
+ let err = HFileBlockType::Data.check_magic(b"IDXROOT2").unwrap_err();
+ assert!(matches!(err, HFileError::InvalidBlockMagic { .. }));
+ }
+
+ #[test]
+ fn test_display() {
+ assert_eq!(format!("{}", HFileBlockType::Data), "DATA");
+ assert_eq!(format!("{}", HFileBlockType::LeafIndex), "LEAF_INDEX");
+ assert_eq!(format!("{}", HFileBlockType::Meta), "META");
+ assert_eq!(
+ format!("{}", HFileBlockType::IntermediateIndex),
+ "INTERMEDIATE_INDEX"
+ );
+ assert_eq!(format!("{}", HFileBlockType::RootIndex), "ROOT_INDEX");
+ assert_eq!(format!("{}", HFileBlockType::FileInfo), "FILE_INFO");
+ assert_eq!(format!("{}", HFileBlockType::Trailer), "TRAILER");
+ }
+}
diff --git a/crates/core/src/hfile/compression.rs b/crates/core/src/hfile/compression.rs
new file mode 100644
index 0000000..91d5cf6
--- /dev/null
+++ b/crates/core/src/hfile/compression.rs
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Compression codec support for HFile blocks.
+
+use crate::hfile::error::{HFileError, Result};
+use flate2::read::GzDecoder;
+use std::io::Read;
+
+/// Compression codec IDs used in HFile.
+/// These IDs are stored in the HFile trailer and must not change.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum CompressionCodec {
+ /// LZO compression (ID: 0)
+ Lzo = 0,
+ /// GZIP compression (ID: 1)
+ Gzip = 1,
+ /// No compression (ID: 2)
+ #[default]
+ None = 2,
+ /// Snappy compression (ID: 3)
+ Snappy = 3,
+ /// LZ4 compression (ID: 4)
+ Lz4 = 4,
+ /// BZIP2 compression (ID: 5)
+ Bzip2 = 5,
+ /// ZSTD compression (ID: 6)
+ Zstd = 6,
+}
+
+impl CompressionCodec {
+ /// Decode compression codec from ID stored in HFile.
+ pub fn from_id(id: u32) -> Result<Self> {
+ match id {
+ 0 => Ok(CompressionCodec::Lzo),
+ 1 => Ok(CompressionCodec::Gzip),
+ 2 => Ok(CompressionCodec::None),
+ 3 => Ok(CompressionCodec::Snappy),
+ 4 => Ok(CompressionCodec::Lz4),
+ 5 => Ok(CompressionCodec::Bzip2),
+ 6 => Ok(CompressionCodec::Zstd),
+ _ => Err(HFileError::UnsupportedCompression(id)),
+ }
+ }
+
+ /// Decompress data using this codec.
+ ///
+ /// # Arguments
+ /// * `compressed_data` - The compressed data bytes
+ /// * `uncompressed_size` - Expected size of uncompressed data
+ ///
+ /// # Returns
+ /// Decompressed data as a byte vector
+ pub fn decompress(&self, compressed_data: &[u8], uncompressed_size: usize) -> Result<Vec<u8>> {
+ match self {
+ CompressionCodec::None => Ok(compressed_data.to_vec()),
+ CompressionCodec::Gzip => {
+ let mut decoder = GzDecoder::new(compressed_data);
+ let mut decompressed = Vec::with_capacity(uncompressed_size);
+ decoder.read_to_end(&mut decompressed).map_err(|e| {
+ HFileError::DecompressionError(format!("GZIP decompression failed: {}", e))
+ })?;
+ Ok(decompressed)
+ }
+ CompressionCodec::Lzo => Err(HFileError::DecompressionError(
+ "LZO compression not yet supported".to_string(),
+ )),
+ CompressionCodec::Snappy => Err(HFileError::DecompressionError(
+ "Snappy compression not yet supported".to_string(),
+ )),
+ CompressionCodec::Lz4 => Err(HFileError::DecompressionError(
+ "LZ4 compression not yet supported".to_string(),
+ )),
+ CompressionCodec::Bzip2 => Err(HFileError::DecompressionError(
+ "BZIP2 compression not yet supported".to_string(),
+ )),
+ CompressionCodec::Zstd => Err(HFileError::DecompressionError(
+ "ZSTD compression not yet supported".to_string(),
+ )),
+ }
+ }
+}
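+
+#[cfg(test)]
+mod tests {
+    // Minimal sanity tests (a sketch): round-trip GZIP data through flate2's
+    // encoder (flate2 is already a workspace dependency) and check codec ID
+    // decoding against the ID scheme above.
+    use super::*;
+    use flate2::write::GzEncoder;
+    use flate2::Compression;
+    use std::io::Write;
+
+    #[test]
+    fn test_codec_from_id() {
+        assert_eq!(CompressionCodec::from_id(1).unwrap(), CompressionCodec::Gzip);
+        assert_eq!(CompressionCodec::from_id(2).unwrap(), CompressionCodec::None);
+        assert!(matches!(
+            CompressionCodec::from_id(42).unwrap_err(),
+            HFileError::UnsupportedCompression(42)
+        ));
+    }
+
+    #[test]
+    fn test_gzip_round_trip() {
+        let original = b"hfile block payload".to_vec();
+        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
+        encoder.write_all(&original).unwrap();
+        let compressed = encoder.finish().unwrap();
+
+        let decompressed = CompressionCodec::Gzip
+            .decompress(&compressed, original.len())
+            .unwrap();
+        assert_eq!(decompressed, original);
+    }
+}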
diff --git a/crates/core/src/hfile/error.rs b/crates/core/src/hfile/error.rs
new file mode 100644
index 0000000..f69c72c
--- /dev/null
+++ b/crates/core/src/hfile/error.rs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile error types.
+
+use std::io;
+use thiserror::Error;
+
+pub type Result<T, E = HFileError> = std::result::Result<T, E>;
+
+#[derive(Error, Debug)]
+pub enum HFileError {
+ #[error("Invalid HFile: {0}")]
+ InvalidFormat(String),
+
+ #[error("Invalid block magic: expected {expected}, got {actual}")]
+ InvalidBlockMagic { expected: String, actual: String },
+
+ #[error("Unsupported HFile version: major={major}, minor={minor}")]
+ UnsupportedVersion { major: u32, minor: u32 },
+
+ #[error("Unsupported compression codec: {0}")]
+ UnsupportedCompression(u32),
+
+ #[error("HFiles with MVCC timestamps are not supported")]
+ UnsupportedMvccTimestamp,
+
+ #[error("Decompression error: {0}")]
+ DecompressionError(String),
+
+ #[error("Protobuf decode error: {0}")]
+ ProtobufError(#[from] prost::DecodeError),
+
+ #[error("IO error: {0}")]
+ IoError(#[from] io::Error),
+
+ #[error("Unexpected block type: expected {expected}, got {actual}")]
+ UnexpectedBlockType { expected: String, actual: String },
+
+ #[error("Backward seek not supported: current position is ahead of lookup key")]
+ BackwardSeekNotSupported,
+
+ #[error("Not seeked: must call seek_to() before reading key-value pairs")]
+ NotSeeked,
+
+ #[error("End of file reached")]
+ Eof,
+}
diff --git a/crates/core/src/hfile/key.rs b/crates/core/src/hfile/key.rs
new file mode 100644
index 0000000..51e664a
--- /dev/null
+++ b/crates/core/src/hfile/key.rs
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Key and KeyValue types for HFile.
+
+use std::cmp::Ordering;
+
+/// Size constants
+const SIZEOF_INT32: usize = 4;
+const SIZEOF_INT16: usize = 2;
+
+/// Size of the key length (int32) plus value length (int32) header that precedes each key-value record
+pub const KEY_VALUE_HEADER_SIZE: usize = SIZEOF_INT32 * 2;
+
+/// A key in HFile format.
+///
+/// In HFile, keys have the following structure:
+/// - 2 bytes: key content length (short)
+/// - N bytes: key content
+/// - Additional bytes: other information (not used by Hudi)
+///
+/// For comparison and hashing, only the key content is used.
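+///
+/// For example, the bytes `[0, 4, b't', b'e', b's', b't']` encode a key whose
+/// content is `"test"`: a 2-byte big-endian length prefix (4) followed by the
+/// content bytes.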
+#[derive(Debug, Clone)]
+pub struct Key {
+ /// Raw key bytes including the length prefix
+ bytes: Vec<u8>,
+ /// Offset to the start of the key within bytes
+ offset: usize,
+ /// Total length of the key part (including length prefix and other info)
+ length: usize,
+}
+
+impl Key {
+ /// Create a new Key from bytes at the given offset with the specified length.
+ pub fn new(bytes: &[u8], offset: usize, length: usize) -> Self {
+ Self {
+ bytes: bytes.to_vec(),
+ offset,
+ length,
+ }
+ }
+
+ /// Create a Key from raw bytes (the entire key).
+ pub fn from_bytes(bytes: Vec<u8>) -> Self {
+ let length = bytes.len();
+ Self {
+ bytes,
+ offset: 0,
+ length,
+ }
+ }
+
+ /// Returns the offset to the key content (after length prefix).
+ pub fn content_offset(&self) -> usize {
+ self.offset + SIZEOF_INT16
+ }
+
+ /// Returns the length of the key content.
+ pub fn content_length(&self) -> usize {
+ if self.bytes.len() < self.offset + SIZEOF_INT16 {
+ return 0;
+ }
+ let len_bytes = &self.bytes[self.offset..self.offset + SIZEOF_INT16];
+ i16::from_be_bytes([len_bytes[0], len_bytes[1]]) as usize
+ }
+
+ /// Returns the key content as a byte slice.
+ pub fn content(&self) -> &[u8] {
+ let start = self.content_offset();
+ let len = self.content_length();
+ if start + len > self.bytes.len() {
+ return &[];
+ }
+ &self.bytes[start..start + len]
+ }
+
+ /// Returns the key content as a UTF-8 string.
+ pub fn content_as_str(&self) -> Result<&str, std::str::Utf8Error> {
+ std::str::from_utf8(self.content())
+ }
+
+ /// Returns the total length of the key part.
+ pub fn length(&self) -> usize {
+ self.length
+ }
+
+ /// Returns the raw bytes.
+ pub fn bytes(&self) -> &[u8] {
+ &self.bytes
+ }
+}
+
+impl PartialEq for Key {
+ fn eq(&self, other: &Self) -> bool {
+ self.content() == other.content()
+ }
+}
+
+impl Eq for Key {}
+
+impl PartialOrd for Key {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for Key {
+ fn cmp(&self, other: &Self) -> Ordering {
+ self.content().cmp(other.content())
+ }
+}
+
+impl std::hash::Hash for Key {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.content().hash(state);
+ }
+}
+
+impl std::fmt::Display for Key {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self.content_as_str() {
+ Ok(s) => write!(f, "Key{{{}}}", s),
+ Err(_) => write!(f, "Key{{<binary>}}"),
+ }
+ }
+}
+
+/// A UTF-8 string key without length prefix.
+///
+/// Used for lookup keys and meta block keys where the key is just the content
+/// without the HFile key structure.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Utf8Key {
+ content: String,
+}
+
+impl Utf8Key {
+ /// Create a new UTF-8 key from a string.
+ pub fn new(s: impl Into<String>) -> Self {
+ Self { content: s.into() }
+ }
+
+ /// Returns the key content as bytes.
+ pub fn as_bytes(&self) -> &[u8] {
+ self.content.as_bytes()
+ }
+
+ /// Returns the key content as a string slice.
+ pub fn as_str(&self) -> &str {
+ &self.content
+ }
+}
+
+impl PartialOrd for Utf8Key {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for Utf8Key {
+ fn cmp(&self, other: &Self) -> Ordering {
+ self.content.as_bytes().cmp(other.content.as_bytes())
+ }
+}
+
+impl From<&str> for Utf8Key {
+ fn from(s: &str) -> Self {
+ Utf8Key::new(s)
+ }
+}
+
+impl From<String> for Utf8Key {
+ fn from(s: String) -> Self {
+ Utf8Key::new(s)
+ }
+}
+
+impl std::fmt::Display for Utf8Key {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "Utf8Key{{{}}}", self.content)
+ }
+}
+
+/// A key-value pair from HFile data block.
+///
+/// The HFile key-value format is:
+/// - 4 bytes: key length (int32)
+/// - 4 bytes: value length (int32)
+/// - N bytes: key (structured as Key)
+/// - M bytes: value
+/// - 1 byte: MVCC timestamp version (always 0 for Hudi)
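+///
+/// For example, a record with an 11-byte key part and a 5-byte value occupies
+/// 8 + 11 + 5 + 1 = 25 bytes on disk (see [`KeyValue::record_size`]).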
+#[derive(Debug, Clone)]
+pub struct KeyValue {
+ /// The backing byte array containing the entire key-value record
+ bytes: Vec<u8>,
+ /// Offset to the start of this record in bytes
+ offset: usize,
+ /// The parsed key
+ key: Key,
+ /// Length of key part
+ key_length: usize,
+ /// Length of value part
+ value_length: usize,
+}
+
+impl KeyValue {
+ /// Parse a KeyValue from bytes at the given offset.
+ pub fn parse(bytes: &[u8], offset: usize) -> Self {
+ let key_length = i32::from_be_bytes([
+ bytes[offset],
+ bytes[offset + 1],
+ bytes[offset + 2],
+ bytes[offset + 3],
+ ]) as usize;
+
+ let value_length = i32::from_be_bytes([
+ bytes[offset + 4],
+ bytes[offset + 5],
+ bytes[offset + 6],
+ bytes[offset + 7],
+ ]) as usize;
+
+ let key_offset = offset + KEY_VALUE_HEADER_SIZE;
+ let key = Key::new(bytes, key_offset, key_length);
+
+ Self {
+ bytes: bytes.to_vec(),
+ offset,
+ key,
+ key_length,
+ value_length,
+ }
+ }
+
+ /// Returns the key.
+ pub fn key(&self) -> &Key {
+ &self.key
+ }
+
+ /// Returns the value as a byte slice.
+ pub fn value(&self) -> &[u8] {
+ let value_offset = self.offset + KEY_VALUE_HEADER_SIZE + self.key_length;
+ &self.bytes[value_offset..value_offset + self.value_length]
+ }
+
+ /// Returns the total size of this key-value record including MVCC timestamp.
+ pub fn record_size(&self) -> usize {
+ // header (8) + key + value + mvcc timestamp (1)
+ KEY_VALUE_HEADER_SIZE + self.key_length + self.value_length + 1
+ }
+
+ /// Returns the key length.
+ pub fn key_length(&self) -> usize {
+ self.key_length
+ }
+
+ /// Returns the value length.
+ pub fn value_length(&self) -> usize {
+ self.value_length
+ }
+}
+
+impl std::fmt::Display for KeyValue {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "KeyValue{{key={}}}", self.key)
+ }
+}
+
+/// Compare a Key with a Utf8Key (for lookups).
+///
+/// This compares the key content bytes lexicographically.
+pub fn compare_keys(key: &Key, lookup: &Utf8Key) -> Ordering {
+ key.content().cmp(lookup.as_bytes())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_utf8_key_comparison() {
+ let k1 = Utf8Key::new("abc");
+ let k2 = Utf8Key::new("abd");
+ let k3 = Utf8Key::new("abc");
+
+ assert!(k1 < k2);
+ assert_eq!(k1, k3);
+ }
+
+ #[test]
+ fn test_utf8_key_from_str() {
+ let k1: Utf8Key = "test".into();
+ let k2 = Utf8Key::from("test");
+ assert_eq!(k1, k2);
+ assert_eq!(k1.as_str(), "test");
+ assert_eq!(k1.as_bytes(), b"test");
+ }
+
+ #[test]
+ fn test_utf8_key_from_string() {
+ let s = String::from("hello");
+ let k: Utf8Key = s.into();
+ assert_eq!(k.as_str(), "hello");
+ }
+
+ #[test]
+ fn test_utf8_key_display() {
+ let k = Utf8Key::new("mykey");
+ assert_eq!(format!("{}", k), "Utf8Key{mykey}");
+ }
+
+ #[test]
+ fn test_key_new() {
+ // Create a key with length prefix: 2 bytes for length (0, 4) + 4 bytes content "test"
+ let bytes = vec![0, 4, b't', b'e', b's', b't', 0, 0]; // extra bytes at end
+ let key = Key::new(&bytes, 0, 6);
+
+ assert_eq!(key.content_length(), 4);
+ assert_eq!(key.content(), b"test");
+ assert_eq!(key.content_as_str().unwrap(), "test");
+ assert_eq!(key.length(), 6);
+ }
+
+ #[test]
+ fn test_key_from_bytes() {
+ let bytes = vec![0, 3, b'a', b'b', b'c'];
+ let key = Key::from_bytes(bytes);
+
+ assert_eq!(key.content_length(), 3);
+ assert_eq!(key.content(), b"abc");
+ }
+
+ #[test]
+ fn test_key_content_empty() {
+ // Test with buffer too small for length prefix
+ let bytes = vec![0];
+ let key = Key::new(&bytes, 0, 1);
+ assert_eq!(key.content_length(), 0);
+ }
+
+ #[test]
+ fn test_key_content_out_of_bounds() {
+ // Key claims content length of 10 but only has 3 bytes
+ let bytes = vec![0, 10, b'a', b'b', b'c'];
+ let key = Key::from_bytes(bytes);
+ // content() should return empty slice when out of bounds
+ assert_eq!(key.content(), &[] as &[u8]);
+ }
+
+ #[test]
+ fn test_key_equality() {
+ let bytes1 = vec![0, 3, b'a', b'b', b'c'];
+ let bytes2 = vec![0, 3, b'a', b'b', b'c'];
+ let bytes3 = vec![0, 3, b'x', b'y', b'z'];
+
+ let k1 = Key::from_bytes(bytes1);
+ let k2 = Key::from_bytes(bytes2);
+ let k3 = Key::from_bytes(bytes3);
+
+ assert_eq!(k1, k2);
+ assert_ne!(k1, k3);
+ }
+
+ #[test]
+ fn test_key_ordering() {
+ let k1 = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);
+ let k2 = Key::from_bytes(vec![0, 3, b'a', b'b', b'd']);
+ let k3 = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);
+
+ assert!(k1 < k2);
+ assert_eq!(k1.cmp(&k3), Ordering::Equal);
+ }
+
+ #[test]
+ fn test_key_hash() {
+ use std::collections::HashSet;
+
+ let k1 = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);
+ let k2 = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);
+
+ let mut set = HashSet::new();
+ set.insert(k1);
+ assert!(set.contains(&k2));
+ }
+
+ #[test]
+ fn test_key_display() {
+ let k1 = Key::from_bytes(vec![0, 4, b't', b'e', b's', b't']);
+ assert_eq!(format!("{}", k1), "Key{test}");
+
+ // Binary key (invalid UTF-8)
+ let k2 = Key::from_bytes(vec![0, 3, 0xFF, 0xFE, 0xFD]);
+ assert_eq!(format!("{}", k2), "Key{<binary>}");
+ }
+
+ #[test]
+ fn test_key_bytes() {
+ let original = vec![0, 3, b'a', b'b', b'c'];
+ let key = Key::from_bytes(original.clone());
+ assert_eq!(key.bytes(), &original);
+ }
+
+ #[test]
+ fn test_keyvalue_parse() {
+ // Build a KeyValue structure:
+ // 4 bytes key length (11) + 4 bytes value length (5)
+ // + key: 2 bytes content length (4) + 4 bytes "test" + 5 extra key bytes
+ // + value: 5 bytes "value"
+ // + 1 byte MVCC timestamp
+ let mut bytes = vec![];
+ bytes.extend_from_slice(&11i32.to_be_bytes()); // key length
+ bytes.extend_from_slice(&5i32.to_be_bytes()); // value length
+ bytes.extend_from_slice(&[0, 4]); // key content length (4)
+ bytes.extend_from_slice(b"test"); // key content
+ bytes.extend_from_slice(&[0, 0, 0, 0, 0]); // extra key bytes
+ bytes.extend_from_slice(b"value"); // value
+ bytes.push(0); // MVCC timestamp
+
+ let kv = KeyValue::parse(&bytes, 0);
+
+ assert_eq!(kv.key().content_as_str().unwrap(), "test");
+ assert_eq!(kv.value(), b"value");
+ assert_eq!(kv.key_length(), 11);
+ assert_eq!(kv.value_length(), 5);
+ assert_eq!(kv.record_size(), 8 + 11 + 5 + 1); // header + key + value + mvcc
+ }
+
+ #[test]
+ fn test_keyvalue_display() {
+ let mut bytes = vec![];
+ bytes.extend_from_slice(&6i32.to_be_bytes()); // key length
+ bytes.extend_from_slice(&3i32.to_be_bytes()); // value length
+ bytes.extend_from_slice(&[0, 4]); // key content length
+ bytes.extend_from_slice(b"test"); // key content
+ bytes.extend_from_slice(b"val"); // value
+ bytes.push(0); // MVCC
+
+ let kv = KeyValue::parse(&bytes, 0);
+ assert!(format!("{}", kv).contains("test"));
+ }
+
+ #[test]
+ fn test_compare_keys() {
+ let key = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);
+ let lookup1 = Utf8Key::new("abc");
+ let lookup2 = Utf8Key::new("abd");
+ let lookup3 = Utf8Key::new("abb");
+
+ assert_eq!(compare_keys(&key, &lookup1), Ordering::Equal);
+ assert_eq!(compare_keys(&key, &lookup2), Ordering::Less);
+ assert_eq!(compare_keys(&key, &lookup3), Ordering::Greater);
+ }
+}
diff --git a/crates/core/src/hfile/mod.rs b/crates/core/src/hfile/mod.rs
new file mode 100644
index 0000000..67c9370
--- /dev/null
+++ b/crates/core/src/hfile/mod.rs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile reader implementation.
+//!
+//! HFile is an SSTable-based, row-oriented file format optimized for
+//! range scans and point lookups. Hudi uses HFile as the base file format
+//! for its metadata table.
+//!
+//! See [Hudi's HFile format specification](https://github.com/apache/hudi/blob/master/hudi-io/hfile_format.md).
+//!
+//! # Example
+//! ```ignore
+//! use hudi_core::hfile::{HFileReader, Utf8Key, SeekResult};
+//!
+//! let bytes = std::fs::read("data.hfile")?;
+//! let mut reader = HFileReader::new(bytes)?;
+//!
+//! // Iterate all entries
+//! for result in reader.iter()? {
+//! let kv = result?;
+//! println!("{}: {:?}", kv.key(), kv.value());
+//! }
+//! ```
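+//!
+//! Point lookups go through [`HFileReader::seek_to`]. A sketch (the lookup
+//! key below is hypothetical):
+//! ```ignore
+//! let key = Utf8Key::new("some-record-key");
+//! match reader.seek_to(&key)? {
+//!     SeekResult::Found => { /* cursor is on the matching entry */ }
+//!     other => println!("not an exact match: {:?}", other),
+//! }
+//! ```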
+
+mod block;
+mod block_type;
+mod compression;
+mod error;
+mod key;
+mod proto;
+mod reader;
+mod record;
+mod trailer;
+
+pub use block::BlockIndexEntry;
+pub use block_type::HFileBlockType;
+pub use error::{HFileError, Result};
+pub use key::{Key, KeyValue, Utf8Key};
+pub use reader::{HFileReader, HFileRecordIterator, SeekResult};
+pub use record::HFileRecord;
diff --git a/crates/core/src/hfile/proto.rs b/crates/core/src/hfile/proto.rs
new file mode 100644
index 0000000..6a70a04
--- /dev/null
+++ b/crates/core/src/hfile/proto.rs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Protobuf message definitions for HFile format.
+//!
+//! These are manually defined to match the HBase HFile protobuf schema.
+//! The definitions correspond to:
+//! - TrailerProto: HFile trailer metadata
+//! - InfoProto/BytesBytesPair: File info key-value pairs
+
+use prost::Message;
+
+/// HFile trailer protobuf message.
+///
+/// Contains metadata about the HFile structure including offsets,
+/// counts, and compression settings.
+#[derive(Clone, PartialEq, Message)]
+pub struct TrailerProto {
+ #[prost(uint64, optional, tag = "1")]
+ pub file_info_offset: Option<u64>,
+
+ #[prost(uint64, optional, tag = "2")]
+ pub load_on_open_data_offset: Option<u64>,
+
+ #[prost(uint64, optional, tag = "3")]
+ pub uncompressed_data_index_size: Option<u64>,
+
+ #[prost(uint64, optional, tag = "4")]
+ pub total_uncompressed_bytes: Option<u64>,
+
+ #[prost(uint32, optional, tag = "5")]
+ pub data_index_count: Option<u32>,
+
+ #[prost(uint32, optional, tag = "6")]
+ pub meta_index_count: Option<u32>,
+
+ #[prost(uint64, optional, tag = "7")]
+ pub entry_count: Option<u64>,
+
+ #[prost(uint32, optional, tag = "8")]
+ pub num_data_index_levels: Option<u32>,
+
+ #[prost(uint64, optional, tag = "9")]
+ pub first_data_block_offset: Option<u64>,
+
+ #[prost(uint64, optional, tag = "10")]
+ pub last_data_block_offset: Option<u64>,
+
+ #[prost(string, optional, tag = "11")]
+ pub comparator_class_name: Option<String>,
+
+ #[prost(uint32, optional, tag = "12")]
+ pub compression_codec: Option<u32>,
+
+ #[prost(bytes, optional, tag = "13")]
+ pub encryption_key: Option<Vec<u8>>,
+}
+
+/// A key-value pair with both key and value as byte arrays.
+#[derive(Clone, PartialEq, Message)]
+pub struct BytesBytesPair {
+ #[prost(bytes, required, tag = "1")]
+ pub first: Vec<u8>,
+
+ #[prost(bytes, required, tag = "2")]
+ pub second: Vec<u8>,
+}
+
+/// File info protobuf message containing a list of key-value pairs.
+#[derive(Clone, PartialEq, Message)]
+pub struct InfoProto {
+ #[prost(message, repeated, tag = "1")]
+ pub map_entry: Vec<BytesBytesPair>,
+}
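+
+#[cfg(test)]
+mod tests {
+    // A round-trip sketch: the prost derive supplies encode/decode, which is
+    // what the reader relies on when parsing the file info block.
+    use super::*;
+
+    #[test]
+    fn test_info_proto_round_trip() {
+        let info = InfoProto {
+            map_entry: vec![BytesBytesPair {
+                first: b"hfile.LASTKEY".to_vec(),
+                second: vec![1, 2, 3],
+            }],
+        };
+
+        let encoded = info.encode_to_vec();
+        let decoded = InfoProto::decode(encoded.as_slice()).unwrap();
+        assert_eq!(decoded, info);
+    }
+}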
diff --git a/crates/core/src/hfile/reader.rs b/crates/core/src/hfile/reader.rs
new file mode 100644
index 0000000..0195581
--- /dev/null
+++ b/crates/core/src/hfile/reader.rs
@@ -0,0 +1,1903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile reader implementation.
+
+use std::collections::BTreeMap;
+
+use crate::hfile::block::{
+ read_var_long, var_long_size_on_disk, BlockIndexEntry, DataBlock, HFileBlock, BLOCK_HEADER_SIZE,
+};
+use crate::hfile::block_type::HFileBlockType;
+use crate::hfile::compression::CompressionCodec;
+use crate::hfile::error::{HFileError, Result};
+use crate::hfile::key::{compare_keys, Key, KeyValue, Utf8Key};
+use crate::hfile::proto::InfoProto;
+use crate::hfile::record::HFileRecord;
+use crate::hfile::trailer::HFileTrailer;
+use prost::Message;
+
+/// Magic bytes indicating protobuf format in file info block
+const PBUF_MAGIC: &[u8; 4] = b"PBUF";
+
+/// Seek result codes (matching Java implementation)
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SeekResult {
+ /// Lookup key is before the fake first key of a block but >= actual first key
+ BeforeBlockFirstKey = -2,
+ /// Lookup key is before the first key of the file
+ BeforeFileFirstKey = -1,
+ /// Exact match found
+ Found = 0,
+ /// Key not found but within range; cursor points to greatest key < lookup
+ InRange = 1,
+ /// Key is greater than the last key; EOF reached
+ Eof = 2,
+}
+
+/// File info key for last key in the file
+const FILE_INFO_LAST_KEY: &str = "hfile.LASTKEY";
+/// File info key for key-value version
+const FILE_INFO_KEY_VALUE_VERSION: &str = "KEY_VALUE_VERSION";
+/// File info key for max MVCC timestamp
+const FILE_INFO_MAX_MVCC_TS: &str = "MAX_MEMSTORE_TS_KEY";
+
+/// Key-value version indicating MVCC timestamp support
+const KEY_VALUE_VERSION_WITH_MVCC_TS: i32 = 1;
+
+/// HFile reader that supports sequential reads and seeks.
+pub struct HFileReader {
+ /// Raw file bytes
+ bytes: Vec<u8>,
+ /// Parsed trailer
+ trailer: HFileTrailer,
+ /// Compression codec from trailer
+ codec: CompressionCodec,
+ /// Data block index (first key -> entry)
+ data_block_index: BTreeMap<Key, BlockIndexEntry>,
+ /// Meta block index (name -> entry)
+ meta_block_index: BTreeMap<String, BlockIndexEntry>,
+ /// File info map
+ file_info: BTreeMap<String, Vec<u8>>,
+ /// Last key in the file
+ last_key: Option<Key>,
+ /// Current cursor position
+ cursor: Cursor,
+ /// Currently loaded data block
+ current_block: Option<DataBlock>,
+ /// Current block's index entry
+ current_block_entry: Option<BlockIndexEntry>,
+}
+
+/// Cursor tracking current position in the file.
+#[derive(Debug, Clone, Default)]
+struct Cursor {
+ /// Absolute offset in file
+ offset: usize,
+ /// Cached key-value at current position
+ cached_kv: Option<KeyValue>,
+ /// Whether we've reached EOF
+ eof: bool,
+ /// Whether seek has been called
+ seeked: bool,
+}
+
+impl HFileReader {
+ /// Create a new HFile reader from raw bytes.
+ pub fn new(bytes: Vec<u8>) -> Result<Self> {
+ let trailer = HFileTrailer::read(&bytes)?;
+ let codec = trailer.compression_codec;
+
+ let mut reader = Self {
+ bytes,
+ trailer,
+ codec,
+ data_block_index: BTreeMap::new(),
+ meta_block_index: BTreeMap::new(),
+ file_info: BTreeMap::new(),
+ last_key: None,
+ cursor: Cursor::default(),
+ current_block: None,
+ current_block_entry: None,
+ };
+
+ reader.initialize_metadata()?;
+ Ok(reader)
+ }
+
+ /// Initialize metadata by reading index blocks and file info.
+ fn initialize_metadata(&mut self) -> Result<()> {
+ // Read the "load-on-open" section starting from load_on_open_data_offset
+ let start = self.trailer.load_on_open_data_offset as usize;
+
+ // Read root data index block
+ let (data_index, offset) = self.read_root_index_block(start)?;
+ self.data_block_index = data_index;
+
+ // Handle multi-level index if needed
+ if self.trailer.num_data_index_levels > 1 {
+ self.load_multi_level_index()?;
+ }
+
+ // Read meta index block
+ let (meta_index, offset) = self.read_meta_index_block(offset)?;
+ self.meta_block_index = meta_index;
+
+ // Read file info block
+ self.read_file_info_block(offset)?;
+
+ // Parse last key from file info
+ if let Some(last_key_bytes) = self.file_info.get(FILE_INFO_LAST_KEY) {
+ self.last_key = Some(Key::from_bytes(last_key_bytes.clone()));
+ }
+
+ // Check MVCC timestamp support
+ self.check_mvcc_support()?;
+
+ Ok(())
+ }
+
+ /// Check if the file uses MVCC timestamps (not supported).
+ fn check_mvcc_support(&self) -> Result<()> {
+ if let Some(version_bytes) = self.file_info.get(FILE_INFO_KEY_VALUE_VERSION) {
+ if version_bytes.len() >= 4 {
+ let version = i32::from_be_bytes([
+ version_bytes[0],
+ version_bytes[1],
+ version_bytes[2],
+ version_bytes[3],
+ ]);
+ if version == KEY_VALUE_VERSION_WITH_MVCC_TS {
+ if let Some(ts_bytes) = self.file_info.get(FILE_INFO_MAX_MVCC_TS) {
+ if ts_bytes.len() >= 8 {
+ let max_ts = i64::from_be_bytes([
+ ts_bytes[0],
+ ts_bytes[1],
+ ts_bytes[2],
+ ts_bytes[3],
+ ts_bytes[4],
+ ts_bytes[5],
+ ts_bytes[6],
+ ts_bytes[7],
+ ]);
+ if max_ts > 0 {
+ return Err(HFileError::UnsupportedMvccTimestamp);
+ }
+ }
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Read root index block and return the index entries.
+ fn read_root_index_block(
+ &self,
+ start: usize,
+ ) -> Result<(BTreeMap<Key, BlockIndexEntry>, usize)> {
+ let block = HFileBlock::parse(&self.bytes[start..], self.codec)?;
+ if block.block_type() != HFileBlockType::RootIndex {
+ return Err(HFileError::UnexpectedBlockType {
+ expected: HFileBlockType::RootIndex.to_string(),
+ actual: block.block_type().to_string(),
+ });
+ }
+
+ let entries = self.parse_root_index_entries(
+ &block.data,
+ self.trailer.data_index_count as usize,
+ false,
+ )?;
+ let next_offset = start + block.header.on_disk_size_with_header();
+
+ // Convert entries to BTreeMap
+ let mut index_map = BTreeMap::new();
+ for i in 0..entries.len() {
+ let entry = &entries[i];
+ let next_key = if i + 1 < entries.len() {
+ Some(entries[i + 1].first_key.clone())
+ } else {
+ None
+ };
+ index_map.insert(
+ entry.first_key.clone(),
+ BlockIndexEntry::new(entry.first_key.clone(), next_key, entry.offset, entry.size),
+ );
+ }
+
+ Ok((index_map, next_offset))
+ }
+
+ /// Load multi-level data block index (BFS traversal).
+ fn load_multi_level_index(&mut self) -> Result<()> {
+ let mut levels_remaining = self.trailer.num_data_index_levels - 1;
+ let mut current_entries: Vec<BlockIndexEntry> =
+ self.data_block_index.values().cloned().collect();
+
+ while levels_remaining > 0 {
+ let mut next_level_entries = Vec::new();
+
+            for entry in &current_entries {
+ let block = self.read_block_at(entry.offset as usize, entry.size as usize)?;
+
+ let entries = self.parse_leaf_index_entries(&block.data)?;
+
+ next_level_entries.extend(entries);
+ }
+
+ current_entries = next_level_entries;
+ levels_remaining -= 1;
+ }
+
+ // Build final index map from leaf entries
+ let mut index_map = BTreeMap::new();
+ for i in 0..current_entries.len() {
+            let entry = &current_entries[i];
+ let next_key = if i + 1 < current_entries.len() {
+ Some(current_entries[i + 1].first_key.clone())
+ } else {
+ None
+ };
+ index_map.insert(
+ entry.first_key.clone(),
+ BlockIndexEntry::new(entry.first_key.clone(), next_key, entry.offset, entry.size),
+ );
+ }
+
+ self.data_block_index = index_map;
+ Ok(())
+ }
+
+ /// Parse root index entries from block data.
+ fn parse_root_index_entries(
+ &self,
+ data: &[u8],
+ num_entries: usize,
+ content_key_only: bool,
+ ) -> Result<Vec<BlockIndexEntry>> {
+ let mut entries = Vec::with_capacity(num_entries);
+ let mut offset = 0;
+
+ for _ in 0..num_entries {
+ // Read offset (8 bytes)
+ let block_offset = i64::from_be_bytes([
+ data[offset],
+ data[offset + 1],
+ data[offset + 2],
+ data[offset + 3],
+ data[offset + 4],
+ data[offset + 5],
+ data[offset + 6],
+ data[offset + 7],
+ ]) as u64;
+ offset += 8;
+
+ // Read size (4 bytes)
+ let block_size = i32::from_be_bytes([
+ data[offset],
+ data[offset + 1],
+ data[offset + 2],
+ data[offset + 3],
+ ]) as u32;
+ offset += 4;
+
+ // Read key length (varint)
+ let var_len_size = var_long_size_on_disk(data, offset);
+ let (key_length, _) = read_var_long(data, offset);
+ offset += var_len_size;
+
+ // Read key bytes
+ let key_bytes = data[offset..offset + key_length as usize].to_vec();
+ offset += key_length as usize;
+
+ let key = if content_key_only {
+ // For meta index: key is just the content
+ Key::from_bytes(key_bytes)
+ } else {
+ // For data index: key has structure (length prefix + content + other info)
+ Key::new(&key_bytes, 0, key_bytes.len())
+ };
+
+ entries.push(BlockIndexEntry::new(key, None, block_offset, block_size));
+ }
+
+ Ok(entries)
+ }
+
+ /// Parse leaf index entries from block data.
+ fn parse_leaf_index_entries(&self, data: &[u8]) -> Result<Vec<BlockIndexEntry>> {
+ let mut entries = Vec::new();
+ let mut offset = 0;
+
+ // Read number of entries (4 bytes)
+ let num_entries = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
+ offset += 4;
+
+ // Read secondary index (offsets to entries)
+ let mut relative_offsets = Vec::with_capacity(num_entries + 1);
+ for _ in 0..=num_entries {
+ let rel_offset = i32::from_be_bytes([
+ data[offset],
+ data[offset + 1],
+ data[offset + 2],
+ data[offset + 3],
+ ]) as usize;
+ relative_offsets.push(rel_offset);
+ offset += 4;
+ }
+
+ let base_offset = offset;
+
+ // Read entries
+ for i in 0..num_entries {
+ // Read offset (8 bytes)
+ let block_offset = i64::from_be_bytes([
+ data[offset],
+ data[offset + 1],
+ data[offset + 2],
+ data[offset + 3],
+ data[offset + 4],
+ data[offset + 5],
+ data[offset + 6],
+ data[offset + 7],
+ ]) as u64;
+
+ // Read size (4 bytes)
+ let block_size = i32::from_be_bytes([
+ data[offset + 8],
+ data[offset + 9],
+ data[offset + 10],
+ data[offset + 11],
+ ]) as u32;
+
+ // Key is from offset+12 to next entry
+ let key_start = offset + 12;
+ let next_entry_start = base_offset + relative_offsets[i + 1];
+ let key_length = next_entry_start - key_start;
+
+ let key_bytes = data[key_start..key_start + key_length].to_vec();
+ let key = Key::new(&key_bytes, 0, key_bytes.len());
+
+ entries.push(BlockIndexEntry::new(key, None, block_offset, block_size));
+ offset = next_entry_start;
+ }
+
+ Ok(entries)
+ }
+
+ /// Read meta index block.
+ fn read_meta_index_block(
+ &self,
+ start: usize,
+ ) -> Result<(BTreeMap<String, BlockIndexEntry>, usize)> {
+ let block = HFileBlock::parse(&self.bytes[start..], self.codec)?;
+ if block.block_type() != HFileBlockType::RootIndex {
+ return Err(HFileError::UnexpectedBlockType {
+ expected: HFileBlockType::RootIndex.to_string(),
+ actual: block.block_type().to_string(),
+ });
+ }
+
+ let entries = self.parse_root_index_entries(
+ &block.data,
+ self.trailer.meta_index_count as usize,
+ true,
+ )?;
+ let next_offset = start + block.header.on_disk_size_with_header();
+
+ // Convert to string-keyed map
+ let mut index_map = BTreeMap::new();
+ for entry in entries {
+ let key_str = String::from_utf8_lossy(entry.first_key.content()).to_string();
+ index_map.insert(key_str, entry);
+ }
+
+ Ok((index_map, next_offset))
+ }
+
+ /// Read file info block.
+ fn read_file_info_block(&mut self, start: usize) -> Result<()> {
+ let block = HFileBlock::parse(&self.bytes[start..], self.codec)?;
+ if block.block_type() != HFileBlockType::FileInfo {
+ return Err(HFileError::UnexpectedBlockType {
+ expected: HFileBlockType::FileInfo.to_string(),
+ actual: block.block_type().to_string(),
+ });
+ }
+
+ // Check PBUF magic
+ if block.data.len() < 4 || &block.data[0..4] != PBUF_MAGIC {
+ return Err(HFileError::InvalidFormat(
+ "File info block missing PBUF magic".to_string(),
+ ));
+ }
+
+ // Parse protobuf (length-delimited after magic)
+ let proto_data = &block.data[4..];
+ let (length, consumed) = read_varint(proto_data);
+ let info_proto = InfoProto::decode(&proto_data[consumed..consumed + length as usize])?;
+
+ // Build file info map
+ for entry in info_proto.map_entry {
+ let key = String::from_utf8_lossy(&entry.first).to_string();
+ self.file_info.insert(key, entry.second);
+ }
+
+ Ok(())
+ }
+
+ /// Read a block at the given offset and size.
+ fn read_block_at(&self, offset: usize, size: usize) -> Result<HFileBlock> {
+ HFileBlock::parse(&self.bytes[offset..offset + size], self.codec)
+ }
+
+ /// Get the number of key-value entries in the file.
+ pub fn num_entries(&self) -> u64 {
+ self.trailer.entry_count
+ }
+
+ /// Get file info value by key.
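+ ///
+ /// # Example
+ /// ```ignore
+ /// // Key names follow the HFile file-info convention; the key below is illustrative
+ /// if let Some(value) = reader.get_file_info("hfile.LASTKEY") {
+ ///     println!("last key is {} bytes", value.len());
+ /// }
+ /// ```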
+ pub fn get_file_info(&self, key: &str) -> Option<&[u8]> {
+ self.file_info.get(key).map(|v| v.as_slice())
+ }
+
+ /// Get meta block content by name.
+ pub fn get_meta_block(&self, name: &str) -> Result<Option<Vec<u8>>> {
+ let entry = match self.meta_block_index.get(name) {
+ Some(e) => e,
+ None => return Ok(None),
+ };
+
+ let block = self.read_block_at(entry.offset as usize, entry.size as usize)?;
+ if block.block_type() != HFileBlockType::Meta {
+ return Err(HFileError::UnexpectedBlockType {
+ expected: HFileBlockType::Meta.to_string(),
+ actual: block.block_type().to_string(),
+ });
+ }
+
+ Ok(Some(block.data))
+ }
+
+ /// Seek to the beginning of the file.
+ pub fn seek_to_first(&mut self) -> Result<bool> {
+ if self.trailer.entry_count == 0 {
+ self.cursor.eof = true;
+ self.cursor.seeked = true;
+ return Ok(false);
+ }
+
+ // Get first data block
+ let first_entry = match self.data_block_index.first_key_value() {
+ Some((_, entry)) => entry.clone(),
+ None => {
+ self.cursor.eof = true;
+ self.cursor.seeked = true;
+ return Ok(false);
+ }
+ };
+
+ self.current_block_entry = Some(first_entry.clone());
+ self.load_data_block(&first_entry)?;
+
+ self.cursor.offset = first_entry.offset as usize + BLOCK_HEADER_SIZE;
+ self.cursor.cached_kv = None;
+ self.cursor.eof = false;
+ self.cursor.seeked = true;
+
+ Ok(true)
+ }
+
+ /// Seek to the given key.
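+ ///
+ /// Backward seeks are not supported: if the lookup key sorts before the
+ /// current key, an error is returned unless the cursor still sits at the
+ /// first key of a block.
+ ///
+ /// # Example
+ /// ```ignore
+ /// let lookup = Utf8Key::new("hudi-key-000000100");
+ /// match reader.seek_to(&lookup)? {
+ ///     SeekResult::Found => { /* cursor is on the exact key */ }
+ ///     SeekResult::InRange => { /* key absent; cursor on the greatest smaller key */ }
+ ///     _ => { /* before the first key, or past the end of the file */ }
+ /// }
+ /// ```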
+ pub fn seek_to(&mut self, lookup_key: &Utf8Key) -> Result<SeekResult> {
+ if !self.cursor.seeked {
+ self.seek_to_first()?;
+ }
+
+ if self.trailer.entry_count == 0 {
+ return Ok(SeekResult::Eof);
+ }
+
+ // Get current key-value
+ let current_kv = match self.get_key_value()? {
+ Some(kv) => kv,
+ None => return Ok(SeekResult::Eof),
+ };
+
+ let cmp_current = compare_keys(current_kv.key(), lookup_key);
+
+ match cmp_current {
+ std::cmp::Ordering::Equal => Ok(SeekResult::Found),
+ std::cmp::Ordering::Greater => {
+ // Current key > lookup key: backward seek
+ // Check if we're at the first key of a block and lookup >= fake first key
+ if let Some(entry) = &self.current_block_entry {
+ if self.is_at_first_key_of_block()
+ && compare_keys(&entry.first_key, lookup_key) != std::cmp::Ordering::Greater
+ {
+ return Ok(SeekResult::BeforeBlockFirstKey);
+ }
+ }
+
+ // Check if before file's first key
+ if self.data_block_index.first_key_value().is_some()
+ && self.is_at_first_key_of_block()
+ {
+ return Ok(SeekResult::BeforeFileFirstKey);
+ }
+
+ Err(HFileError::BackwardSeekNotSupported)
+ }
+ std::cmp::Ordering::Less => {
+ // Current key < lookup key: forward seek
+ self.forward_seek(lookup_key)
+ }
+ }
+ }
+
+ /// Forward seek to find the lookup key.
+ fn forward_seek(&mut self, lookup_key: &Utf8Key) -> Result<SeekResult> {
+ // Check if we need to jump to a different block
+ if let Some(entry) = &self.current_block_entry {
+ if let Some(next_key) = &entry.next_block_first_key {
+ if compare_keys(next_key, lookup_key) != std::cmp::Ordering::Greater {
+ // Need to find the right block
+ self.find_block_for_key(lookup_key)?;
+ }
+ } else {
+ // Last block - check against last key
+ if let Some(last_key) = &self.last_key {
+ if compare_keys(last_key, lookup_key) == std::cmp::Ordering::Less {
+ self.cursor.eof = true;
+ self.current_block = None;
+ self.current_block_entry = None;
+ return Ok(SeekResult::Eof);
+ }
+ }
+ }
+ }
+
+ // Scan within the current block
+ self.scan_block_for_key(lookup_key)
+ }
+
+ /// Find the block that may contain the lookup key.
+ fn find_block_for_key(&mut self, lookup_key: &Utf8Key) -> Result<()> {
+ // Locate the candidate block via BTreeMap's range query
+ let lookup_bytes = lookup_key.as_bytes();
+ let fake_key = Key::from_bytes(lookup_bytes.to_vec());
+
+ // Find the entry with greatest key <= lookup_key
+ let entry = self
+ .data_block_index
+ .range(..=fake_key)
+ .next_back()
+ .map(|(_, e)| e.clone());
+
+ if let Some(entry) = entry {
+ self.current_block_entry = Some(entry.clone());
+ self.load_data_block(&entry)?;
+ self.cursor.offset = entry.offset as usize + BLOCK_HEADER_SIZE;
+ self.cursor.cached_kv = None;
+ }
+
+ Ok(())
+ }
+
+ /// Scan within the current block to find the key.
+ /// Uses iteration instead of recursion to avoid stack overflow with many blocks.
+ fn scan_block_for_key(&mut self, lookup_key: &Utf8Key) -> Result<SeekResult> {
+ loop {
+ let block = match &self.current_block {
+ Some(b) => b,
+ None => return Ok(SeekResult::Eof),
+ };
+
+ let block_start = self.current_block_entry.as_ref().unwrap().offset as usize;
+ let mut offset = self.cursor.offset - block_start - BLOCK_HEADER_SIZE;
+ let mut last_offset = offset;
+ let mut last_kv = self.cursor.cached_kv.clone();
+
+ while block.is_valid_offset(offset) {
+ let kv = block.read_key_value(offset);
+ let cmp = compare_keys(kv.key(), lookup_key);
+
+ match cmp {
+ std::cmp::Ordering::Equal => {
+ self.cursor.offset = block_start + BLOCK_HEADER_SIZE + offset;
+ self.cursor.cached_kv = Some(kv);
+ return Ok(SeekResult::Found);
+ }
+ std::cmp::Ordering::Greater => {
+ // Key at offset > lookup key
+ // Set cursor to previous position
+ if let Some(prev_kv) = last_kv {
+ self.cursor.offset = block_start + BLOCK_HEADER_SIZE + last_offset;
+ self.cursor.cached_kv = Some(prev_kv);
+ }
+ if self.is_at_first_key_of_block() {
+ return Ok(SeekResult::BeforeBlockFirstKey);
+ }
+ return Ok(SeekResult::InRange);
+ }
+ std::cmp::Ordering::Less => {
+ last_offset = offset;
+ last_kv = Some(kv.clone());
+ offset += kv.record_size();
+ }
+ }
+ }
+
+ // Reached end of block - need to check if there are more blocks
+ let current_entry = self.current_block_entry.clone().unwrap();
+ let next_entry = self.get_next_block_entry(&current_entry);
+
+ match next_entry {
+ Some(entry) => {
+ // Move to next block and continue scanning (iterate instead of recurse)
+ self.current_block_entry = Some(entry.clone());
+ self.load_data_block(&entry)?;
+ self.cursor.offset = entry.offset as usize + BLOCK_HEADER_SIZE;
+ self.cursor.cached_kv = None;
+ // Continue the loop to scan the next block
+ }
+ None => {
+ // No more blocks - this is the last block
+ // Check if lookup key is past the last key in the file
+ if let Some(kv) = last_kv {
+ if compare_keys(kv.key(), lookup_key) == std::cmp::Ordering::Less {
+ // We're past the last key in the file
+ self.cursor.eof = true;
+ self.cursor.cached_kv = None;
+ return Ok(SeekResult::Eof);
+ }
+ // Otherwise, stay at the last key
+ self.cursor.offset = block_start + BLOCK_HEADER_SIZE + last_offset;
+ self.cursor.cached_kv = Some(kv);
+ }
+ return Ok(SeekResult::InRange);
+ }
+ }
+ }
+ }
+
+ /// Load a data block.
+ fn load_data_block(&mut self, entry: &BlockIndexEntry) -> Result<()> {
+ let block = self.read_block_at(entry.offset as usize, entry.size as usize)?;
+ if block.block_type() != HFileBlockType::Data {
+ return Err(HFileError::UnexpectedBlockType {
+ expected: HFileBlockType::Data.to_string(),
+ actual: block.block_type().to_string(),
+ });
+ }
+ self.current_block = Some(DataBlock::from_block(block));
+ Ok(())
+ }
+
+ /// Move to the next key-value pair.
+ #[allow(clippy::should_implement_trait)]
+ pub fn next(&mut self) -> Result<bool> {
+ if !self.cursor.seeked || self.cursor.eof {
+ return Ok(false);
+ }
+
+ let block = match &self.current_block {
+ Some(b) => b,
+ None => return Ok(false),
+ };
+
+ let block_start = self.current_block_entry.as_ref().unwrap().offset as usize;
+ let current_offset = self.cursor.offset - block_start - BLOCK_HEADER_SIZE;
+
+ // Get current key-value to calculate next offset
+ let kv = if let Some(cached) = &self.cursor.cached_kv {
+ cached.clone()
+ } else {
+ block.read_key_value(current_offset)
+ };
+
+ let next_offset = current_offset + kv.record_size();
+
+ if block.is_valid_offset(next_offset) {
+ self.cursor.offset = block_start + BLOCK_HEADER_SIZE + next_offset;
+ self.cursor.cached_kv = None;
+ return Ok(true);
+ }
+
+ // Move to next block
+ let current_entry = self.current_block_entry.clone().unwrap();
+ let next_entry = self.get_next_block_entry(&current_entry);
+
+ match next_entry {
+ Some(entry) => {
+ self.current_block_entry = Some(entry.clone());
+ self.load_data_block(&entry)?;
+ self.cursor.offset = entry.offset as usize + BLOCK_HEADER_SIZE;
+ self.cursor.cached_kv = None;
+ Ok(true)
+ }
+ None => {
+ self.cursor.eof = true;
+ Ok(false)
+ }
+ }
+ }
+
+ /// Get the next block index entry.
+ fn get_next_block_entry(&self, current: &BlockIndexEntry) -> Option<BlockIndexEntry> {
+ self.data_block_index
+ .range((
+ std::ops::Bound::Excluded(&current.first_key),
+ std::ops::Bound::Unbounded,
+ ))
+ .next()
+ .map(|(_, e)| e.clone())
+ }
+
+ /// Get the current key-value pair.
+ pub fn get_key_value(&mut self) -> Result<Option<KeyValue>> {
+ if !self.cursor.seeked || self.cursor.eof {
+ return Ok(None);
+ }
+
+ if let Some(cached) = &self.cursor.cached_kv {
+ return Ok(Some(cached.clone()));
+ }
+
+ let block = match &self.current_block {
+ Some(b) => b,
+ None => {
+ // Need to load the block
+ let entry = self.current_block_entry.clone().unwrap();
+ self.load_data_block(&entry)?;
+ self.current_block.as_ref().unwrap()
+ }
+ };
+
+ let block_start = self.current_block_entry.as_ref().unwrap().offset as usize;
+ let offset = self.cursor.offset - block_start - BLOCK_HEADER_SIZE;
+
+ let kv = block.read_key_value(offset);
+ self.cursor.cached_kv = Some(kv.clone());
+
+ Ok(Some(kv))
+ }
+
+ /// Check if cursor is at the first key of the current block.
+ fn is_at_first_key_of_block(&self) -> bool {
+ if let Some(entry) = &self.current_block_entry {
+ return self.cursor.offset == entry.offset as usize + BLOCK_HEADER_SIZE;
+ }
+ false
+ }
+
+ /// Check if the reader has been seeked.
+ pub fn is_seeked(&self) -> bool {
+ self.cursor.seeked
+ }
+
+ /// Iterate over all key-value pairs.
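+ ///
+ /// The yielded `KeyValue`s reference the reader's file bytes; use
+ /// `record_iter()` for owned records.
+ ///
+ /// # Example
+ /// ```ignore
+ /// for result in reader.iter()? {
+ ///     let kv = result?;
+ ///     println!("{:?}", kv.key().content_as_str());
+ /// }
+ /// ```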
+ pub fn iter(&mut self) -> Result<HFileIterator<'_>> {
+ self.seek_to_first()?;
+ Ok(HFileIterator { reader: self })
+ }
+
+ // ================== HFileRecord API for MDT ==================
+
+ /// Convert a KeyValue to an owned HFileRecord.
+ ///
+ /// This extracts the key content (without length prefix) and value bytes
+ /// into an owned struct suitable for MDT operations.
+ fn key_value_to_record(kv: &KeyValue) -> HFileRecord {
+ HFileRecord::new(kv.key().content().to_vec(), kv.value().to_vec())
+ }
+
+ /// Collect all records from the HFile as owned HFileRecords.
+ ///
+ /// This is useful for MDT operations where records need to be stored
+ /// and merged with log file records.
+ ///
+ /// # Example
+ /// ```ignore
+ /// let records = reader.collect_records()?;
+ /// for record in records {
+ /// println!("Key: {}", record.key_as_str().unwrap_or("<binary>"));
+ /// }
+ /// ```
+ pub fn collect_records(&mut self) -> Result<Vec<HFileRecord>> {
+ let mut records = Vec::with_capacity(self.trailer.entry_count as usize);
+ for result in self.iter()? {
+ let kv = result?;
+ records.push(Self::key_value_to_record(&kv));
+ }
+ Ok(records)
+ }
+
+ /// Iterate over all records as owned HFileRecords.
+ ///
+ /// Unlike `iter()` which returns references into file bytes,
+ /// this iterator yields owned `HFileRecord` instances.
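+ ///
+ /// # Example
+ /// ```ignore
+ /// for result in reader.record_iter()? {
+ ///     let record = result?;
+ ///     println!("{}", record.key_as_str().unwrap_or("<binary>"));
+ /// }
+ /// ```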
+ pub fn record_iter(&mut self) -> Result<HFileRecordIterator<'_>> {
+ self.seek_to_first()?;
+ Ok(HFileRecordIterator { reader: self })
+ }
+
+ /// Get the current position's record as an owned HFileRecord.
+ ///
+ /// Returns None if not seeked or at EOF.
+ pub fn get_record(&mut self) -> Result<Option<HFileRecord>> {
+ match self.get_key_value()? {
+ Some(kv) => Ok(Some(Self::key_value_to_record(&kv))),
+ None => Ok(None),
+ }
+ }
+
+ /// Lookup records by keys and return as HFileRecords.
+ ///
+ /// Keys must be sorted in ascending order. This method efficiently
+ /// scans forward through the file to find matching keys.
+ ///
+ /// Returns a vector of (key, Option<HFileRecord>) tuples where
+ /// the Option is Some if the key was found.
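+ ///
+ /// # Example
+ /// ```ignore
+ /// // Keys must already be sorted ascending; the names below are illustrative
+ /// let results = reader.lookup_records(&["key-a", "key-b"])?;
+ /// for (key, record) in results {
+ ///     match record {
+ ///         Some(r) => println!("{}: {} bytes", key, r.value().len()),
+ ///         None => println!("{}: not found", key),
+ ///     }
+ /// }
+ /// ```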
+ pub fn lookup_records(&mut self, keys: &[&str]) -> Result<Vec<(String, Option<HFileRecord>)>> {
+ let mut results = Vec::with_capacity(keys.len());
+
+ if keys.is_empty() {
+ return Ok(results);
+ }
+
+ self.seek_to_first()?;
+ if self.cursor.eof {
+ // Empty file - return all as not found
+ for key in keys {
+ results.push((key.to_string(), None));
+ }
+ return Ok(results);
+ }
+
+ for key in keys {
+ let lookup = Utf8Key::new(*key);
+ match self.seek_to(&lookup)? {
+ SeekResult::Found => {
+ let record = self.get_record()?;
+ results.push((key.to_string(), record));
+ }
+ _ => {
+ results.push((key.to_string(), None));
+ }
+ }
+ }
+
+ Ok(results)
+ }
+
+ /// Collect records matching a key prefix.
+ ///
+ /// Returns all records where the key starts with the given prefix.
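+ ///
+ /// # Example
+ /// ```ignore
+ /// // With 9-digit zero-padded keys, this prefix matches keys 100..=109
+ /// let records = reader.collect_records_by_prefix("hudi-key-00000010")?;
+ /// assert_eq!(records.len(), 10);
+ /// ```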
+ pub fn collect_records_by_prefix(&mut self, prefix: &str) -> Result<Vec<HFileRecord>> {
+ let mut records = Vec::new();
+ let prefix_bytes = prefix.as_bytes();
+
+ // Seek to the prefix (or first key >= prefix)
+ let lookup = Utf8Key::new(prefix);
+ self.seek_to_first()?;
+
+ if self.cursor.eof {
+ return Ok(records);
+ }
+
+ // Find starting position
+ let start_result = self.seek_to(&lookup)?;
+ match start_result {
+ SeekResult::Eof => return Ok(records),
+ SeekResult::Found | SeekResult::InRange | SeekResult::BeforeBlockFirstKey => {
+ // We may be at or past a matching key
+ }
+ SeekResult::BeforeFileFirstKey => {
+ // Key is before first key, move to first
+ self.seek_to_first()?;
+ }
+ }
+
+ // Scan and collect records with matching prefix
+ loop {
+ if self.cursor.eof {
+ break;
+ }
+
+ match self.get_key_value()? {
+ Some(kv) => {
+ let key_content = kv.key().content();
+ if key_content.starts_with(prefix_bytes) {
+ records.push(Self::key_value_to_record(&kv));
+ } else if key_content > prefix_bytes {
+ // Past the prefix range
+ break;
+ }
+ }
+ None => break,
+ }
+
+ if !self.next()? {
+ break;
+ }
+ }
+
+ Ok(records)
+ }
+}
+
+/// Iterator over all records as owned HFileRecords.
+pub struct HFileRecordIterator<'a> {
+ reader: &'a mut HFileReader,
+}
+
+impl<'a> Iterator for HFileRecordIterator<'a> {
+ type Item = Result<HFileRecord>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.reader.cursor.eof {
+ return None;
+ }
+
+ match self.reader.get_key_value() {
+ Ok(Some(kv)) => {
+ let record = HFileReader::key_value_to_record(&kv);
+ match self.reader.next() {
+ Ok(_) => {}
+ Err(e) => return Some(Err(e)),
+ }
+ Some(Ok(record))
+ }
+ Ok(None) => None,
+ Err(e) => Some(Err(e)),
+ }
+ }
+}
+
+/// Iterator over all key-value pairs in an HFile.
+pub struct HFileIterator<'a> {
+ reader: &'a mut HFileReader,
+}
+
+impl<'a> Iterator for HFileIterator<'a> {
+ type Item = Result<KeyValue>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.reader.cursor.eof {
+ return None;
+ }
+
+ match self.reader.get_key_value() {
+ Ok(Some(kv)) => {
+ match self.reader.next() {
+ Ok(_) => {}
+ Err(e) => return Some(Err(e)),
+ }
+ Some(Ok(kv))
+ }
+ Ok(None) => None,
+ Err(e) => Some(Err(e)),
+ }
+ }
+}
+
+/// Read a varint from bytes. Returns (value, bytes_consumed).
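+///
+/// This is the standard protobuf base-128 (LEB128) encoding: each byte
+/// contributes 7 bits, least-significant group first, with the high bit set
+/// on continuation bytes. For example, `[0x96, 0x01]` decodes to `(150, 2)`.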
+fn read_varint(bytes: &[u8]) -> (u64, usize) {
+ let mut result: u64 = 0;
+ let mut shift = 0;
+ let mut pos = 0;
+
+ while pos < bytes.len() {
+ let b = bytes[pos] as u64;
+ pos += 1;
+ result |= (b & 0x7F) << shift;
+ if b & 0x80 == 0 {
+ break;
+ }
+ shift += 7;
+ }
+
+ (result, pos)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::path::PathBuf;
+
+ fn test_data_dir() -> PathBuf {
+ PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+ .join("tests")
+ .join("data")
+ .join("hfile")
+ }
+
+ fn read_test_hfile(filename: &str) -> Vec<u8> {
+ let path = test_data_dir().join(filename);
+ std::fs::read(&path).unwrap_or_else(|_| panic!("Failed to read test file: {:?}", path))
+ }
+
+ #[test]
+ fn test_read_uncompressed_hfile() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Check entry count
+ assert_eq!(reader.num_entries(), 5000);
+ }
+
+ #[test]
+ fn test_read_gzip_hfile() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Check entry count
+ assert_eq!(reader.num_entries(), 20000);
+ }
+
+ #[test]
+ fn test_read_empty_hfile() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_no_entry.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Check entry count
+ assert_eq!(reader.num_entries(), 0);
+ }
+
+ #[test]
+ fn test_seek_to_first_uncompressed() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Seek to first
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Get first key-value
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000000");
+
+ let value = std::str::from_utf8(kv.value()).unwrap();
+ assert_eq!(value, "hudi-value-000000000");
+ }
+
+ #[test]
+ fn test_sequential_read_uncompressed() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Seek to first
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Read first 10 entries
+ for i in 0..10 {
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ let expected_key = format!("hudi-key-{:09}", i);
+ let expected_value = format!("hudi-value-{:09}", i);
+
+ assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+ assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+ if i < 9 {
+ assert!(reader.next().expect("Failed to move next"));
+ }
+ }
+ }
+
+ #[test]
+ fn test_seek_to_key_exact() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Seek to first
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek to specific key
+ let lookup = Utf8Key::new("hudi-key-000000100");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ // Verify key
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000100");
+ }
+
+ #[test]
+ fn test_seek_to_key_eof() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Seek to first
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek past last key
+ let lookup = Utf8Key::new("hudi-key-999999999");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Eof);
+ }
+
+ #[test]
+ fn test_seek_to_key_before_first() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Seek to first
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek before first key
+ let lookup = Utf8Key::new("aaa");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::BeforeFileFirstKey);
+ }
+
+ #[test]
+ fn test_iterate_all_entries() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let mut count = 0;
+ for result in reader.iter().expect("Failed to create iterator") {
+ let kv = result.expect("Failed to read kv");
+ let expected_key = format!("hudi-key-{:09}", count);
+ assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+ count += 1;
+ }
+
+ assert_eq!(count, 5000);
+ }
+
+ #[test]
+ fn test_empty_hfile_seek() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_no_entry.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Seek to first should return false
+ assert!(!reader.seek_to_first().expect("Failed to seek"));
+
+ // Get key-value should return None
+ assert!(reader.get_key_value().expect("Failed to get kv").is_none());
+ }
+
+ #[test]
+ fn test_gzip_sequential_read() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Seek to first
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Read first 10 entries
+ for i in 0..10 {
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ let expected_key = format!("hudi-key-{:09}", i);
+ let expected_value = format!("hudi-value-{:09}", i);
+
+ assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+ assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+ if i < 9 {
+ assert!(reader.next().expect("Failed to move next"));
+ }
+ }
+ }
+
+ // ================== HFileRecord Tests ==================
+
+ #[test]
+ fn test_collect_records() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let records = reader.collect_records().expect("Failed to collect records");
+ assert_eq!(records.len(), 5000);
+
+ // Verify first and last records
+ assert_eq!(records[0].key_as_str(), Some("hudi-key-000000000"));
+ assert_eq!(
+ std::str::from_utf8(records[0].value()).unwrap(),
+ "hudi-value-000000000"
+ );
+
+ assert_eq!(records[4999].key_as_str(), Some("hudi-key-000004999"));
+ assert_eq!(
+ std::str::from_utf8(records[4999].value()).unwrap(),
+ "hudi-value-000004999"
+ );
+ }
+
+ #[test]
+ fn test_record_iterator() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let mut count = 0;
+ for result in reader.record_iter().expect("Failed to create iterator") {
+ let record = result.expect("Failed to read record");
+ let expected_key = format!("hudi-key-{:09}", count);
+ assert_eq!(record.key_as_str(), Some(expected_key.as_str()));
+ count += 1;
+ }
+
+ assert_eq!(count, 5000);
+ }
+
+ #[test]
+ fn test_get_record() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Before seeking, should return None
+ assert!(reader.get_record().expect("Failed to get record").is_none());
+
+ // After seeking, should return a record
+ reader.seek_to_first().expect("Failed to seek");
+ let record = reader.get_record().expect("Failed to get record").unwrap();
+ assert_eq!(record.key_as_str(), Some("hudi-key-000000000"));
+ }
+
+ #[test]
+ fn test_lookup_records() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let keys = vec![
+ "hudi-key-000000000",
+ "hudi-key-000000100",
+ "hudi-key-nonexistent",
+ ];
+ let results = reader.lookup_records(&keys).expect("Failed to lookup");
+
+ assert_eq!(results.len(), 3);
+
+ // First key should be found
+ assert_eq!(results[0].0, "hudi-key-000000000");
+ assert!(results[0].1.is_some());
+ assert_eq!(
+ results[0].1.as_ref().unwrap().key_as_str(),
+ Some("hudi-key-000000000")
+ );
+
+ // Second key should be found
+ assert_eq!(results[1].0, "hudi-key-000000100");
+ assert!(results[1].1.is_some());
+ assert_eq!(
+ results[1].1.as_ref().unwrap().key_as_str(),
+ Some("hudi-key-000000100")
+ );
+
+ // Third key should not be found
+ assert_eq!(results[2].0, "hudi-key-nonexistent");
+ assert!(results[2].1.is_none());
+ }
+
+ #[test]
+ fn test_collect_records_by_prefix() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Collect records with prefix "hudi-key-00000010" (should match 100-109)
+ // Keys are 9-digit padded, so "000000100" to "000000109" match this prefix
+ let records = reader
+ .collect_records_by_prefix("hudi-key-00000010")
+ .expect("Failed to collect by prefix");
+
+ assert_eq!(records.len(), 10);
+ for (i, record) in records.iter().enumerate() {
+ let expected = format!("hudi-key-{:09}", 100 + i);
+ assert_eq!(record.key_as_str(), Some(expected.as_str()));
+ }
+ }
+
+ #[test]
+ fn test_collect_records_empty_file() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_no_entry.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let records = reader.collect_records().expect("Failed to collect records");
+ assert!(records.is_empty());
+ }
+
+ #[test]
+ fn test_hfile_record_ownership() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Collect some records
+ reader.seek_to_first().expect("Failed to seek");
+ let record1 = reader.get_record().expect("Failed to get record").unwrap();
+ reader.next().expect("Failed to move next");
+ let record2 = reader.get_record().expect("Failed to get record").unwrap();
+
+ // Records should be independent (owned data)
+ assert_ne!(record1.key(), record2.key());
+ assert_eq!(record1.key_as_str(), Some("hudi-key-000000000"));
+ assert_eq!(record2.key_as_str(), Some("hudi-key-000000001"));
+
+ // Can use records after reader has moved
+ drop(reader);
+ assert_eq!(record1.key_as_str(), Some("hudi-key-000000000"));
+ }
+
+ // ================== Additional Test Files ==================
+
+ // Priority 1: Different Block Sizes
+
+ #[test]
+ fn test_read_512kb_blocks_gzip() {
+ // 512KB block size, GZIP compression, 20000 entries
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert_eq!(reader.num_entries(), 20000);
+ }
+
+ #[test]
+ fn test_512kb_blocks_sequential_read() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Read first 10 entries
+ for i in 0..10 {
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ let expected_key = format!("hudi-key-{:09}", i);
+ let expected_value = format!("hudi-value-{:09}", i);
+
+ assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+ assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+ if i < 9 {
+ assert!(reader.next().expect("Failed to move next"));
+ }
+ }
+ }
+
+ #[test]
+ fn test_512kb_blocks_seek() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek to key in second block (block 0 ends at ~8886)
+ let lookup = Utf8Key::new("hudi-key-000008888");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000008888");
+ }
+
+ #[test]
+ fn test_read_64kb_blocks_uncompressed() {
+ // 64KB block size, no compression, 5000 entries
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert_eq!(reader.num_entries(), 5000);
+ }
+
+ #[test]
+ fn test_64kb_blocks_sequential_read() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Read first 10 entries
+ for i in 0..10 {
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ let expected_key = format!("hudi-key-{:09}", i);
+ let expected_value = format!("hudi-value-{:09}", i);
+
+ assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+ assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+ if i < 9 {
+ assert!(reader.next().expect("Failed to move next"));
+ }
+ }
+ }
+
+ #[test]
+ fn test_64kb_blocks_seek() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek to key in second block (block 0 ends at ~1110)
+ let lookup = Utf8Key::new("hudi-key-000001688");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000001688");
+ }
+
+ // Priority 2: Edge Cases
+
+ #[test]
+ fn test_read_non_unique_keys() {
+ // 200 unique keys, each with 21 values (1 primary + 20 duplicates)
+ // Total: 200 * 21 = 4200 entries
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert_eq!(reader.num_entries(), 4200);
+ }
+
+ #[test]
+ fn test_non_unique_keys_sequential_read() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // First entry for key 0
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000000");
+ assert_eq!(
+ std::str::from_utf8(kv.value()).unwrap(),
+ "hudi-value-000000000"
+ );
+
+ // Next 20 entries should be duplicates with _0 to _19 suffix
+ for j in 0..20 {
+ assert!(reader.next().expect("Failed to move next"));
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000000");
+ let expected_value = format!("hudi-value-000000000_{}", j);
+ assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+ }
+
+ // Next entry should be key 1
+ assert!(reader.next().expect("Failed to move next"));
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000001");
+ }
+
+ #[test]
+ fn test_non_unique_keys_seek() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek to a key - should find the first occurrence
+ let lookup = Utf8Key::new("hudi-key-000000005");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000005");
+ // First occurrence has the base value
+ assert_eq!(
+ std::str::from_utf8(kv.value()).unwrap(),
+ "hudi-value-000000005"
+ );
+ }
+
+ #[test]
+ fn test_read_fake_first_key() {
+ // File with fake first keys in meta index block
+ // Keys have suffix "-abcdefghij"
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert_eq!(reader.num_entries(), 20000);
+ }
+
+ #[test]
+ fn test_fake_first_key_sequential_read() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Read first 10 entries - keys have "-abcdefghij" suffix
+ for i in 0..10 {
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ let expected_key = format!("hudi-key-{:09}-abcdefghij", i);
+ let expected_value = format!("hudi-value-{:09}", i);
+
+ assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+ assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+ if i < 9 {
+ assert!(reader.next().expect("Failed to move next"));
+ }
+ }
+ }
+
+ #[test]
+ fn test_fake_first_key_seek_exact() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek to exact key with suffix
+ let lookup = Utf8Key::new("hudi-key-000000099-abcdefghij");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(
+ kv.key().content_as_str().unwrap(),
+ "hudi-key-000000099-abcdefghij"
+ );
+ }
+
+ #[test]
+ fn test_fake_first_key_seek_before_block_first() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // First, move to a known position
+ let lookup = Utf8Key::new("hudi-key-000000469-abcdefghij");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ // Now seek to a key that falls between fake first key and actual first key of next block
+ // Block 2 has fake first key "hudi-key-00000047" but actual first key "hudi-key-000000470-abcdefghij"
+ let lookup = Utf8Key::new("hudi-key-000000470");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ // This should return BeforeBlockFirstKey since the lookup key is >= fake first key
+ // but < actual first key
+ assert_eq!(result, SeekResult::BeforeBlockFirstKey);
+ }
+
+ // Priority 3: Multi-level Index
+
+ #[test]
+ fn test_read_large_keys_2level_index() {
+ // Large keys (>100 bytes), 2-level data block index
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert_eq!(reader.num_entries(), 20000);
+ }
+
+ #[test]
+ fn test_large_keys_2level_sequential_read() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+ aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Read first 5 entries
+ for i in 0..5 {
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ let expected_key = format!("{}{:09}", large_key_prefix, i);
+ let expected_value = format!("hudi-value-{:09}", i);
+
+ assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+ assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+ if i < 4 {
+ assert!(reader.next().expect("Failed to move next"));
+ }
+ }
+ }
+
+ #[test]
+ fn test_large_keys_2level_seek() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+ aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek to a key deep in the file
+ let lookup_key = format!("{}000005340", large_key_prefix);
+ let lookup = Utf8Key::new(&lookup_key);
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), lookup_key);
+ }
+
+ #[test]
+ fn test_large_keys_2level_iterate_all() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let mut count = 0;
+ for result in reader.iter().expect("Failed to create iterator") {
+ let _ = result.expect("Failed to read kv");
+ count += 1;
+ }
+
+ assert_eq!(count, 20000);
+ }
+
+ #[test]
+ fn test_read_large_keys_3level_index() {
+ // Large keys, 3-level deep data block index
+ let bytes =
+ read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert_eq!(reader.num_entries(), 10000);
+ }
+
+ #[test]
+ fn test_large_keys_3level_sequential_read() {
+ let bytes =
+ read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+ aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Read first 5 entries
+ for i in 0..5 {
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ let expected_key = format!("{}{:09}", large_key_prefix, i);
+ let expected_value = format!("hudi-value-{:09}", i);
+
+ assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+ assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+ if i < 4 {
+ assert!(reader.next().expect("Failed to move next"));
+ }
+ }
+ }
+
+ #[test]
+ fn test_large_keys_3level_seek() {
+ let bytes =
+ read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+ aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek to a key deep in the file
+ let lookup_key = format!("{}000005340", large_key_prefix);
+ let lookup = Utf8Key::new(&lookup_key);
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), lookup_key);
+ }
+
+ #[test]
+ fn test_large_keys_3level_iterate_all() {
+ let bytes =
+ read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let mut count = 0;
+ for result in reader.iter().expect("Failed to create iterator") {
+ let _ = result.expect("Failed to read kv");
+ count += 1;
+ }
+
+ assert_eq!(count, 10000);
+ }
+
+ #[test]
+ fn test_large_keys_3level_last_key() {
+ let bytes =
+ read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+ aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek to last key
+ let lookup_key = format!("{}000009999", large_key_prefix);
+ let lookup = Utf8Key::new(&lookup_key);
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(kv.key().content_as_str().unwrap(), lookup_key);
+ assert_eq!(
+ std::str::from_utf8(kv.value()).unwrap(),
+ "hudi-value-000009999"
+ );
+
+ // Next should return false (EOF)
+ assert!(!reader.next().expect("Failed to move next"));
+ }
+
+ #[test]
+ fn test_large_keys_3level_seek_eof() {
+ let bytes =
+ read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+ aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+ assert!(reader.seek_to_first().expect("Failed to seek"));
+
+ // Seek past last key
+ let lookup_key = format!("{}000009999a", large_key_prefix);
+ let lookup = Utf8Key::new(&lookup_key);
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Eof);
+ }
+
+ // ================== Additional Coverage Tests ==================
+
+ #[test]
+ fn test_is_seeked() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert!(!reader.is_seeked());
+ reader.seek_to_first().expect("Failed to seek");
+ assert!(reader.is_seeked());
+ }
+
+ #[test]
+ fn test_get_key_value_not_seeked() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Not seeked yet
+ let result = reader.get_key_value().expect("Failed to get kv");
+ assert!(result.is_none());
+ }
+
+ #[test]
+ fn test_next_not_seeked() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // next() without seek should return false
+ assert!(!reader.next().expect("Failed to next"));
+ }
+
+ #[test]
+ fn test_seek_before_first_key() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ reader.seek_to_first().expect("Failed to seek");
+
+ // Seek to a key before the first key in the file
+ let lookup = Utf8Key::new("aaa"); // Before "hudi-key-000000000"
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::BeforeFileFirstKey);
+ }
+
+ #[test]
+ fn test_seek_in_range() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ reader.seek_to_first().expect("Failed to seek");
+
+ // First move past the first key to avoid BeforeBlockFirstKey result
+ let lookup = Utf8Key::new("hudi-key-000000100");
+ reader.seek_to(&lookup).expect("Failed to seek");
+
+ // Now seek to a key that doesn't exist but is in range (between 100 and 101)
+ let lookup = Utf8Key::new("hudi-key-000000100a");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::InRange);
+ }
+
+ #[test]
+ fn test_lookup_records_empty_keys() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let results = reader.lookup_records(&[]).expect("Failed to lookup");
+ assert!(results.is_empty());
+ }
+
+ #[test]
+ fn test_lookup_records_not_found() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let results = reader
+ .lookup_records(&["nonexistent-key"])
+ .expect("Failed to lookup");
+ assert_eq!(results.len(), 1);
+ assert_eq!(results[0].0, "nonexistent-key");
+ assert!(results[0].1.is_none());
+ }
+
+ #[test]
+ fn test_lookup_records_found() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let results = reader
+ .lookup_records(&["hudi-key-000000000", "hudi-key-000000001"])
+ .expect("Failed to lookup");
+
+ assert_eq!(results.len(), 2);
+ assert_eq!(results[0].0, "hudi-key-000000000");
+ assert!(results[0].1.is_some());
+ assert_eq!(results[1].0, "hudi-key-000000001");
+ assert!(results[1].1.is_some());
+ }
+
+ #[test]
+ fn test_collect_records_by_prefix_no_matches() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let records = reader
+ .collect_records_by_prefix("nonexistent-prefix-")
+ .expect("Failed to collect");
+ assert!(records.is_empty());
+ }
+
+ #[test]
+ fn test_collect_records_by_prefix_found() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // All keys start with "hudi-key-00000000" for keys 0-9
+ let records = reader
+ .collect_records_by_prefix("hudi-key-00000000")
+ .expect("Failed to collect");
+ // Keys 0-9 match this prefix
+ assert_eq!(records.len(), 10);
+ }
+
+ #[test]
+ fn test_trailer_info() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ assert_eq!(reader.num_entries(), 5000);
+ // Just verify we can access trailer info
+ assert!(reader.num_entries() > 0);
+ }
+
+ #[test]
+ fn test_seek_result_enum() {
+ // Test SeekResult values
+ assert_eq!(SeekResult::BeforeBlockFirstKey as i32, -2);
+ assert_eq!(SeekResult::BeforeFileFirstKey as i32, -1);
+ assert_eq!(SeekResult::Found as i32, 0);
+ assert_eq!(SeekResult::InRange as i32, 1);
+ assert_eq!(SeekResult::Eof as i32, 2);
+
+ // Test Debug implementation
+ let _ = format!("{:?}", SeekResult::Found);
+ }
+}
diff --git a/crates/core/src/hfile/record.rs b/crates/core/src/hfile/record.rs
new file mode 100644
index 0000000..4ee6364
--- /dev/null
+++ b/crates/core/src/hfile/record.rs
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile record types for MDT (Metadata Table) operations.
+//!
+//! This module provides simple, owned record types for HFile key-value pairs.
+//! These are designed for use in MDT operations where:
+//! - Records need to be passed around and stored
+//! - Key-based lookups and merging are primary operations
+//! - Values are Avro-serialized payloads decoded on demand
+//!
+//! Unlike the `KeyValue` type which references into file bytes,
+//! `HFileRecord` owns its data and can be freely moved.
+
+use std::cmp::Ordering;
+
+/// An owned HFile record with key and value.
+///
+/// This is a simple struct designed for MDT operations. The key is the
+/// UTF-8 record key (content only, without HFile key structure), and
+/// the value is the raw bytes (typically Avro-serialized payload).
+///
+/// # Example
+/// ```ignore
+/// let record = HFileRecord::new("my-key".into(), value_bytes);
+ /// println!("Key: {}", record.key_as_str().unwrap_or("<binary>"));
+ /// // Decode value on demand (decode_avro is illustrative)
+ /// let payload = decode_avro(&record.value);
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct HFileRecord {
+ /// Record key (UTF-8 string content only, no length prefix)
+ pub key: Vec<u8>,
+ /// Record value (raw bytes, typically Avro-serialized)
+ pub value: Vec<u8>,
+}
+
+impl HFileRecord {
+ /// Create a new HFile record.
+ pub fn new(key: Vec<u8>, value: Vec<u8>) -> Self {
+ Self { key, value }
+ }
+
+ /// Create a record from string key and value bytes.
+ pub fn from_str_key(key: &str, value: Vec<u8>) -> Self {
+ Self {
+ key: key.as_bytes().to_vec(),
+ value,
+ }
+ }
+
+ /// Returns the key as a UTF-8 string.
+ ///
+ /// Returns `None` if the key is not valid UTF-8.
+ pub fn key_as_str(&self) -> Option<&str> {
+ std::str::from_utf8(&self.key).ok()
+ }
+
+ /// Returns the key as bytes.
+ pub fn key(&self) -> &[u8] {
+ &self.key
+ }
+
+ /// Returns the value as bytes.
+ pub fn value(&self) -> &[u8] {
+ &self.value
+ }
+
+ /// Returns whether this record represents a deletion.
+ ///
+ /// In MDT, a deleted record has an empty value.
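+ ///
+ /// # Example
+ /// ```ignore
+ /// let tombstone = HFileRecord::from_str_key("k", vec![]);
+ /// assert!(tombstone.is_deleted());
+ /// ```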
+ pub fn is_deleted(&self) -> bool {
+ self.value.is_empty()
+ }
+}
+
+impl PartialOrd for HFileRecord {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for HFileRecord {
+ fn cmp(&self, other: &Self) -> Ordering {
+ self.key.cmp(&other.key)
+ }
+}
+
+impl std::fmt::Display for HFileRecord {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self.key_as_str() {
+ Some(key) => write!(
+ f,
+ "HFileRecord{{key={}, value_len={}}}",
+ key,
+ self.value.len()
+ ),
+ None => write!(
+ f,
+ "HFileRecord{{key=<binary {} bytes>, value_len={}}}",
+ self.key.len(),
+ self.value.len()
+ ),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_hfile_record_creation() {
+ let record = HFileRecord::new(b"test-key".to_vec(), b"test-value".to_vec());
+ assert_eq!(record.key_as_str(), Some("test-key"));
+ assert_eq!(record.value(), b"test-value");
+ assert!(!record.is_deleted());
+ }
+
+ #[test]
+ fn test_hfile_record_from_str() {
+ let record = HFileRecord::from_str_key("my-key", b"my-value".to_vec());
+ assert_eq!(record.key_as_str(), Some("my-key"));
+ assert_eq!(record.value(), b"my-value");
+ }
+
+ #[test]
+ fn test_hfile_record_deleted() {
+ let record = HFileRecord::new(b"deleted-key".to_vec(), vec![]);
+ assert!(record.is_deleted());
+ }
+
+ #[test]
+ fn test_hfile_record_ordering() {
+ let r1 = HFileRecord::from_str_key("aaa", vec![1]);
+ let r2 = HFileRecord::from_str_key("bbb", vec![2]);
+ let r3 = HFileRecord::from_str_key("aaa", vec![3]);
+
+ assert!(r1 < r2);
+ assert_eq!(r1.cmp(&r3), Ordering::Equal); // Same key, ordering only by key
+ }
+
+ #[test]
+ fn test_hfile_record_display() {
+ let record = HFileRecord::from_str_key("test", b"value".to_vec());
+ let display = format!("{}", record);
+ assert!(display.contains("test"));
+ assert!(display.contains("value_len=5"));
+ }
+}
diff --git a/crates/core/src/hfile/trailer.rs b/crates/core/src/hfile/trailer.rs
new file mode 100644
index 0000000..1c8d786
--- /dev/null
+++ b/crates/core/src/hfile/trailer.rs
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile trailer parsing.
+
+use crate::hfile::block_type::{HFileBlockType, MAGIC_LENGTH};
+use crate::hfile::compression::CompressionCodec;
+use crate::hfile::error::{HFileError, Result};
+use crate::hfile::proto::TrailerProto;
+use prost::Message;
+
+/// HFile trailer size (fixed at 4096 bytes for HFile v3)
+pub const TRAILER_SIZE: usize = 4096;
+
+/// HFile trailer containing file metadata.
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+pub struct HFileTrailer {
+ /// Major version (should be 3 for HFile v3)
+ pub major_version: u32,
+ /// Minor version
+ pub minor_version: u32,
+ /// Offset to file info block
+ pub file_info_offset: u64,
+ /// Offset to load-on-open section
+ pub load_on_open_data_offset: u64,
+ /// Total uncompressed size of data block index
+ pub uncompressed_data_index_size: u64,
+ /// Total uncompressed bytes in file
+ pub total_uncompressed_bytes: u64,
+ /// Number of entries in data block index
+ pub data_index_count: u32,
+ /// Number of entries in meta block index
+ pub meta_index_count: u32,
+ /// Number of key-value entries in file
+ pub entry_count: u64,
+ /// Number of levels in data block index
+ pub num_data_index_levels: u32,
+ /// Offset to first data block
+ pub first_data_block_offset: u64,
+ /// Offset past the last data block
+ pub last_data_block_offset: u64,
+ /// Comparator class name (not used by Hudi)
+ pub comparator_class_name: String,
+ /// Compression codec used for blocks
+ pub compression_codec: CompressionCodec,
+}
+
+impl HFileTrailer {
+ /// Read and parse the trailer from file bytes.
+ ///
+ /// The trailer is always at the end of the file with fixed size.
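+ ///
+ /// # Example
+ /// ```ignore
+ /// // The path is illustrative
+ /// let bytes = std::fs::read("records.hfile")?;
+ /// let trailer = HFileTrailer::read(&bytes)?;
+ /// assert_eq!(trailer.major_version, 3);
+ /// ```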
+ pub fn read(file_bytes: &[u8]) -> Result<Self> {
+ let file_size = file_bytes.len();
+ if file_size < TRAILER_SIZE {
+ return Err(HFileError::InvalidFormat(format!(
+ "File too small for HFile trailer: {} bytes, need at least {}",
+ file_size, TRAILER_SIZE
+ )));
+ }
+
+ let trailer_start = file_size - TRAILER_SIZE;
+ let trailer_bytes = &file_bytes[trailer_start..];
+
+ // Verify trailer magic
+ HFileBlockType::Trailer.check_magic(trailer_bytes)?;
+
+ // Read version from last 4 bytes
+ // Format: [minor_version (1 byte)] [major_version (3 bytes)]
+ let version_bytes = &trailer_bytes[TRAILER_SIZE - 4..];
+ let minor_version = version_bytes[0] as u32;
+ let major_version = ((version_bytes[1] as u32) << 16)
+ | ((version_bytes[2] as u32) << 8)
+ | (version_bytes[3] as u32);
+
+ if major_version != 3 {
+ return Err(HFileError::UnsupportedVersion {
+ major: major_version,
+ minor: minor_version,
+ });
+ }
+
+ // Parse protobuf content (after magic, before version)
+ let proto_start = MAGIC_LENGTH;
+ let proto_end = TRAILER_SIZE - 4;
+ let proto_bytes = &trailer_bytes[proto_start..proto_end];
+
+ // Protobuf is length-delimited
+ let proto = Self::parse_length_delimited_proto(proto_bytes)?;
+
+ let compression_codec = proto
+ .compression_codec
+ .map(CompressionCodec::from_id)
+ .transpose()?
+ .unwrap_or_default();
+
+ Ok(Self {
+ major_version,
+ minor_version,
+ file_info_offset: proto.file_info_offset.unwrap_or(0),
+ load_on_open_data_offset: proto.load_on_open_data_offset.unwrap_or(0),
+ uncompressed_data_index_size: proto.uncompressed_data_index_size.unwrap_or(0),
+ total_uncompressed_bytes: proto.total_uncompressed_bytes.unwrap_or(0),
+ data_index_count: proto.data_index_count.unwrap_or(0),
+ meta_index_count: proto.meta_index_count.unwrap_or(0),
+ entry_count: proto.entry_count.unwrap_or(0),
+ num_data_index_levels: proto.num_data_index_levels.unwrap_or(1),
+ first_data_block_offset: proto.first_data_block_offset.unwrap_or(0),
+ last_data_block_offset: proto.last_data_block_offset.unwrap_or(0),
+ comparator_class_name: proto.comparator_class_name.unwrap_or_default(),
+ compression_codec,
+ })
+ }
+
+ /// Parse a length-delimited protobuf message (varint length prefix, then payload).
+ fn parse_length_delimited_proto(bytes: &[u8]) -> Result<TrailerProto> {
+ // Read the varint length prefix, then decode exactly that many bytes,
+ // rejecting a prefix that would overrun the buffer instead of panicking.
+ let (length, consumed) = read_varint(bytes);
+ let end = consumed + length as usize;
+ if end > bytes.len() {
+ return Err(HFileError::InvalidFormat(
+ "Length-delimited trailer proto overruns buffer".to_string(),
+ ));
+ }
+ TrailerProto::decode(&bytes[consumed..end]).map_err(HFileError::from)
+ }
+}
+
+/// Read a LEB128 varint from bytes. Returns (value, bytes_consumed).
+fn read_varint(bytes: &[u8]) -> (u64, usize) {
+ let mut result: u64 = 0;
+ let mut shift = 0;
+ let mut pos = 0;
+
+ // A u64 varint occupies at most 10 bytes; cap the shift so malformed
+ // input (a long run of continuation bytes) cannot trigger a
+ // shift-overflow panic.
+ while pos < bytes.len() && shift < 64 {
+ let b = bytes[pos] as u64;
+ pos += 1;
+ result |= (b & 0x7F) << shift;
+ if b & 0x80 == 0 {
+ break;
+ }
+ shift += 7;
+ }
+
+ (result, pos)
+}
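+
+// A minimal test sketch (not part of the original patch) exercising the
+// varint decoder and the trailer size guard; the bytes 0xAC 0x02 encode
+// 300 in LEB128, a standard varint example.
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn decodes_multi_byte_varint() {
+ // 0xAC = payload 0x2C with continuation bit set; 0x02 = final byte
+ // value = 0x2C | (0x02 << 7) = 44 + 256 = 300
+ let (value, consumed) = read_varint(&[0xAC, 0x02]);
+ assert_eq!(value, 300);
+ assert_eq!(consumed, 2);
+ }
+
+ #[test]
+ fn rejects_file_smaller_than_trailer() {
+ let bytes = vec![0u8; TRAILER_SIZE - 1];
+ assert!(HFileTrailer::read(&bytes).is_err());
+ }
+}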
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 9438db5..5b1d891 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -48,6 +48,7 @@
pub mod error;
pub mod expr;
pub mod file_group;
+pub mod hfile;
pub mod merge;
pub mod metadata;
mod record;
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile
new file mode 100644
index 0000000..4825637
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile
new file mode 100644
index 0000000..03c7960
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile
new file mode 100644
index 0000000..bacd245
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile
new file mode 100644
index 0000000..243eb66
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile
new file mode 100644
index 0000000..6c86b01
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile
new file mode 100644
index 0000000..c12188d
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile
new file mode 100644
index 0000000..4d35a3e
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile
new file mode 100644
index 0000000..923bb84
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_no_entry.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_no_entry.hfile
new file mode 100644
index 0000000..0471f98
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_no_entry.hfile
Binary files differ