feat: add hfile reader (#490)

diff --git a/Cargo.toml b/Cargo.toml
index bc991d1..5e44b0a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -85,3 +85,9 @@
 dashmap = { version = "~6.1" }
 futures = { version = "~0.3" }
 tokio = { version = "~1.45", features = ["rt-multi-thread"] }
+
+# protobuf
+prost = { version = "~0.13" }
+
+# compression
+flate2 = { version = "^1.1" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index eb958a9..45d423d 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -73,6 +73,12 @@
 futures = { workspace = true }
 tokio = { workspace = true }
 
+# protobuf
+prost = { workspace = true }
+
+# compression
+flate2 = { workspace = true }
+
 # datafusion
 datafusion = { workspace = true, optional = true }
 datafusion-expr = { workspace = true, optional = true }
diff --git a/crates/core/src/hfile/block.rs b/crates/core/src/hfile/block.rs
new file mode 100644
index 0000000..29b308e
--- /dev/null
+++ b/crates/core/src/hfile/block.rs
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile block parsing and handling.
+
+use crate::hfile::block_type::{HFileBlockType, MAGIC_LENGTH};
+use crate::hfile::compression::CompressionCodec;
+use crate::hfile::error::{HFileError, Result};
+use crate::hfile::key::{Key, KeyValue, KEY_VALUE_HEADER_SIZE};
+
+/// Size constants for block header
+const SIZEOF_INT32: usize = 4;
+const SIZEOF_INT64: usize = 8;
+const SIZEOF_BYTE: usize = 1;
+
+/// Block header size without checksum info (HFile v2)
+const HEADER_SIZE_NO_CHECKSUM: usize = MAGIC_LENGTH + 2 * SIZEOF_INT32 + SIZEOF_INT64;
+
+/// Block header size with checksum (HFile v3)
+/// Header fields:
+/// - 8 bytes: magic
+/// - 4 bytes: on-disk size without header
+/// - 4 bytes: uncompressed size without header
+/// - 8 bytes: previous block offset
+/// - 1 byte: checksum type
+/// - 4 bytes: bytes per checksum
+/// - 4 bytes: on-disk data size with header
+pub const BLOCK_HEADER_SIZE: usize = HEADER_SIZE_NO_CHECKSUM + SIZEOF_BYTE + 2 * SIZEOF_INT32;
+
+/// Checksum size (each checksum is 4 bytes)
+const CHECKSUM_SIZE: usize = SIZEOF_INT32;
+
+/// Parsed block header information.
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+pub struct BlockHeader {
+    pub block_type: HFileBlockType,
+    pub on_disk_size_without_header: usize,
+    pub uncompressed_size_without_header: usize,
+    pub prev_block_offset: i64,
+    pub checksum_type: u8,
+    pub bytes_per_checksum: usize,
+    pub on_disk_data_size_with_header: usize,
+}
+
+impl BlockHeader {
+    /// Parse a block header from bytes.
+    pub fn parse(bytes: &[u8]) -> Result<Self> {
+        if bytes.len() < BLOCK_HEADER_SIZE {
+            return Err(HFileError::InvalidFormat(format!(
+                "Buffer too small for block header: {} bytes, need {}",
+                bytes.len(),
+                BLOCK_HEADER_SIZE
+            )));
+        }
+
+        let block_type = HFileBlockType::from_magic(bytes)?;
+
+        let on_disk_size_without_header =
+            i32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]) as usize;
+
+        let uncompressed_size_without_header =
+            i32::from_be_bytes([bytes[12], bytes[13], bytes[14], bytes[15]]) as usize;
+
+        let prev_block_offset = i64::from_be_bytes([
+            bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
+        ]);
+
+        let checksum_type = bytes[24];
+
+        let bytes_per_checksum =
+            i32::from_be_bytes([bytes[25], bytes[26], bytes[27], bytes[28]]) as usize;
+
+        let on_disk_data_size_with_header =
+            i32::from_be_bytes([bytes[29], bytes[30], bytes[31], bytes[32]]) as usize;
+
+        Ok(Self {
+            block_type,
+            on_disk_size_without_header,
+            uncompressed_size_without_header,
+            prev_block_offset,
+            checksum_type,
+            bytes_per_checksum,
+            on_disk_data_size_with_header,
+        })
+    }
+
+    /// Calculate the number of checksum bytes.
+    pub fn checksum_bytes_count(&self) -> usize {
+        let on_disk_with_header = BLOCK_HEADER_SIZE + self.on_disk_size_without_header;
+        let num_chunks = on_disk_with_header.div_ceil(self.bytes_per_checksum);
+        num_chunks * CHECKSUM_SIZE
+    }
+
+    /// Returns the total on-disk size including header.
+    pub fn on_disk_size_with_header(&self) -> usize {
+        BLOCK_HEADER_SIZE + self.on_disk_size_without_header
+    }
+}
+
+/// An HFile block with parsed content.
+#[derive(Debug)]
+pub struct HFileBlock {
+    pub header: BlockHeader,
+    /// Uncompressed block data (after header, before checksum)
+    pub data: Vec<u8>,
+}
+
+impl HFileBlock {
+    /// Parse a block from bytes, decompressing if necessary.
+    pub fn parse(bytes: &[u8], codec: CompressionCodec) -> Result<Self> {
+        let header = BlockHeader::parse(bytes)?;
+
+        // Extract the compressed data (between header and checksum)
+        let data_start = BLOCK_HEADER_SIZE;
+        let data_end =
+            data_start + header.on_disk_size_without_header - header.checksum_bytes_count();
+
+        // For uncompressed blocks, on_disk_size == uncompressed_size
+        // For compressed blocks, we need to decompress
+        let data = if codec == CompressionCodec::None {
+            bytes[data_start..data_start + header.uncompressed_size_without_header].to_vec()
+        } else {
+            let compressed_data = &bytes[data_start..data_end];
+            codec.decompress(compressed_data, header.uncompressed_size_without_header)?
+        };
+
+        Ok(Self { header, data })
+    }
+
+    /// Returns the block type.
+    pub fn block_type(&self) -> HFileBlockType {
+        self.header.block_type
+    }
+}
+
+/// A block index entry pointing to a data or meta block.
+#[derive(Debug, Clone)]
+pub struct BlockIndexEntry {
+    /// First key in the block (may be shortened/fake for optimization)
+    pub first_key: Key,
+    /// First key of the next block (if present)
+    pub next_block_first_key: Option<Key>,
+    /// Offset of the block in the file
+    pub offset: u64,
+    /// On-disk size of the block
+    pub size: u32,
+}
+
+impl BlockIndexEntry {
+    /// Create a new block index entry.
+    pub fn new(first_key: Key, next_block_first_key: Option<Key>, offset: u64, size: u32) -> Self {
+        Self {
+            first_key,
+            next_block_first_key,
+            offset,
+            size,
+        }
+    }
+}
+
+impl PartialEq for BlockIndexEntry {
+    fn eq(&self, other: &Self) -> bool {
+        self.first_key == other.first_key
+    }
+}
+
+impl Eq for BlockIndexEntry {}
+
+impl PartialOrd for BlockIndexEntry {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for BlockIndexEntry {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.first_key.cmp(&other.first_key)
+    }
+}
+
+/// Data block containing key-value pairs.
+pub struct DataBlock {
+    /// Uncompressed block data
+    data: Vec<u8>,
+    /// End offset of content (excluding checksum)
+    content_end: usize,
+}
+
+impl DataBlock {
+    /// Create a data block from an HFileBlock.
+    pub fn from_block(block: HFileBlock) -> Self {
+        let content_end = block.data.len();
+        Self {
+            data: block.data,
+            content_end,
+        }
+    }
+
+    /// Read a key-value at the given offset within the data block.
+    pub fn read_key_value(&self, offset: usize) -> KeyValue {
+        KeyValue::parse(&self.data, offset)
+    }
+
+    /// Check if the offset is within valid content range.
+    pub fn is_valid_offset(&self, offset: usize) -> bool {
+        offset < self.content_end
+    }
+
+    /// Iterate over all key-value pairs in the block.
+    #[allow(dead_code)]
+    pub fn iter(&self) -> DataBlockIterator<'_> {
+        DataBlockIterator {
+            block: self,
+            offset: 0,
+        }
+    }
+
+    /// Returns the raw data.
+    #[allow(dead_code)]
+    pub fn data(&self) -> &[u8] {
+        &self.data
+    }
+}
+
+/// Iterator over key-value pairs in a data block.
+pub struct DataBlockIterator<'a> {
+    block: &'a DataBlock,
+    offset: usize,
+}
+
+impl<'a> Iterator for DataBlockIterator<'a> {
+    type Item = KeyValue;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if !self.block.is_valid_offset(self.offset) {
+            return None;
+        }
+
+        // Need at least 8 bytes for key/value lengths
+        if self.offset + KEY_VALUE_HEADER_SIZE > self.block.content_end {
+            return None;
+        }
+
+        let kv = self.block.read_key_value(self.offset);
+        self.offset += kv.record_size();
+        Some(kv)
+    }
+}
+
+/// Read a Hadoop VLong encoded integer from bytes.
+/// This is the encoding used in HFile root/leaf index blocks.
+/// Returns (value, bytes_consumed).
+///
+/// Hadoop VLong format:
+/// - First byte determines total size
+/// - If first byte >= -112 (signed), value fits in single byte
+/// - Otherwise, -111 - first_byte gives the total encoded size in bytes (first byte plus size-1 data bytes)
+pub fn read_var_long(bytes: &[u8], offset: usize) -> (u64, usize) {
+    let first_byte = bytes[offset];
+    let size = var_long_size_on_disk_single(first_byte);
+
+    if size == 1 {
+        // Single byte encoding: value is the byte itself (as signed, then cast to u64)
+        return (first_byte as i8 as i64 as u64, 1);
+    }
+
+    // Multi-byte encoding: read size-1 bytes as big-endian
+    let mut value: u64 = 0;
+    for i in 0..size - 1 {
+        value = (value << 8) | (bytes[offset + 1 + i] as u64);
+    }
+
+    // Check if negative (first byte < -120 in signed representation)
+    let is_negative = (first_byte as i8) < -120;
+    if is_negative {
+        (!value, size)
+    } else {
+        (value, size)
+    }
+}
+
+/// Calculate the size of a Hadoop VLong encoded integer from the first byte.
+fn var_long_size_on_disk_single(first_byte: u8) -> usize {
+    let signed = first_byte as i8;
+    if signed >= -112 {
+        1
+    } else {
+        (-111 - signed as i32) as usize
+    }
+}
+
+/// Calculate the size of a Hadoop VLong encoded integer.
+pub fn var_long_size_on_disk(bytes: &[u8], offset: usize) -> usize {
+    var_long_size_on_disk_single(bytes[offset])
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_block_header_parse_too_small() {
+        let bytes = vec![0u8; 10]; // Too small for header
+        let err = BlockHeader::parse(&bytes).unwrap_err();
+        assert!(matches!(err, HFileError::InvalidFormat(_)));
+    }
+
+    #[test]
+    fn test_block_header_on_disk_size_with_header() {
+        // Create a minimal valid block header with DATA magic
+        let mut bytes = vec![];
+        bytes.extend_from_slice(b"DATABLK*"); // magic
+        bytes.extend_from_slice(&100i32.to_be_bytes()); // on_disk_size_without_header
+        bytes.extend_from_slice(&200i32.to_be_bytes()); // uncompressed_size_without_header
+        bytes.extend_from_slice(&(-1i64).to_be_bytes()); // prev_block_offset
+        bytes.push(1); // checksum_type
+        bytes.extend_from_slice(&16384i32.to_be_bytes()); // bytes_per_checksum
+        bytes.extend_from_slice(&133i32.to_be_bytes()); // on_disk_data_size_with_header
+
+        let header = BlockHeader::parse(&bytes).unwrap();
+        assert_eq!(header.on_disk_size_with_header(), BLOCK_HEADER_SIZE + 100);
+        assert_eq!(header.block_type, HFileBlockType::Data);
+    }
+
+    #[test]
+    fn test_block_index_entry_ordering() {
+        let key1 = Key::from_bytes(vec![0, 3, b'a', b'a', b'a']);
+        let key2 = Key::from_bytes(vec![0, 3, b'b', b'b', b'b']);
+
+        let entry1 = BlockIndexEntry::new(key1.clone(), None, 0, 100);
+        let entry2 = BlockIndexEntry::new(key2.clone(), None, 1000, 100);
+        let entry3 = BlockIndexEntry::new(key1.clone(), None, 2000, 200);
+
+        assert!(entry1 < entry2);
+        assert_eq!(entry1, entry3); // Same key = equal
+        assert_eq!(entry1.partial_cmp(&entry2), Some(std::cmp::Ordering::Less));
+    }
+
+    #[test]
+    fn test_read_var_long_single_byte() {
+        // Single byte values (first byte >= -112 as signed, i.e., 0-127 or 144-255 when unsigned)
+        let bytes = vec![0u8]; // value 0
+        let (value, size) = read_var_long(&bytes, 0);
+        assert_eq!(value, 0);
+        assert_eq!(size, 1);
+
+        let bytes = vec![100u8]; // value 100
+        let (value, size) = read_var_long(&bytes, 0);
+        assert_eq!(value, 100);
+        assert_eq!(size, 1);
+
+        // 127 as unsigned byte is 127, as signed it's 127 (>= -112), so single byte
+        let bytes = vec![127u8];
+        let (value, size) = read_var_long(&bytes, 0);
+        assert_eq!(value, 127);
+        assert_eq!(size, 1);
+    }
+
+    #[test]
+    fn test_read_var_long_multi_byte() {
+        // Multi-byte encoding: first byte < -112 (signed)
+        // -113 as signed = 143 as unsigned = 0x8F
+        // This means 1 additional data byte (total size = -111 - (-113) = 2 bytes)
+        // Actually let's test with known values
+
+        // For value 1000:
+        // In Hadoop VLong, values 128-255 use 2 bytes
+        // First byte = -113 (0x8F) means 1 additional data byte (2 bytes total)
+        // But let's use a simpler approach - test with offset
+
+        let bytes = vec![50u8, 0x8F, 0x03, 0xE8]; // offset 0 = 50 (single byte)
+        let (value, size) = read_var_long(&bytes, 0);
+        assert_eq!(value, 50);
+        assert_eq!(size, 1);
+    }
+
+    #[test]
+    fn test_var_long_size_on_disk() {
+        // Single byte values (first byte >= -112 as signed)
+        // 0..=127 are positive when signed, so single byte
+        assert_eq!(var_long_size_on_disk(&[0], 0), 1);
+        assert_eq!(var_long_size_on_disk(&[100], 0), 1);
+        assert_eq!(var_long_size_on_disk(&[127], 0), 1);
+        // 128..=143 map to -128..=-113 as signed — all < -112, so multi-byte.
+        // e.g. 143 as u8 = -113 as i8, and -113 < -112, so 143 is not single-byte.
+        // Total encoded size for first byte -113: -111 - (-113) = 2 bytes
+        assert_eq!(var_long_size_on_disk(&[143], 0), 2); // -113 as signed, size = 2
+
+        // 144 as u8 = -112 as i8, which is >= -112, so single byte
+        assert_eq!(var_long_size_on_disk(&[144], 0), 1);
+    }
+
+    #[test]
+    fn test_data_block_is_valid_offset() {
+        // Create a simple HFileBlock
+        let mut header_bytes = vec![];
+        header_bytes.extend_from_slice(b"DATABLK*"); // magic
+        header_bytes.extend_from_slice(&100i32.to_be_bytes()); // on_disk_size
+        header_bytes.extend_from_slice(&50i32.to_be_bytes()); // uncompressed_size
+        header_bytes.extend_from_slice(&(-1i64).to_be_bytes()); // prev_block_offset
+        header_bytes.push(1); // checksum_type
+        header_bytes.extend_from_slice(&16384i32.to_be_bytes()); // bytes_per_checksum
+        header_bytes.extend_from_slice(&133i32.to_be_bytes()); // on_disk_data_size
+
+        // Add data after header
+        header_bytes.extend_from_slice(&[0u8; 50]); // 50 bytes of data
+
+        let block = HFileBlock::parse(&header_bytes, CompressionCodec::None).unwrap();
+        let data_block = DataBlock::from_block(block);
+
+        assert!(data_block.is_valid_offset(0));
+        assert!(data_block.is_valid_offset(49));
+        assert!(!data_block.is_valid_offset(50));
+        assert!(!data_block.is_valid_offset(100));
+    }
+}
diff --git a/crates/core/src/hfile/block_type.rs b/crates/core/src/hfile/block_type.rs
new file mode 100644
index 0000000..620fd81
--- /dev/null
+++ b/crates/core/src/hfile/block_type.rs
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile block type definitions.
+
+use crate::hfile::error::{HFileError, Result};
+
+/// Length of block magic bytes
+pub const MAGIC_LENGTH: usize = 8;
+
+/// HFile block types with their magic byte sequences.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum HFileBlockType {
+    /// Data block containing key-value pairs
+    Data,
+    /// Leaf-level index block (multi-level index)
+    LeafIndex,
+    /// Meta block (e.g., bloom filter data)
+    Meta,
+    /// Intermediate-level index block (multi-level index)
+    IntermediateIndex,
+    /// Root-level index block
+    RootIndex,
+    /// File info block containing metadata key-value pairs
+    FileInfo,
+    /// HFile trailer
+    Trailer,
+}
+
+impl HFileBlockType {
+    /// Returns the magic bytes for this block type.
+    pub fn magic(&self) -> &'static [u8; MAGIC_LENGTH] {
+        match self {
+            HFileBlockType::Data => b"DATABLK*",
+            HFileBlockType::LeafIndex => b"IDXLEAF2",
+            HFileBlockType::Meta => b"METABLKc",
+            HFileBlockType::IntermediateIndex => b"IDXINTE2",
+            HFileBlockType::RootIndex => b"IDXROOT2",
+            HFileBlockType::FileInfo => b"FILEINF2",
+            HFileBlockType::Trailer => b"TRABLK\"$",
+        }
+    }
+
+    /// Parse block type from magic bytes.
+    pub fn from_magic(magic: &[u8]) -> Result<Self> {
+        if magic.len() < MAGIC_LENGTH {
+            return Err(HFileError::InvalidFormat(format!(
+                "Magic bytes too short: {} bytes",
+                magic.len()
+            )));
+        }
+
+        let magic_arr: &[u8; MAGIC_LENGTH] = magic[..MAGIC_LENGTH].try_into().unwrap();
+
+        match magic_arr {
+            b"DATABLK*" | b"DATABLKE" => Ok(HFileBlockType::Data),
+            b"IDXLEAF2" => Ok(HFileBlockType::LeafIndex),
+            b"METABLKc" => Ok(HFileBlockType::Meta),
+            b"IDXINTE2" => Ok(HFileBlockType::IntermediateIndex),
+            b"IDXROOT2" => Ok(HFileBlockType::RootIndex),
+            b"FILEINF2" => Ok(HFileBlockType::FileInfo),
+            b"TRABLK\"$" => Ok(HFileBlockType::Trailer),
+            _ => Err(HFileError::InvalidBlockMagic {
+                expected: "valid block magic".to_string(),
+                actual: String::from_utf8_lossy(magic_arr).to_string(),
+            }),
+        }
+    }
+
+    /// Check if the given bytes start with this block type's magic.
+    pub fn check_magic(&self, bytes: &[u8]) -> Result<()> {
+        if bytes.len() < MAGIC_LENGTH {
+            return Err(HFileError::InvalidFormat(format!(
+                "Buffer too short for magic check: {} bytes",
+                bytes.len()
+            )));
+        }
+
+        let expected = self.magic();
+        let actual = &bytes[..MAGIC_LENGTH];
+
+        if actual != expected {
+            return Err(HFileError::InvalidBlockMagic {
+                expected: String::from_utf8_lossy(expected).to_string(),
+                actual: String::from_utf8_lossy(actual).to_string(),
+            });
+        }
+
+        Ok(())
+    }
+}
+
+impl std::fmt::Display for HFileBlockType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            HFileBlockType::Data => write!(f, "DATA"),
+            HFileBlockType::LeafIndex => write!(f, "LEAF_INDEX"),
+            HFileBlockType::Meta => write!(f, "META"),
+            HFileBlockType::IntermediateIndex => write!(f, "INTERMEDIATE_INDEX"),
+            HFileBlockType::RootIndex => write!(f, "ROOT_INDEX"),
+            HFileBlockType::FileInfo => write!(f, "FILE_INFO"),
+            HFileBlockType::Trailer => write!(f, "TRAILER"),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_magic_bytes() {
+        assert_eq!(HFileBlockType::Data.magic(), b"DATABLK*");
+        assert_eq!(HFileBlockType::LeafIndex.magic(), b"IDXLEAF2");
+        assert_eq!(HFileBlockType::Meta.magic(), b"METABLKc");
+        assert_eq!(HFileBlockType::IntermediateIndex.magic(), b"IDXINTE2");
+        assert_eq!(HFileBlockType::RootIndex.magic(), b"IDXROOT2");
+        assert_eq!(HFileBlockType::FileInfo.magic(), b"FILEINF2");
+        assert_eq!(HFileBlockType::Trailer.magic(), b"TRABLK\"$");
+    }
+
+    #[test]
+    fn test_from_magic_data_block() {
+        let magic = b"DATABLK*";
+        assert_eq!(
+            HFileBlockType::from_magic(magic).unwrap(),
+            HFileBlockType::Data
+        );
+    }
+
+    #[test]
+    fn test_from_magic_data_block_encoded() {
+        // Encoded data block magic
+        let magic = b"DATABLKE";
+        assert_eq!(
+            HFileBlockType::from_magic(magic).unwrap(),
+            HFileBlockType::Data
+        );
+    }
+
+    #[test]
+    fn test_from_magic_all_types() {
+        assert_eq!(
+            HFileBlockType::from_magic(b"IDXLEAF2").unwrap(),
+            HFileBlockType::LeafIndex
+        );
+        assert_eq!(
+            HFileBlockType::from_magic(b"METABLKc").unwrap(),
+            HFileBlockType::Meta
+        );
+        assert_eq!(
+            HFileBlockType::from_magic(b"IDXINTE2").unwrap(),
+            HFileBlockType::IntermediateIndex
+        );
+        assert_eq!(
+            HFileBlockType::from_magic(b"IDXROOT2").unwrap(),
+            HFileBlockType::RootIndex
+        );
+        assert_eq!(
+            HFileBlockType::from_magic(b"FILEINF2").unwrap(),
+            HFileBlockType::FileInfo
+        );
+        assert_eq!(
+            HFileBlockType::from_magic(b"TRABLK\"$").unwrap(),
+            HFileBlockType::Trailer
+        );
+    }
+
+    #[test]
+    fn test_from_magic_too_short() {
+        let err = HFileBlockType::from_magic(b"DATA").unwrap_err();
+        assert!(matches!(err, HFileError::InvalidFormat(_)));
+    }
+
+    #[test]
+    fn test_from_magic_invalid() {
+        let err = HFileBlockType::from_magic(b"INVALID!").unwrap_err();
+        assert!(matches!(err, HFileError::InvalidBlockMagic { .. }));
+    }
+
+    #[test]
+    fn test_check_magic_success() {
+        let bytes = b"DATABLK*extra_data_here";
+        assert!(HFileBlockType::Data.check_magic(bytes).is_ok());
+    }
+
+    #[test]
+    fn test_check_magic_too_short() {
+        let err = HFileBlockType::Data.check_magic(b"DATA").unwrap_err();
+        assert!(matches!(err, HFileError::InvalidFormat(_)));
+    }
+
+    #[test]
+    fn test_check_magic_mismatch() {
+        let err = HFileBlockType::Data.check_magic(b"IDXROOT2").unwrap_err();
+        assert!(matches!(err, HFileError::InvalidBlockMagic { .. }));
+    }
+
+    #[test]
+    fn test_display() {
+        assert_eq!(format!("{}", HFileBlockType::Data), "DATA");
+        assert_eq!(format!("{}", HFileBlockType::LeafIndex), "LEAF_INDEX");
+        assert_eq!(format!("{}", HFileBlockType::Meta), "META");
+        assert_eq!(
+            format!("{}", HFileBlockType::IntermediateIndex),
+            "INTERMEDIATE_INDEX"
+        );
+        assert_eq!(format!("{}", HFileBlockType::RootIndex), "ROOT_INDEX");
+        assert_eq!(format!("{}", HFileBlockType::FileInfo), "FILE_INFO");
+        assert_eq!(format!("{}", HFileBlockType::Trailer), "TRAILER");
+    }
+}
diff --git a/crates/core/src/hfile/compression.rs b/crates/core/src/hfile/compression.rs
new file mode 100644
index 0000000..91d5cf6
--- /dev/null
+++ b/crates/core/src/hfile/compression.rs
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Compression codec support for HFile blocks.
+
+use crate::hfile::error::{HFileError, Result};
+use flate2::read::GzDecoder;
+use std::io::Read;
+
+/// Compression codec IDs used in HFile.
+/// These IDs are stored in the HFile trailer and must not change.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum CompressionCodec {
+    /// LZO compression (ID: 0)
+    Lzo = 0,
+    /// GZIP compression (ID: 1)
+    Gzip = 1,
+    /// No compression (ID: 2)
+    #[default]
+    None = 2,
+    /// Snappy compression (ID: 3)
+    Snappy = 3,
+    /// LZ4 compression (ID: 4)
+    Lz4 = 4,
+    /// BZIP2 compression (ID: 5)
+    Bzip2 = 5,
+    /// ZSTD compression (ID: 6)
+    Zstd = 6,
+}
+
+impl CompressionCodec {
+    /// Decode compression codec from ID stored in HFile.
+    pub fn from_id(id: u32) -> Result<Self> {
+        match id {
+            0 => Ok(CompressionCodec::Lzo),
+            1 => Ok(CompressionCodec::Gzip),
+            2 => Ok(CompressionCodec::None),
+            3 => Ok(CompressionCodec::Snappy),
+            4 => Ok(CompressionCodec::Lz4),
+            5 => Ok(CompressionCodec::Bzip2),
+            6 => Ok(CompressionCodec::Zstd),
+            _ => Err(HFileError::UnsupportedCompression(id)),
+        }
+    }
+
+    /// Decompress data using this codec.
+    ///
+    /// # Arguments
+    /// * `compressed_data` - The compressed data bytes
+    /// * `uncompressed_size` - Expected size of uncompressed data
+    ///
+    /// # Returns
+    /// Decompressed data as a byte vector
+    pub fn decompress(&self, compressed_data: &[u8], uncompressed_size: usize) -> Result<Vec<u8>> {
+        match self {
+            CompressionCodec::None => Ok(compressed_data.to_vec()),
+            CompressionCodec::Gzip => {
+                let mut decoder = GzDecoder::new(compressed_data);
+                let mut decompressed = Vec::with_capacity(uncompressed_size);
+                decoder.read_to_end(&mut decompressed).map_err(|e| {
+                    HFileError::DecompressionError(format!("GZIP decompression failed: {}", e))
+                })?;
+                Ok(decompressed)
+            }
+            CompressionCodec::Lzo => Err(HFileError::DecompressionError(
+                "LZO compression not yet supported".to_string(),
+            )),
+            CompressionCodec::Snappy => Err(HFileError::DecompressionError(
+                "Snappy compression not yet supported".to_string(),
+            )),
+            CompressionCodec::Lz4 => Err(HFileError::DecompressionError(
+                "LZ4 compression not yet supported".to_string(),
+            )),
+            CompressionCodec::Bzip2 => Err(HFileError::DecompressionError(
+                "BZIP2 compression not yet supported".to_string(),
+            )),
+            CompressionCodec::Zstd => Err(HFileError::DecompressionError(
+                "ZSTD compression not yet supported".to_string(),
+            )),
+        }
+    }
+}
diff --git a/crates/core/src/hfile/error.rs b/crates/core/src/hfile/error.rs
new file mode 100644
index 0000000..f69c72c
--- /dev/null
+++ b/crates/core/src/hfile/error.rs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile error types.
+
+use std::io;
+use thiserror::Error;
+
+pub type Result<T, E = HFileError> = std::result::Result<T, E>;
+
+#[derive(Error, Debug)]
+pub enum HFileError {
+    #[error("Invalid HFile: {0}")]
+    InvalidFormat(String),
+
+    #[error("Invalid block magic: expected {expected}, got {actual}")]
+    InvalidBlockMagic { expected: String, actual: String },
+
+    #[error("Unsupported HFile version: major={major}, minor={minor}")]
+    UnsupportedVersion { major: u32, minor: u32 },
+
+    #[error("Unsupported compression codec: {0}")]
+    UnsupportedCompression(u32),
+
+    #[error("HFiles with MVCC timestamps are not supported")]
+    UnsupportedMvccTimestamp,
+
+    #[error("Decompression error: {0}")]
+    DecompressionError(String),
+
+    #[error("Protobuf decode error: {0}")]
+    ProtobufError(#[from] prost::DecodeError),
+
+    #[error("IO error: {0}")]
+    IoError(#[from] io::Error),
+
+    #[error("Unexpected block type: expected {expected}, got {actual}")]
+    UnexpectedBlockType { expected: String, actual: String },
+
+    #[error("Backward seek not supported: current position is ahead of lookup key")]
+    BackwardSeekNotSupported,
+
+    #[error("Not seeked: must call seek_to() before reading key-value pairs")]
+    NotSeeked,
+
+    #[error("End of file reached")]
+    Eof,
+}
diff --git a/crates/core/src/hfile/key.rs b/crates/core/src/hfile/key.rs
new file mode 100644
index 0000000..51e664a
--- /dev/null
+++ b/crates/core/src/hfile/key.rs
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Key and KeyValue types for HFile.
+
+use std::cmp::Ordering;
+
/// Size constants
const SIZEOF_INT32: usize = 4;
const SIZEOF_INT16: usize = 2;

/// Key offset after key length (int32) and value length (int32)
pub const KEY_VALUE_HEADER_SIZE: usize = SIZEOF_INT32 * 2;

/// A key in HFile format.
///
/// In HFile, keys have the following structure:
/// - 2 bytes: key content length (unsigned short, big-endian)
/// - N bytes: key content
/// - Additional bytes: other information (not used by Hudi)
///
/// For comparison, equality, and hashing, only the key content is used.
#[derive(Debug, Clone)]
pub struct Key {
    /// Raw key bytes including the length prefix
    bytes: Vec<u8>,
    /// Offset to the start of the key within bytes
    offset: usize,
    /// Total length of the key part (including length prefix and other info)
    length: usize,
}

impl Key {
    /// Create a new Key from bytes at the given offset with the specified length.
    ///
    /// Note: the entire `bytes` slice is copied (not just `[offset, offset+length)`)
    /// because `bytes()` exposes the full backing buffer.
    pub fn new(bytes: &[u8], offset: usize, length: usize) -> Self {
        Self {
            bytes: bytes.to_vec(),
            offset,
            length,
        }
    }

    /// Create a Key from raw bytes (the entire key).
    pub fn from_bytes(bytes: Vec<u8>) -> Self {
        let length = bytes.len();
        Self {
            bytes,
            offset: 0,
            length,
        }
    }

    /// Returns the offset to the key content (after the 2-byte length prefix).
    pub fn content_offset(&self) -> usize {
        self.offset + SIZEOF_INT16
    }

    /// Returns the length of the key content, or 0 if the buffer is too short
    /// to hold the length prefix.
    pub fn content_length(&self) -> usize {
        if self.bytes.len() < self.offset + SIZEOF_INT16 {
            return 0;
        }
        let len_bytes = &self.bytes[self.offset..self.offset + SIZEOF_INT16];
        // The prefix is an unsigned big-endian short. Decoding it as i16 would
        // sign-extend values >= 0x8000 into huge usize lengths, making valid
        // long keys unreadable; decode as u16 instead.
        u16::from_be_bytes([len_bytes[0], len_bytes[1]]) as usize
    }

    /// Returns the key content as a byte slice, or an empty slice if the
    /// declared content length overruns the buffer.
    pub fn content(&self) -> &[u8] {
        let start = self.content_offset();
        let len = self.content_length();
        if start + len > self.bytes.len() {
            return &[];
        }
        &self.bytes[start..start + len]
    }

    /// Returns the key content as a UTF-8 string.
    pub fn content_as_str(&self) -> Result<&str, std::str::Utf8Error> {
        std::str::from_utf8(self.content())
    }

    /// Returns the total length of the key part.
    pub fn length(&self) -> usize {
        self.length
    }

    /// Returns the raw backing bytes (including the prefix and any trailing info).
    pub fn bytes(&self) -> &[u8] {
        &self.bytes
    }
}

/// Equality considers only the key content, consistent with `Ord` and `Hash`.
impl PartialEq for Key {
    fn eq(&self, other: &Self) -> bool {
        self.content() == other.content()
    }
}

impl Eq for Key {}

impl PartialOrd for Key {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Keys order lexicographically by content bytes.
impl Ord for Key {
    fn cmp(&self, other: &Self) -> Ordering {
        self.content().cmp(other.content())
    }
}

/// Hash must agree with `PartialEq`: both use only the content bytes.
impl std::hash::Hash for Key {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.content().hash(state);
    }
}

impl std::fmt::Display for Key {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.content_as_str() {
            Ok(s) => write!(f, "Key{{{}}}", s),
            Err(_) => write!(f, "Key{{<binary>}}"),
        }
    }
}
+
/// A UTF-8 string key without length prefix.
///
/// Used for lookup keys and meta block keys where the key is just the content
/// without the HFile key structure.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Utf8Key {
    content: String,
}

impl Utf8Key {
    /// Builds a key from anything convertible into an owned `String`.
    pub fn new(s: impl Into<String>) -> Self {
        Utf8Key { content: s.into() }
    }

    /// The raw UTF-8 bytes of the key.
    pub fn as_bytes(&self) -> &[u8] {
        self.content.as_bytes()
    }

    /// The key as a borrowed string slice.
    pub fn as_str(&self) -> &str {
        self.content.as_str()
    }
}

impl PartialOrd for Utf8Key {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Utf8Key {
    // `str` ordering is byte-wise lexicographic, identical to comparing
    // `as_bytes()` directly.
    fn cmp(&self, other: &Self) -> Ordering {
        self.content.cmp(&other.content)
    }
}

impl From<&str> for Utf8Key {
    fn from(s: &str) -> Self {
        Self {
            content: s.to_owned(),
        }
    }
}

impl From<String> for Utf8Key {
    fn from(s: String) -> Self {
        Self { content: s }
    }
}

impl std::fmt::Display for Utf8Key {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Utf8Key{{{}}}", self.as_str())
    }
}
+
+/// A key-value pair from HFile data block.
+///
+/// The HFile key-value format is:
+/// - 4 bytes: key length (int32)
+/// - 4 bytes: value length (int32)
+/// - N bytes: key (structured as Key)
+/// - M bytes: value
+/// - 1 byte: MVCC timestamp version (always 0 for Hudi)
+#[derive(Debug, Clone)]
+pub struct KeyValue {
+    /// The backing byte array containing the entire key-value record
+    bytes: Vec<u8>,
+    /// Offset to the start of this record in bytes
+    offset: usize,
+    /// The parsed key
+    key: Key,
+    /// Length of key part
+    key_length: usize,
+    /// Length of value part
+    value_length: usize,
+}
+
+impl KeyValue {
+    /// Parse a KeyValue from bytes at the given offset.
+    pub fn parse(bytes: &[u8], offset: usize) -> Self {
+        let key_length = i32::from_be_bytes([
+            bytes[offset],
+            bytes[offset + 1],
+            bytes[offset + 2],
+            bytes[offset + 3],
+        ]) as usize;
+
+        let value_length = i32::from_be_bytes([
+            bytes[offset + 4],
+            bytes[offset + 5],
+            bytes[offset + 6],
+            bytes[offset + 7],
+        ]) as usize;
+
+        let key_offset = offset + KEY_VALUE_HEADER_SIZE;
+        let key = Key::new(bytes, key_offset, key_length);
+
+        Self {
+            bytes: bytes.to_vec(),
+            offset,
+            key,
+            key_length,
+            value_length,
+        }
+    }
+
+    /// Returns the key.
+    pub fn key(&self) -> &Key {
+        &self.key
+    }
+
+    /// Returns the value as a byte slice.
+    pub fn value(&self) -> &[u8] {
+        let value_offset = self.offset + KEY_VALUE_HEADER_SIZE + self.key_length;
+        &self.bytes[value_offset..value_offset + self.value_length]
+    }
+
+    /// Returns the total size of this key-value record including MVCC timestamp.
+    pub fn record_size(&self) -> usize {
+        // header (8) + key + value + mvcc timestamp (1)
+        KEY_VALUE_HEADER_SIZE + self.key_length + self.value_length + 1
+    }
+
+    /// Returns the key length.
+    pub fn key_length(&self) -> usize {
+        self.key_length
+    }
+
+    /// Returns the value length.
+    pub fn value_length(&self) -> usize {
+        self.value_length
+    }
+}
+
+impl std::fmt::Display for KeyValue {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "KeyValue{{key={}}}", self.key)
+    }
+}
+
+/// Compare a Key with a Utf8Key (for lookups).
+///
+/// This compares the key content bytes lexicographically.
+pub fn compare_keys(key: &Key, lookup: &Utf8Key) -> Ordering {
+    key.content().cmp(lookup.as_bytes())
+}
+
#[cfg(test)]
mod tests {
    // Unit tests for Key, Utf8Key, KeyValue parsing, and compare_keys.
    use super::*;

    #[test]
    fn test_utf8_key_comparison() {
        let k1 = Utf8Key::new("abc");
        let k2 = Utf8Key::new("abd");
        let k3 = Utf8Key::new("abc");

        assert!(k1 < k2);
        assert_eq!(k1, k3);
    }

    #[test]
    fn test_utf8_key_from_str() {
        let k1: Utf8Key = "test".into();
        let k2 = Utf8Key::from("test");
        assert_eq!(k1, k2);
        assert_eq!(k1.as_str(), "test");
        assert_eq!(k1.as_bytes(), b"test");
    }

    #[test]
    fn test_utf8_key_from_string() {
        let s = String::from("hello");
        let k: Utf8Key = s.into();
        assert_eq!(k.as_str(), "hello");
    }

    #[test]
    fn test_utf8_key_display() {
        let k = Utf8Key::new("mykey");
        assert_eq!(format!("{}", k), "Utf8Key{mykey}");
    }

    #[test]
    fn test_key_new() {
        // Create a key with length prefix: 2 bytes for length (0, 4) + 4 bytes content "test"
        let bytes = vec![0, 4, b't', b'e', b's', b't', 0, 0]; // extra bytes at end
        let key = Key::new(&bytes, 0, 6);

        assert_eq!(key.content_length(), 4);
        assert_eq!(key.content(), b"test");
        assert_eq!(key.content_as_str().unwrap(), "test");
        assert_eq!(key.length(), 6);
    }

    #[test]
    fn test_key_from_bytes() {
        let bytes = vec![0, 3, b'a', b'b', b'c'];
        let key = Key::from_bytes(bytes);

        assert_eq!(key.content_length(), 3);
        assert_eq!(key.content(), b"abc");
    }

    #[test]
    fn test_key_content_empty() {
        // Test with buffer too small for length prefix
        let bytes = vec![0];
        let key = Key::new(&bytes, 0, 1);
        assert_eq!(key.content_length(), 0);
    }

    #[test]
    fn test_key_content_out_of_bounds() {
        // Key claims content length of 10 but only has 3 bytes
        let bytes = vec![0, 10, b'a', b'b', b'c'];
        let key = Key::from_bytes(bytes);
        // content() should return empty slice when out of bounds
        assert_eq!(key.content(), &[] as &[u8]);
    }

    #[test]
    fn test_key_equality() {
        let bytes1 = vec![0, 3, b'a', b'b', b'c'];
        let bytes2 = vec![0, 3, b'a', b'b', b'c'];
        let bytes3 = vec![0, 3, b'x', b'y', b'z'];

        let k1 = Key::from_bytes(bytes1);
        let k2 = Key::from_bytes(bytes2);
        let k3 = Key::from_bytes(bytes3);

        assert_eq!(k1, k2);
        assert_ne!(k1, k3);
    }

    #[test]
    fn test_key_ordering() {
        let k1 = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);
        let k2 = Key::from_bytes(vec![0, 3, b'a', b'b', b'd']);
        let k3 = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);

        assert!(k1 < k2);
        assert_eq!(k1.cmp(&k3), Ordering::Equal);
    }

    #[test]
    fn test_key_hash() {
        use std::collections::HashSet;

        // Equal content must imply equal hash, so set membership works.
        let k1 = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);
        let k2 = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);

        let mut set = HashSet::new();
        set.insert(k1);
        assert!(set.contains(&k2));
    }

    #[test]
    fn test_key_display() {
        let k1 = Key::from_bytes(vec![0, 4, b't', b'e', b's', b't']);
        assert_eq!(format!("{}", k1), "Key{test}");

        // Binary key (invalid UTF-8)
        let k2 = Key::from_bytes(vec![0, 3, 0xFF, 0xFE, 0xFD]);
        assert_eq!(format!("{}", k2), "Key{<binary>}");
    }

    #[test]
    fn test_key_bytes() {
        let original = vec![0, 3, b'a', b'b', b'c'];
        let key = Key::from_bytes(original.clone());
        assert_eq!(key.bytes(), &original);
    }

    #[test]
    fn test_keyvalue_parse() {
        // Build a KeyValue structure:
        // 4 bytes key length (11) + 4 bytes value length (5)
        // + key: 2 bytes content length (4) + 4 bytes "test" + 5 extra key bytes
        // + value: 5 bytes "value"
        // + 1 byte MVCC timestamp
        let mut bytes = vec![];
        bytes.extend_from_slice(&11i32.to_be_bytes()); // key length
        bytes.extend_from_slice(&5i32.to_be_bytes()); // value length
        bytes.extend_from_slice(&[0, 4]); // key content length (4)
        bytes.extend_from_slice(b"test"); // key content
        bytes.extend_from_slice(&[0, 0, 0, 0, 0]); // extra key bytes
        bytes.extend_from_slice(b"value"); // value
        bytes.push(0); // MVCC timestamp

        let kv = KeyValue::parse(&bytes, 0);

        assert_eq!(kv.key().content_as_str().unwrap(), "test");
        assert_eq!(kv.value(), b"value");
        assert_eq!(kv.key_length(), 11);
        assert_eq!(kv.value_length(), 5);
        assert_eq!(kv.record_size(), 8 + 11 + 5 + 1); // header + key + value + mvcc
    }

    #[test]
    fn test_keyvalue_display() {
        let mut bytes = vec![];
        bytes.extend_from_slice(&6i32.to_be_bytes()); // key length
        bytes.extend_from_slice(&3i32.to_be_bytes()); // value length
        bytes.extend_from_slice(&[0, 4]); // key content length
        bytes.extend_from_slice(b"test"); // key content
        bytes.extend_from_slice(b"val"); // value
        bytes.push(0); // MVCC

        let kv = KeyValue::parse(&bytes, 0);
        assert!(format!("{}", kv).contains("test"));
    }

    #[test]
    fn test_compare_keys() {
        let key = Key::from_bytes(vec![0, 3, b'a', b'b', b'c']);
        let lookup1 = Utf8Key::new("abc");
        let lookup2 = Utf8Key::new("abd");
        let lookup3 = Utf8Key::new("abb");

        assert_eq!(compare_keys(&key, &lookup1), Ordering::Equal);
        assert_eq!(compare_keys(&key, &lookup2), Ordering::Less);
        assert_eq!(compare_keys(&key, &lookup3), Ordering::Greater);
    }
}
diff --git a/crates/core/src/hfile/mod.rs b/crates/core/src/hfile/mod.rs
new file mode 100644
index 0000000..67c9370
--- /dev/null
+++ b/crates/core/src/hfile/mod.rs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile reader implementation.
+//!
+//! HFile is an SSTable based row-oriented file format, optimized for
+//! range scans and point lookups. HFile is used as the base file format
+//! for Hudi's metadata table.
+//!
+//! See [Hudi's HFile format specification](https://github.com/apache/hudi/blob/master/hudi-io/hfile_format.md).
+//!
+//! # Example
+//! ```ignore
+//! use hudi_core::hfile::{HFileReader, Utf8Key, SeekResult};
+//!
+//! let bytes = std::fs::read("data.hfile")?;
+//! let mut reader = HFileReader::new(bytes)?;
+//!
+//! // Iterate all entries
+//! for result in reader.iter()? {
+//!     let kv = result?;
+//!     println!("{}: {:?}", kv.key(), kv.value());
+//! }
+//! ```
+
// Internal modules implementing the pieces of the HFile format.
mod block;
mod block_type;
mod compression;
mod error;
mod key;
mod proto;
mod reader;
mod record;
mod trailer;

// Curated public surface of the `hfile` module.
pub use block::BlockIndexEntry;
pub use block_type::HFileBlockType;
pub use error::{HFileError, Result};
pub use key::{Key, KeyValue, Utf8Key};
pub use reader::{HFileReader, HFileRecordIterator, SeekResult};
pub use record::HFileRecord;
diff --git a/crates/core/src/hfile/proto.rs b/crates/core/src/hfile/proto.rs
new file mode 100644
index 0000000..6a70a04
--- /dev/null
+++ b/crates/core/src/hfile/proto.rs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Protobuf message definitions for HFile format.
+//!
+//! These are manually defined to match the HBase HFile protobuf schema.
+//! The definitions correspond to:
+//! - TrailerProto: HFile trailer metadata
+//! - InfoProto/BytesBytesPair: File info key-value pairs
+
+use prost::Message;
+
/// HFile trailer protobuf message.
///
/// Contains metadata about the HFile structure including offsets,
/// counts, and compression settings.
///
/// Tag numbers mirror HBase's `HFile.proto`; renumbering any field would
/// break wire compatibility with files written by HBase/Hudi.
#[derive(Clone, PartialEq, Message)]
pub struct TrailerProto {
    #[prost(uint64, optional, tag = "1")]
    pub file_info_offset: Option<u64>,

    #[prost(uint64, optional, tag = "2")]
    pub load_on_open_data_offset: Option<u64>,

    #[prost(uint64, optional, tag = "3")]
    pub uncompressed_data_index_size: Option<u64>,

    #[prost(uint64, optional, tag = "4")]
    pub total_uncompressed_bytes: Option<u64>,

    #[prost(uint32, optional, tag = "5")]
    pub data_index_count: Option<u32>,

    #[prost(uint32, optional, tag = "6")]
    pub meta_index_count: Option<u32>,

    #[prost(uint64, optional, tag = "7")]
    pub entry_count: Option<u64>,

    #[prost(uint32, optional, tag = "8")]
    pub num_data_index_levels: Option<u32>,

    #[prost(uint64, optional, tag = "9")]
    pub first_data_block_offset: Option<u64>,

    #[prost(uint64, optional, tag = "10")]
    pub last_data_block_offset: Option<u64>,

    #[prost(string, optional, tag = "11")]
    pub comparator_class_name: Option<String>,

    // Numeric codec id; presumably mapped to a `CompressionCodec` when the
    // trailer is parsed — verify against the trailer module.
    #[prost(uint32, optional, tag = "12")]
    pub compression_codec: Option<u32>,

    #[prost(bytes, optional, tag = "13")]
    pub encryption_key: Option<Vec<u8>>,
}

/// A key-value pair with both key and value as byte arrays.
#[derive(Clone, PartialEq, Message)]
pub struct BytesBytesPair {
    #[prost(bytes, required, tag = "1")]
    pub first: Vec<u8>,

    #[prost(bytes, required, tag = "2")]
    pub second: Vec<u8>,
}

/// File info protobuf message containing a list of key-value pairs.
#[derive(Clone, PartialEq, Message)]
pub struct InfoProto {
    #[prost(message, repeated, tag = "1")]
    pub map_entry: Vec<BytesBytesPair>,
}
diff --git a/crates/core/src/hfile/reader.rs b/crates/core/src/hfile/reader.rs
new file mode 100644
index 0000000..0195581
--- /dev/null
+++ b/crates/core/src/hfile/reader.rs
@@ -0,0 +1,1903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile reader implementation.
+
+use std::collections::BTreeMap;
+
+use crate::hfile::block::{
+    read_var_long, var_long_size_on_disk, BlockIndexEntry, DataBlock, HFileBlock, BLOCK_HEADER_SIZE,
+};
+use crate::hfile::block_type::HFileBlockType;
+use crate::hfile::compression::CompressionCodec;
+use crate::hfile::error::{HFileError, Result};
+use crate::hfile::key::{compare_keys, Key, KeyValue, Utf8Key};
+use crate::hfile::proto::InfoProto;
+use crate::hfile::record::HFileRecord;
+use crate::hfile::trailer::HFileTrailer;
+use prost::Message;
+
/// Magic bytes indicating protobuf format in file info block
const PBUF_MAGIC: &[u8; 4] = b"PBUF";

/// Seek result codes (matching Java implementation)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekResult {
    // Discriminants mirror the integer codes of the Java HFile reader so the
    // two implementations stay directly comparable.
    /// Lookup key is before the fake first key of a block but >= actual first key
    BeforeBlockFirstKey = -2,
    /// Lookup key is before the first key of the file
    BeforeFileFirstKey = -1,
    /// Exact match found
    Found = 0,
    /// Key not found but within range; cursor points to greatest key < lookup
    InRange = 1,
    /// Key is greater than the last key; EOF reached
    Eof = 2,
}

/// File info key for last key in the file
const FILE_INFO_LAST_KEY: &str = "hfile.LASTKEY";
/// File info key for key-value version
const FILE_INFO_KEY_VALUE_VERSION: &str = "KEY_VALUE_VERSION";
/// File info key for max MVCC timestamp
const FILE_INFO_MAX_MVCC_TS: &str = "MAX_MEMSTORE_TS_KEY";

/// Key-value version indicating MVCC timestamp support
const KEY_VALUE_VERSION_WITH_MVCC_TS: i32 = 1;

/// HFile reader that supports sequential reads and seeks.
///
/// Holds the entire file in memory (`bytes`); all block reads are slice
/// operations over that buffer.
pub struct HFileReader {
    /// Raw file bytes
    bytes: Vec<u8>,
    /// Parsed trailer
    trailer: HFileTrailer,
    /// Compression codec from trailer
    codec: CompressionCodec,
    /// Data block index (first key -> entry)
    data_block_index: BTreeMap<Key, BlockIndexEntry>,
    /// Meta block index (name -> entry)
    meta_block_index: BTreeMap<String, BlockIndexEntry>,
    /// File info map
    file_info: BTreeMap<String, Vec<u8>>,
    /// Last key in the file
    last_key: Option<Key>,
    /// Current cursor position
    cursor: Cursor,
    /// Currently loaded data block
    current_block: Option<DataBlock>,
    /// Current block's index entry
    current_block_entry: Option<BlockIndexEntry>,
}

/// Cursor tracking current position in the file.
#[derive(Debug, Clone, Default)]
struct Cursor {
    /// Absolute offset in file
    offset: usize,
    /// Cached key-value at current position
    cached_kv: Option<KeyValue>,
    /// Whether we've reached EOF
    eof: bool,
    /// Whether seek has been called
    // NOTE(review): presumably gates reads so that NotSeeked can be raised
    // before any seek_to() call — confirm against the seek/read methods.
    seeked: bool,
}
+
+impl HFileReader {
+    /// Create a new HFile reader from raw bytes.
+    pub fn new(bytes: Vec<u8>) -> Result<Self> {
+        let trailer = HFileTrailer::read(&bytes)?;
+        let codec = trailer.compression_codec;
+
+        let mut reader = Self {
+            bytes,
+            trailer,
+            codec,
+            data_block_index: BTreeMap::new(),
+            meta_block_index: BTreeMap::new(),
+            file_info: BTreeMap::new(),
+            last_key: None,
+            cursor: Cursor::default(),
+            current_block: None,
+            current_block_entry: None,
+        };
+
+        reader.initialize_metadata()?;
+        Ok(reader)
+    }
+
    /// Initialize metadata by reading index blocks and file info.
    ///
    /// The load-on-open section is laid out sequentially: root data index
    /// block, then meta index block, then file info block — each read
    /// returns the offset where the next block begins.
    fn initialize_metadata(&mut self) -> Result<()> {
        // Read the "load-on-open" section starting from load_on_open_data_offset
        let start = self.trailer.load_on_open_data_offset as usize;

        // Read root data index block
        let (data_index, offset) = self.read_root_index_block(start)?;
        self.data_block_index = data_index;

        // Handle multi-level index if needed. This rebuilds data_block_index
        // from leaf entries but does not consume any further bytes of the
        // load-on-open section, so `offset` stays valid for the meta index.
        if self.trailer.num_data_index_levels > 1 {
            self.load_multi_level_index()?;
        }

        // Read meta index block (shadows `offset` with the position after it)
        let (meta_index, offset) = self.read_meta_index_block(offset)?;
        self.meta_block_index = meta_index;

        // Read file info block
        self.read_file_info_block(offset)?;

        // Parse last key from file info
        if let Some(last_key_bytes) = self.file_info.get(FILE_INFO_LAST_KEY) {
            self.last_key = Some(Key::from_bytes(last_key_bytes.clone()));
        }

        // Reject files with live MVCC timestamps (unsupported)
        self.check_mvcc_support()?;

        Ok(())
    }
+
+    /// Check if the file uses MVCC timestamps (not supported).
+    fn check_mvcc_support(&self) -> Result<()> {
+        if let Some(version_bytes) = self.file_info.get(FILE_INFO_KEY_VALUE_VERSION) {
+            if version_bytes.len() >= 4 {
+                let version = i32::from_be_bytes([
+                    version_bytes[0],
+                    version_bytes[1],
+                    version_bytes[2],
+                    version_bytes[3],
+                ]);
+                if version == KEY_VALUE_VERSION_WITH_MVCC_TS {
+                    if let Some(ts_bytes) = self.file_info.get(FILE_INFO_MAX_MVCC_TS) {
+                        if ts_bytes.len() >= 8 {
+                            let max_ts = i64::from_be_bytes([
+                                ts_bytes[0],
+                                ts_bytes[1],
+                                ts_bytes[2],
+                                ts_bytes[3],
+                                ts_bytes[4],
+                                ts_bytes[5],
+                                ts_bytes[6],
+                                ts_bytes[7],
+                            ]);
+                            if max_ts > 0 {
+                                return Err(HFileError::UnsupportedMvccTimestamp);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read root index block and return the index entries.
+    fn read_root_index_block(
+        &self,
+        start: usize,
+    ) -> Result<(BTreeMap<Key, BlockIndexEntry>, usize)> {
+        let block = HFileBlock::parse(&self.bytes[start..], self.codec)?;
+        if block.block_type() != HFileBlockType::RootIndex {
+            return Err(HFileError::UnexpectedBlockType {
+                expected: HFileBlockType::RootIndex.to_string(),
+                actual: block.block_type().to_string(),
+            });
+        }
+
+        let entries = self.parse_root_index_entries(
+            &block.data,
+            self.trailer.data_index_count as usize,
+            false,
+        )?;
+        let next_offset = start + block.header.on_disk_size_with_header();
+
+        // Convert entries to BTreeMap
+        let mut index_map = BTreeMap::new();
+        for i in 0..entries.len() {
+            let entry = &entries[i];
+            let next_key = if i + 1 < entries.len() {
+                Some(entries[i + 1].first_key.clone())
+            } else {
+                None
+            };
+            index_map.insert(
+                entry.first_key.clone(),
+                BlockIndexEntry::new(entry.first_key.clone(), next_key, entry.offset, entry.size),
+            );
+        }
+
+        Ok((index_map, next_offset))
+    }
+
+    /// Load multi-level data block index (BFS traversal).
+    fn load_multi_level_index(&mut self) -> Result<()> {
+        let mut levels_remaining = self.trailer.num_data_index_levels - 1;
+        let mut current_entries: Vec<BlockIndexEntry> =
+            self.data_block_index.values().cloned().collect();
+
+        while levels_remaining > 0 {
+            let mut next_level_entries = Vec::new();
+
+            for entry in &current_entries {
+                let block = self.read_block_at(entry.offset as usize, entry.size as usize)?;
+
+                let entries = self.parse_leaf_index_entries(&block.data)?;
+
+                next_level_entries.extend(entries);
+            }
+
+            current_entries = next_level_entries;
+            levels_remaining -= 1;
+        }
+
+        // Build final index map from leaf entries
+        let mut index_map = BTreeMap::new();
+        for i in 0..current_entries.len() {
+            let entry = &current_entries[i];
+            let next_key = if i + 1 < current_entries.len() {
+                Some(current_entries[i + 1].first_key.clone())
+            } else {
+                None
+            };
+            index_map.insert(
+                entry.first_key.clone(),
+                BlockIndexEntry::new(entry.first_key.clone(), next_key, entry.offset, entry.size),
+            );
+        }
+
+        self.data_block_index = index_map;
+        Ok(())
+    }
+
+    /// Parse root index entries from block data.
+    fn parse_root_index_entries(
+        &self,
+        data: &[u8],
+        num_entries: usize,
+        content_key_only: bool,
+    ) -> Result<Vec<BlockIndexEntry>> {
+        let mut entries = Vec::with_capacity(num_entries);
+        let mut offset = 0;
+
+        for _ in 0..num_entries {
+            // Read offset (8 bytes)
+            let block_offset = i64::from_be_bytes([
+                data[offset],
+                data[offset + 1],
+                data[offset + 2],
+                data[offset + 3],
+                data[offset + 4],
+                data[offset + 5],
+                data[offset + 6],
+                data[offset + 7],
+            ]) as u64;
+            offset += 8;
+
+            // Read size (4 bytes)
+            let block_size = i32::from_be_bytes([
+                data[offset],
+                data[offset + 1],
+                data[offset + 2],
+                data[offset + 3],
+            ]) as u32;
+            offset += 4;
+
+            // Read key length (varint)
+            let var_len_size = var_long_size_on_disk(data, offset);
+            let (key_length, _) = read_var_long(data, offset);
+            offset += var_len_size;
+
+            // Read key bytes
+            let key_bytes = data[offset..offset + key_length as usize].to_vec();
+            offset += key_length as usize;
+
+            let key = if content_key_only {
+                // For meta index: key is just the content
+                Key::from_bytes(key_bytes)
+            } else {
+                // For data index: key has structure (length prefix + content + other info)
+                Key::new(&key_bytes, 0, key_bytes.len())
+            };
+
+            entries.push(BlockIndexEntry::new(key, None, block_offset, block_size));
+        }
+
+        Ok(entries)
+    }
+
    /// Parse leaf index entries from block data.
    ///
    /// Leaf/intermediate index block layout:
    /// - 4 bytes: entry count
    /// - (count + 1) x 4 bytes: "secondary index" of offsets relative to the
    ///   end of that table; the extra final offset marks the end of the last
    ///   entry, which is how each key's length is derived
    /// - per entry: 8-byte block offset, 4-byte block size, key bytes
    fn parse_leaf_index_entries(&self, data: &[u8]) -> Result<Vec<BlockIndexEntry>> {
        let mut entries = Vec::new();
        let mut offset = 0;

        // Read number of entries (4 bytes)
        let num_entries = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
        offset += 4;

        // Read secondary index (offsets to entries); note the inclusive range
        // reads num_entries + 1 values — the sentinel end offset.
        let mut relative_offsets = Vec::with_capacity(num_entries + 1);
        for _ in 0..=num_entries {
            let rel_offset = i32::from_be_bytes([
                data[offset],
                data[offset + 1],
                data[offset + 2],
                data[offset + 3],
            ]) as usize;
            relative_offsets.push(rel_offset);
            offset += 4;
        }

        // Relative offsets are measured from here (end of the secondary index).
        let base_offset = offset;

        // Read entries
        for i in 0..num_entries {
            // Read offset (8 bytes)
            let block_offset = i64::from_be_bytes([
                data[offset],
                data[offset + 1],
                data[offset + 2],
                data[offset + 3],
                data[offset + 4],
                data[offset + 5],
                data[offset + 6],
                data[offset + 7],
            ]) as u64;

            // Read size (4 bytes)
            let block_size = i32::from_be_bytes([
                data[offset + 8],
                data[offset + 9],
                data[offset + 10],
                data[offset + 11],
            ]) as u32;

            // Key is from offset+12 to next entry; its length is derived from
            // the next relative offset rather than stored explicitly.
            let key_start = offset + 12;
            let next_entry_start = base_offset + relative_offsets[i + 1];
            let key_length = next_entry_start - key_start;

            let key_bytes = data[key_start..key_start + key_length].to_vec();
            let key = Key::new(&key_bytes, 0, key_bytes.len());

            entries.push(BlockIndexEntry::new(key, None, block_offset, block_size));
            offset = next_entry_start;
        }

        Ok(entries)
    }
+
+    /// Read meta index block.
+    fn read_meta_index_block(
+        &self,
+        start: usize,
+    ) -> Result<(BTreeMap<String, BlockIndexEntry>, usize)> {
+        let block = HFileBlock::parse(&self.bytes[start..], self.codec)?;
+        if block.block_type() != HFileBlockType::RootIndex {
+            return Err(HFileError::UnexpectedBlockType {
+                expected: HFileBlockType::RootIndex.to_string(),
+                actual: block.block_type().to_string(),
+            });
+        }
+
+        let entries = self.parse_root_index_entries(
+            &block.data,
+            self.trailer.meta_index_count as usize,
+            true,
+        )?;
+        let next_offset = start + block.header.on_disk_size_with_header();
+
+        // Convert to string-keyed map
+        let mut index_map = BTreeMap::new();
+        for entry in entries {
+            let key_str = String::from_utf8_lossy(entry.first_key.content()).to_string();
+            index_map.insert(key_str, entry);
+        }
+
+        Ok((index_map, next_offset))
+    }
+
+    /// Read file info block.
+    fn read_file_info_block(&mut self, start: usize) -> Result<()> {
+        let block = HFileBlock::parse(&self.bytes[start..], self.codec)?;
+        if block.block_type() != HFileBlockType::FileInfo {
+            return Err(HFileError::UnexpectedBlockType {
+                expected: HFileBlockType::FileInfo.to_string(),
+                actual: block.block_type().to_string(),
+            });
+        }
+
+        // Check PBUF magic
+        if block.data.len() < 4 || &block.data[0..4] != PBUF_MAGIC {
+            return Err(HFileError::InvalidFormat(
+                "File info block missing PBUF magic".to_string(),
+            ));
+        }
+
+        // Parse protobuf (length-delimited after magic)
+        let proto_data = &block.data[4..];
+        let (length, consumed) = read_varint(proto_data);
+        let info_proto = InfoProto::decode(&proto_data[consumed..consumed + length as usize])?;
+
+        // Build file info map
+        for entry in info_proto.map_entry {
+            let key = String::from_utf8_lossy(&entry.first).to_string();
+            self.file_info.insert(key, entry.second);
+        }
+
+        Ok(())
+    }
+
+    /// Read a block at the given offset and size.
+    fn read_block_at(&self, offset: usize, size: usize) -> Result<HFileBlock> {
+        HFileBlock::parse(&self.bytes[offset..offset + size], self.codec)
+    }
+
    /// Get the number of key-value entries in the file.
    ///
    /// Read straight from the trailer's entry count; no blocks are touched.
    pub fn num_entries(&self) -> u64 {
        self.trailer.entry_count
    }
+
+    /// Get file info value by key.
+    pub fn get_file_info(&self, key: &str) -> Option<&[u8]> {
+        self.file_info.get(key).map(|v| v.as_slice())
+    }
+
+    /// Get meta block content by name.
+    pub fn get_meta_block(&self, name: &str) -> Result<Option<Vec<u8>>> {
+        let entry = match self.meta_block_index.get(name) {
+            Some(e) => e,
+            None => return Ok(None),
+        };
+
+        let block = self.read_block_at(entry.offset as usize, entry.size as usize)?;
+        if block.block_type() != HFileBlockType::Meta {
+            return Err(HFileError::UnexpectedBlockType {
+                expected: HFileBlockType::Meta.to_string(),
+                actual: block.block_type().to_string(),
+            });
+        }
+
+        Ok(Some(block.data))
+    }
+
+    /// Seek to the beginning of the file.
+    pub fn seek_to_first(&mut self) -> Result<bool> {
+        if self.trailer.entry_count == 0 {
+            self.cursor.eof = true;
+            self.cursor.seeked = true;
+            return Ok(false);
+        }
+
+        // Get first data block
+        let first_entry = match self.data_block_index.first_key_value() {
+            Some((_, entry)) => entry.clone(),
+            None => {
+                self.cursor.eof = true;
+                self.cursor.seeked = true;
+                return Ok(false);
+            }
+        };
+
+        self.current_block_entry = Some(first_entry.clone());
+        self.load_data_block(&first_entry)?;
+
+        self.cursor.offset = first_entry.offset as usize + BLOCK_HEADER_SIZE;
+        self.cursor.cached_kv = None;
+        self.cursor.eof = false;
+        self.cursor.seeked = true;
+
+        Ok(true)
+    }
+
    /// Seek to the given key.
    ///
    /// Forward-only: the cursor is expected to sit at a key <= `lookup_key`
    /// (or be unseeked, in which case it is first positioned at the file start).
    ///
    /// Result semantics:
    /// - `Found`: cursor is positioned exactly at `lookup_key`.
    /// - `Eof`: `lookup_key` sorts past the last key (or the file is empty).
    /// - `BeforeBlockFirstKey` / `BeforeFileFirstKey`: `lookup_key` sorts
    ///   before the current key; the cursor is not moved backward.
    ///
    /// # Errors
    /// [`HFileError::BackwardSeekNotSupported`] when `lookup_key` is behind
    /// the cursor and not covered by the first-key special cases above.
    pub fn seek_to(&mut self, lookup_key: &Utf8Key) -> Result<SeekResult> {
        if !self.cursor.seeked {
            self.seek_to_first()?;
        }

        if self.trailer.entry_count == 0 {
            return Ok(SeekResult::Eof);
        }

        // Get current key-value
        let current_kv = match self.get_key_value()? {
            Some(kv) => kv,
            None => return Ok(SeekResult::Eof),
        };

        let cmp_current = compare_keys(current_kv.key(), lookup_key);

        match cmp_current {
            std::cmp::Ordering::Equal => Ok(SeekResult::Found),
            std::cmp::Ordering::Greater => {
                // Current key > lookup key: backward seek
                // Check if we're at the first key of a block and lookup >= fake first key
                if let Some(entry) = &self.current_block_entry {
                    if self.is_at_first_key_of_block()
                        && compare_keys(&entry.first_key, lookup_key) != std::cmp::Ordering::Greater
                    {
                        return Ok(SeekResult::BeforeBlockFirstKey);
                    }
                }

                // Check if before file's first key
                // NOTE(review): this only verifies the cursor is at the first key
                // of *some* block, not specifically the file's first block —
                // confirm callers can only reach this path near the file start.
                if self.data_block_index.first_key_value().is_some()
                    && self.is_at_first_key_of_block()
                {
                    return Ok(SeekResult::BeforeFileFirstKey);
                }

                Err(HFileError::BackwardSeekNotSupported)
            }
            std::cmp::Ordering::Less => {
                // Current key < lookup key: forward seek
                self.forward_seek(lookup_key)
            }
        }
    }
+
    /// Forward seek to find the lookup key.
    ///
    /// Invariant on entry: the current key compares less than `lookup_key`.
    /// First decides whether the target can still be inside the current block
    /// (using the next block's first key as an exclusive upper bound), jumping
    /// to the right block or declaring EOF as needed, then scans linearly
    /// within the block.
    fn forward_seek(&mut self, lookup_key: &Utf8Key) -> Result<SeekResult> {
        // Check if we need to jump to a different block
        if let Some(entry) = &self.current_block_entry {
            if let Some(next_key) = &entry.next_block_first_key {
                // next block's first key <= lookup key: the target cannot be
                // in the current block, so search the index for its block.
                if compare_keys(next_key, lookup_key) != std::cmp::Ordering::Greater {
                    // Need to find the right block
                    self.find_block_for_key(lookup_key)?;
                }
            } else {
                // Last block - check against last key
                if let Some(last_key) = &self.last_key {
                    if compare_keys(last_key, lookup_key) == std::cmp::Ordering::Less {
                        self.cursor.eof = true;
                        self.current_block = None;
                        self.current_block_entry = None;
                        return Ok(SeekResult::Eof);
                    }
                }
            }
        }

        // Scan within the current block
        self.scan_block_for_key(lookup_key)
    }
+
+    /// Find the block that may contain the lookup key.
+    fn find_block_for_key(&mut self, lookup_key: &Utf8Key) -> Result<()> {
+        // Binary search using BTreeMap's range
+        let lookup_bytes = lookup_key.as_bytes();
+        let fake_key = Key::from_bytes(lookup_bytes.to_vec());
+
+        // Find the entry with greatest key <= lookup_key
+        let entry = self
+            .data_block_index
+            .range(..=fake_key)
+            .next_back()
+            .map(|(_, e)| e.clone());
+
+        if let Some(entry) = entry {
+            self.current_block_entry = Some(entry.clone());
+            self.load_data_block(&entry)?;
+            self.cursor.offset = entry.offset as usize + BLOCK_HEADER_SIZE;
+            self.cursor.cached_kv = None;
+        }
+
+        Ok(())
+    }
+
    /// Scan within the current block to find the key.
    /// Uses iteration instead of recursion to avoid stack overflow with many blocks.
    ///
    /// `cursor.offset` is absolute within the file while records inside a
    /// block are addressed relative to the block payload, hence the repeated
    /// `block_start + BLOCK_HEADER_SIZE` conversions in both directions.
    fn scan_block_for_key(&mut self, lookup_key: &Utf8Key) -> Result<SeekResult> {
        loop {
            let block = match &self.current_block {
                Some(b) => b,
                None => return Ok(SeekResult::Eof),
            };

            let block_start = self.current_block_entry.as_ref().unwrap().offset as usize;
            // Convert the absolute cursor position to a block-relative offset.
            let mut offset = self.cursor.offset - block_start - BLOCK_HEADER_SIZE;
            let mut last_offset = offset;
            let mut last_kv = self.cursor.cached_kv.clone();

            while block.is_valid_offset(offset) {
                let kv = block.read_key_value(offset);
                let cmp = compare_keys(kv.key(), lookup_key);

                match cmp {
                    std::cmp::Ordering::Equal => {
                        self.cursor.offset = block_start + BLOCK_HEADER_SIZE + offset;
                        self.cursor.cached_kv = Some(kv);
                        return Ok(SeekResult::Found);
                    }
                    std::cmp::Ordering::Greater => {
                        // Key at offset > lookup key
                        // Set cursor to previous position (the greatest key < lookup)
                        if let Some(prev_kv) = last_kv {
                            self.cursor.offset = block_start + BLOCK_HEADER_SIZE + last_offset;
                            self.cursor.cached_kv = Some(prev_kv);
                        }
                        if self.is_at_first_key_of_block() {
                            return Ok(SeekResult::BeforeBlockFirstKey);
                        }
                        return Ok(SeekResult::InRange);
                    }
                    std::cmp::Ordering::Less => {
                        last_offset = offset;
                        last_kv = Some(kv.clone());
                        offset += kv.record_size();
                    }
                }
            }

            // Reached end of block - need to check if there are more blocks
            let current_entry = self.current_block_entry.clone().unwrap();
            let next_entry = self.get_next_block_entry(&current_entry);

            match next_entry {
                Some(entry) => {
                    // Move to next block and continue scanning (iterate instead of recurse)
                    self.current_block_entry = Some(entry.clone());
                    self.load_data_block(&entry)?;
                    self.cursor.offset = entry.offset as usize + BLOCK_HEADER_SIZE;
                    self.cursor.cached_kv = None;
                    // Continue the loop to scan the next block
                }
                None => {
                    // No more blocks - this is the last block
                    // Check if lookup key is past the last key in the file
                    if let Some(kv) = last_kv {
                        if compare_keys(kv.key(), lookup_key) == std::cmp::Ordering::Less {
                            // We're past the last key in the file
                            self.cursor.eof = true;
                            self.cursor.cached_kv = None;
                            return Ok(SeekResult::Eof);
                        }
                        // Otherwise, stay at the last key
                        self.cursor.offset = block_start + BLOCK_HEADER_SIZE + last_offset;
                        self.cursor.cached_kv = Some(kv);
                    }
                    return Ok(SeekResult::InRange);
                }
            }
        }
    }
+
+    /// Load a data block.
+    fn load_data_block(&mut self, entry: &BlockIndexEntry) -> Result<()> {
+        let block = self.read_block_at(entry.offset as usize, entry.size as usize)?;
+        if block.block_type() != HFileBlockType::Data {
+            return Err(HFileError::UnexpectedBlockType {
+                expected: HFileBlockType::Data.to_string(),
+                actual: block.block_type().to_string(),
+            });
+        }
+        self.current_block = Some(DataBlock::from_block(block));
+        Ok(())
+    }
+
+    /// Move to the next key-value pair.
+    #[allow(clippy::should_implement_trait)]
+    pub fn next(&mut self) -> Result<bool> {
+        if !self.cursor.seeked || self.cursor.eof {
+            return Ok(false);
+        }
+
+        let block = match &self.current_block {
+            Some(b) => b,
+            None => return Ok(false),
+        };
+
+        let block_start = self.current_block_entry.as_ref().unwrap().offset as usize;
+        let current_offset = self.cursor.offset - block_start - BLOCK_HEADER_SIZE;
+
+        // Get current key-value to calculate next offset
+        let kv = if let Some(cached) = &self.cursor.cached_kv {
+            cached.clone()
+        } else {
+            block.read_key_value(current_offset)
+        };
+
+        let next_offset = current_offset + kv.record_size();
+
+        if block.is_valid_offset(next_offset) {
+            self.cursor.offset = block_start + BLOCK_HEADER_SIZE + next_offset;
+            self.cursor.cached_kv = None;
+            return Ok(true);
+        }
+
+        // Move to next block
+        let current_entry = self.current_block_entry.clone().unwrap();
+        let next_entry = self.get_next_block_entry(&current_entry);
+
+        match next_entry {
+            Some(entry) => {
+                self.current_block_entry = Some(entry.clone());
+                self.load_data_block(&entry)?;
+                self.cursor.offset = entry.offset as usize + BLOCK_HEADER_SIZE;
+                self.cursor.cached_kv = None;
+                Ok(true)
+            }
+            None => {
+                self.cursor.eof = true;
+                Ok(false)
+            }
+        }
+    }
+
+    /// Get the next block index entry.
+    fn get_next_block_entry(&self, current: &BlockIndexEntry) -> Option<BlockIndexEntry> {
+        self.data_block_index
+            .range((
+                std::ops::Bound::Excluded(&current.first_key),
+                std::ops::Bound::Unbounded,
+            ))
+            .next()
+            .map(|(_, e)| e.clone())
+    }
+
    /// Get the current key-value pair.
    ///
    /// Returns `Ok(None)` before the first seek or once EOF has been reached.
    /// The decoded pair is memoized in `cursor.cached_kv` so repeated calls at
    /// the same position do not re-parse block bytes.
    pub fn get_key_value(&mut self) -> Result<Option<KeyValue>> {
        if !self.cursor.seeked || self.cursor.eof {
            return Ok(None);
        }

        if let Some(cached) = &self.cursor.cached_kv {
            return Ok(Some(cached.clone()));
        }

        let block = match &self.current_block {
            Some(b) => b,
            None => {
                // Need to load the block
                // Assumes a seeked, non-EOF cursor always has a block entry;
                // the unwrap panics if that invariant is ever violated.
                let entry = self.current_block_entry.clone().unwrap();
                self.load_data_block(&entry)?;
                self.current_block.as_ref().unwrap()
            }
        };

        // Translate the absolute cursor offset to a block-relative one.
        let block_start = self.current_block_entry.as_ref().unwrap().offset as usize;
        let offset = self.cursor.offset - block_start - BLOCK_HEADER_SIZE;

        let kv = block.read_key_value(offset);
        self.cursor.cached_kv = Some(kv.clone());

        Ok(Some(kv))
    }
+
+    /// Check if cursor is at the first key of the current block.
+    fn is_at_first_key_of_block(&self) -> bool {
+        if let Some(entry) = &self.current_block_entry {
+            return self.cursor.offset == entry.offset as usize + BLOCK_HEADER_SIZE;
+        }
+        false
+    }
+
    /// Check if the reader has been seeked.
    ///
    /// True after any `seek_to_first`/`seek_to`, including seeks on an empty
    /// file that immediately ended at EOF.
    pub fn is_seeked(&self) -> bool {
        self.cursor.seeked
    }
+
    /// Iterate over all key-value pairs.
    ///
    /// Rewinds the cursor to the first entry before handing out the iterator,
    /// so iteration always starts at the beginning of the file.
    pub fn iter(&mut self) -> Result<HFileIterator<'_>> {
        self.seek_to_first()?;
        Ok(HFileIterator { reader: self })
    }
+
+    // ================== HFileRecord API for MDT ==================
+
+    /// Convert a KeyValue to an owned HFileRecord.
+    ///
+    /// This extracts the key content (without length prefix) and value bytes
+    /// into an owned struct suitable for MDT operations.
+    fn key_value_to_record(kv: &KeyValue) -> HFileRecord {
+        HFileRecord::new(kv.key().content().to_vec(), kv.value().to_vec())
+    }
+
+    /// Collect all records from the HFile as owned HFileRecords.
+    ///
+    /// This is useful for MDT operations where records need to be stored
+    /// and merged with log file records.
+    ///
+    /// # Example
+    /// ```ignore
+    /// let records = reader.collect_records()?;
+    /// for record in records {
+    ///     println!("Key: {}", record.key_as_str().unwrap_or("<binary>"));
+    /// }
+    /// ```
+    pub fn collect_records(&mut self) -> Result<Vec<HFileRecord>> {
+        let mut records = Vec::with_capacity(self.trailer.entry_count as usize);
+        for result in self.iter()? {
+            let kv = result?;
+            records.push(Self::key_value_to_record(&kv));
+        }
+        Ok(records)
+    }
+
    /// Iterate over all records as owned HFileRecords.
    ///
    /// Unlike `iter()` which returns references into file bytes,
    /// this iterator yields owned `HFileRecord` instances. The cursor is
    /// rewound to the first entry before the iterator is handed out.
    pub fn record_iter(&mut self) -> Result<HFileRecordIterator<'_>> {
        self.seek_to_first()?;
        Ok(HFileRecordIterator { reader: self })
    }
+
+    /// Get the current position's record as an owned HFileRecord.
+    ///
+    /// Returns None if not seeked or at EOF.
+    pub fn get_record(&mut self) -> Result<Option<HFileRecord>> {
+        match self.get_key_value()? {
+            Some(kv) => Ok(Some(Self::key_value_to_record(&kv))),
+            None => Ok(None),
+        }
+    }
+
+    /// Lookup records by keys and return as HFileRecords.
+    ///
+    /// Keys must be sorted in ascending order. This method efficiently
+    /// scans forward through the file to find matching keys.
+    ///
+    /// Returns a vector of (key, Option<HFileRecord>) tuples where
+    /// the Option is Some if the key was found.
+    pub fn lookup_records(&mut self, keys: &[&str]) -> Result<Vec<(String, Option<HFileRecord>)>> {
+        let mut results = Vec::with_capacity(keys.len());
+
+        if keys.is_empty() {
+            return Ok(results);
+        }
+
+        self.seek_to_first()?;
+        if self.cursor.eof {
+            // Empty file - return all as not found
+            for key in keys {
+                results.push((key.to_string(), None));
+            }
+            return Ok(results);
+        }
+
+        for key in keys {
+            let lookup = Utf8Key::new(*key);
+            match self.seek_to(&lookup)? {
+                SeekResult::Found => {
+                    let record = self.get_record()?;
+                    results.push((key.to_string(), record));
+                }
+                _ => {
+                    results.push((key.to_string(), None));
+                }
+            }
+        }
+
+        Ok(results)
+    }
+
    /// Collect records matching a key prefix.
    ///
    /// Returns all records where the key starts with the given prefix.
    /// Keys are stored sorted, so the scan starts near the first key >= prefix
    /// and stops at the first non-matching key greater than the prefix.
    /// The cursor is left wherever the scan stopped.
    pub fn collect_records_by_prefix(&mut self, prefix: &str) -> Result<Vec<HFileRecord>> {
        let mut records = Vec::new();
        let prefix_bytes = prefix.as_bytes();

        // Seek to the prefix (or first key >= prefix)
        let lookup = Utf8Key::new(prefix);
        self.seek_to_first()?;

        if self.cursor.eof {
            return Ok(records);
        }

        // Find starting position
        let start_result = self.seek_to(&lookup)?;
        match start_result {
            SeekResult::Eof => return Ok(records),
            SeekResult::Found | SeekResult::InRange | SeekResult::BeforeBlockFirstKey => {
                // We may be at or past a matching key
                // (InRange leaves the cursor at the greatest key < prefix, so
                // the scan below may first skip keys smaller than the prefix.)
            }
            SeekResult::BeforeFileFirstKey => {
                // Key is before first key, move to first
                self.seek_to_first()?;
            }
        }

        // Scan and collect records with matching prefix
        loop {
            if self.cursor.eof {
                break;
            }

            match self.get_key_value()? {
                Some(kv) => {
                    let key_content = kv.key().content();
                    if key_content.starts_with(prefix_bytes) {
                        records.push(Self::key_value_to_record(&kv));
                    } else if key_content > prefix_bytes {
                        // Past the prefix range
                        // (sorted keys: nothing later can match the prefix)
                        break;
                    }
                }
                None => break,
            }

            if !self.next()? {
                break;
            }
        }

        Ok(records)
    }
+}
+
+/// Iterator over all records as owned HFileRecords.
+pub struct HFileRecordIterator<'a> {
+    reader: &'a mut HFileReader,
+}
+
+impl<'a> Iterator for HFileRecordIterator<'a> {
+    type Item = Result<HFileRecord>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.reader.cursor.eof {
+            return None;
+        }
+
+        match self.reader.get_key_value() {
+            Ok(Some(kv)) => {
+                let record = HFileReader::key_value_to_record(&kv);
+                match self.reader.next() {
+                    Ok(_) => {}
+                    Err(e) => return Some(Err(e)),
+                }
+                Some(Ok(record))
+            }
+            Ok(None) => None,
+            Err(e) => Some(Err(e)),
+        }
+    }
+}
+
+/// Iterator over all key-value pairs in an HFile.
+pub struct HFileIterator<'a> {
+    reader: &'a mut HFileReader,
+}
+
+impl<'a> Iterator for HFileIterator<'a> {
+    type Item = Result<KeyValue>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.reader.cursor.eof {
+            return None;
+        }
+
+        match self.reader.get_key_value() {
+            Ok(Some(kv)) => {
+                match self.reader.next() {
+                    Ok(_) => {}
+                    Err(e) => return Some(Err(e)),
+                }
+                Some(Ok(kv))
+            }
+            Ok(None) => None,
+            Err(e) => Some(Err(e)),
+        }
+    }
+}
+
/// Read a LEB128-style varint from bytes. Returns (value, bytes_consumed).
///
/// Each byte contributes its low 7 bits, least-significant group first; the
/// high bit marks continuation. An empty slice yields `(0, 0)`.
///
/// A u64 varint occupies at most 10 bytes (shifts 0..=63). On malformed input
/// with more than 10 continuation bytes, decoding stops after the 10th byte
/// instead of overflowing the shift amount (which would panic in debug builds
/// on untrusted file data).
fn read_varint(bytes: &[u8]) -> (u64, usize) {
    let mut result: u64 = 0;
    let mut shift = 0;
    let mut pos = 0;

    while pos < bytes.len() {
        let b = bytes[pos] as u64;
        pos += 1;
        result |= (b & 0x7F) << shift;
        if b & 0x80 == 0 {
            break;
        }
        shift += 7;
        // Cap at 10 bytes: a further `<< shift` with shift >= 64 is illegal.
        if shift >= 64 {
            break;
        }
    }

    (result, pos)
}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::path::PathBuf;
+
+    fn test_data_dir() -> PathBuf {
+        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+            .join("tests")
+            .join("data")
+            .join("hfile")
+    }
+
+    fn read_test_hfile(filename: &str) -> Vec<u8> {
+        let path = test_data_dir().join(filename);
+        std::fs::read(&path).unwrap_or_else(|_| panic!("Failed to read test file: {:?}", path))
+    }
+
    #[test]
    fn test_read_uncompressed_hfile() {
        // Fixture name encodes its layout: 16KB blocks, NONE codec, 5000 entries.
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Check entry count
        assert_eq!(reader.num_entries(), 5000);
    }
+
    #[test]
    fn test_read_gzip_hfile() {
        // Fixture name encodes its layout: 16KB blocks, GZ codec, 20000 entries.
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
        let reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Check entry count
        assert_eq!(reader.num_entries(), 20000);
    }
+
    #[test]
    fn test_read_empty_hfile() {
        // A structurally valid HFile containing zero key-value entries.
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_no_entry.hfile");
        let reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Check entry count
        assert_eq!(reader.num_entries(), 0);
    }
+
    #[test]
    fn test_seek_to_first_uncompressed() {
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Seek to first
        assert!(reader.seek_to_first().expect("Failed to seek"));

        // Get first key-value; fixture keys/values are zero-padded to 9 digits.
        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
        assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000000");

        let value = std::str::from_utf8(kv.value()).unwrap();
        assert_eq!(value, "hudi-value-000000000");
    }
+
    #[test]
    fn test_sequential_read_uncompressed() {
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Seek to first
        assert!(reader.seek_to_first().expect("Failed to seek"));

        // Read first 10 entries
        for i in 0..10 {
            let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
            let expected_key = format!("hudi-key-{:09}", i);
            let expected_value = format!("hudi-value-{:09}", i);

            assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
            assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);

            if i < 9 {
                // Do not advance on the last iteration so the cursor stays valid.
                assert!(reader.next().expect("Failed to move next"));
            }
        }
    }
+
    #[test]
    fn test_seek_to_key_exact() {
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Seek to first
        assert!(reader.seek_to_first().expect("Failed to seek"));

        // Seek to specific key that exists in the fixture (forward seek).
        let lookup = Utf8Key::new("hudi-key-000000100");
        let result = reader.seek_to(&lookup).expect("Failed to seek");
        assert_eq!(result, SeekResult::Found);

        // Verify key
        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
        assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000100");
    }
+
    #[test]
    fn test_seek_to_key_eof() {
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Seek to first
        assert!(reader.seek_to_first().expect("Failed to seek"));

        // Seek past last key (fixture keys stop at hudi-key-000004999).
        let lookup = Utf8Key::new("hudi-key-999999999");
        let result = reader.seek_to(&lookup).expect("Failed to seek");
        assert_eq!(result, SeekResult::Eof);
    }
+
    #[test]
    fn test_seek_to_key_before_first() {
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Seek to first
        assert!(reader.seek_to_first().expect("Failed to seek"));

        // Seek before first key ("aaa" sorts before "hudi-key-...").
        let lookup = Utf8Key::new("aaa");
        let result = reader.seek_to(&lookup).expect("Failed to seek");
        assert_eq!(result, SeekResult::BeforeFileFirstKey);
    }
+
    #[test]
    fn test_iterate_all_entries() {
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Full scan: keys come back in order and the total matches the trailer.
        let mut count = 0;
        for result in reader.iter().expect("Failed to create iterator") {
            let kv = result.expect("Failed to read kv");
            let expected_key = format!("hudi-key-{:09}", count);
            assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
            count += 1;
        }

        assert_eq!(count, 5000);
    }
+
    #[test]
    fn test_empty_hfile_seek() {
        // Seeking an empty file must not error, only report EOF conditions.
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_no_entry.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Seek to first should return false
        assert!(!reader.seek_to_first().expect("Failed to seek"));

        // Get key-value should return None
        assert!(reader.get_key_value().expect("Failed to get kv").is_none());
    }
+
    #[test]
    fn test_gzip_sequential_read() {
        // Same shape as the uncompressed sequential test, but exercising the
        // gzip decompression path on block load.
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Seek to first
        assert!(reader.seek_to_first().expect("Failed to seek"));

        // Read first 10 entries
        for i in 0..10 {
            let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
            let expected_key = format!("hudi-key-{:09}", i);
            let expected_value = format!("hudi-value-{:09}", i);

            assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
            assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);

            if i < 9 {
                assert!(reader.next().expect("Failed to move next"));
            }
        }
    }
+
+    // ================== HFileRecord Tests ==================
+
    #[test]
    fn test_collect_records() {
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        let records = reader.collect_records().expect("Failed to collect records");
        assert_eq!(records.len(), 5000);

        // Verify first and last records
        assert_eq!(records[0].key_as_str(), Some("hudi-key-000000000"));
        assert_eq!(
            std::str::from_utf8(records[0].value()).unwrap(),
            "hudi-value-000000000"
        );

        assert_eq!(records[4999].key_as_str(), Some("hudi-key-000004999"));
        assert_eq!(
            std::str::from_utf8(records[4999].value()).unwrap(),
            "hudi-value-000004999"
        );
    }
+
    #[test]
    fn test_record_iterator() {
        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");

        // Owned-record iteration should visit every key in order.
        let mut count = 0;
        for result in reader.record_iter().expect("Failed to create iterator") {
            let record = result.expect("Failed to read record");
            let expected_key = format!("hudi-key-{:09}", count);
            assert_eq!(record.key_as_str(), Some(expected_key.as_str()));
            count += 1;
        }

        assert_eq!(count, 5000);
    }
+
+    #[test]
+    fn test_get_record() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // get_record() is cursor-based: it returns None until the reader is seeked.
+        // Before seeking, should return None
+        assert!(reader.get_record().expect("Failed to get record").is_none());
+
+        // After seeking, should return a record
+        reader.seek_to_first().expect("Failed to seek");
+        let record = reader.get_record().expect("Failed to get record").unwrap();
+        assert_eq!(record.key_as_str(), Some("hudi-key-000000000"));
+    }
+
+    #[test]
+    fn test_lookup_records() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // lookup_records pairs each input key with an Option<record>, preserving
+        // the input order and returning None for keys absent from the file.
+        let keys = vec![
+            "hudi-key-000000000",
+            "hudi-key-000000100",
+            "hudi-key-nonexistent",
+        ];
+        let results = reader.lookup_records(&keys).expect("Failed to lookup");
+
+        assert_eq!(results.len(), 3);
+
+        // First key should be found
+        assert_eq!(results[0].0, "hudi-key-000000000");
+        assert!(results[0].1.is_some());
+        assert_eq!(
+            results[0].1.as_ref().unwrap().key_as_str(),
+            Some("hudi-key-000000000")
+        );
+
+        // Second key should be found
+        assert_eq!(results[1].0, "hudi-key-000000100");
+        assert!(results[1].1.is_some());
+        assert_eq!(
+            results[1].1.as_ref().unwrap().key_as_str(),
+            Some("hudi-key-000000100")
+        );
+
+        // Third key should not be found
+        assert_eq!(results[2].0, "hudi-key-nonexistent");
+        assert!(results[2].1.is_none());
+    }
+
+    #[test]
+    fn test_collect_records_by_prefix() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Collect records with prefix "hudi-key-00000010" (should match 100-109)
+        // Keys are 9-digit padded, so "000000100" to "000000109" match this prefix
+        let records = reader
+            .collect_records_by_prefix("hudi-key-00000010")
+            .expect("Failed to collect by prefix");
+
+        // Matches must come back contiguous and in ascending key order.
+        assert_eq!(records.len(), 10);
+        for (i, record) in records.iter().enumerate() {
+            let expected = format!("hudi-key-{:09}", 100 + i);
+            assert_eq!(record.key_as_str(), Some(expected.as_str()));
+        }
+    }
+
+    #[test]
+    fn test_collect_records_empty_file() {
+        // Zero-entry fixture: collecting must succeed and yield nothing.
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_no_entry.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        let records = reader.collect_records().expect("Failed to collect records");
+        assert!(records.is_empty());
+    }
+
+    #[test]
+    fn test_hfile_record_ownership() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Collect some records
+        reader.seek_to_first().expect("Failed to seek");
+        let record1 = reader.get_record().expect("Failed to get record").unwrap();
+        reader.next().expect("Failed to move next");
+        let record2 = reader.get_record().expect("Failed to get record").unwrap();
+
+        // Records should be independent (owned data)
+        assert_ne!(record1.key(), record2.key());
+        assert_eq!(record1.key_as_str(), Some("hudi-key-000000000"));
+        assert_eq!(record2.key_as_str(), Some("hudi-key-000000001"));
+
+        // Can use records after reader has moved.
+        // Dropping the reader proves the records do not borrow from its buffers.
+        drop(reader);
+        assert_eq!(record1.key_as_str(), Some("hudi-key-000000000"));
+    }
+
+    // ================== Additional Test Files ==================
+
+    // Priority 1: Different Block Sizes
+
+    #[test]
+    fn test_read_512kb_blocks_gzip() {
+        // 512KB block size, GZIP compression, 20000 entries
+        // Constructing the reader alone parses trailer + indexes; entry count
+        // comes from the trailer, so no data block needs decompressing here.
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert_eq!(reader.num_entries(), 20000);
+    }
+
+    #[test]
+    fn test_512kb_blocks_sequential_read() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Read first 10 entries
+        // Interleaved get/next: skip next() after the last entry so the
+        // cursor stays on entry 9 when the loop exits.
+        for i in 0..10 {
+            let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+            let expected_key = format!("hudi-key-{:09}", i);
+            let expected_value = format!("hudi-value-{:09}", i);
+
+            assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+            assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+            if i < 9 {
+                assert!(reader.next().expect("Failed to move next"));
+            }
+        }
+    }
+
+    #[test]
+    fn test_512kb_blocks_seek() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Seek to key in second block (block 0 ends at ~8886)
+        // Exercises crossing a block boundary with a GZIP-compressed block.
+        let lookup = Utf8Key::new("hudi-key-000008888");
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000008888");
+    }
+
+    #[test]
+    fn test_read_64kb_blocks_uncompressed() {
+        // 64KB block size, no compression, 5000 entries
+        // Smoke test: opening the file exposes the trailer's entry count.
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert_eq!(reader.num_entries(), 5000);
+    }
+
+    #[test]
+    fn test_64kb_blocks_sequential_read() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Read first 10 entries
+        // next() is skipped after the final entry to leave the cursor valid.
+        for i in 0..10 {
+            let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+            let expected_key = format!("hudi-key-{:09}", i);
+            let expected_value = format!("hudi-value-{:09}", i);
+
+            assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+            assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+            if i < 9 {
+                assert!(reader.next().expect("Failed to move next"));
+            }
+        }
+    }
+
+    #[test]
+    fn test_64kb_blocks_seek() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Seek to key in second block (block 0 ends at ~1110)
+        // Exact-match seek landing in a later, uncompressed data block.
+        let lookup = Utf8Key::new("hudi-key-000001688");
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000001688");
+    }
+
+    // Priority 2: Edge Cases
+
+    #[test]
+    fn test_read_non_unique_keys() {
+        // 200 unique keys, each with 21 values (1 primary + 20 duplicates)
+        // Total: 200 * 21 = 4200 entries
+        // num_entries() counts physical cells, not distinct keys.
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert_eq!(reader.num_entries(), 4200);
+    }
+
+    #[test]
+    fn test_non_unique_keys_sequential_read() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // First entry for key 0
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000000");
+        assert_eq!(
+            std::str::from_utf8(kv.value()).unwrap(),
+            "hudi-value-000000000"
+        );
+
+        // Next 20 entries should be duplicates with _0 to _19 suffix
+        // (same key repeated; only the value changes between cells).
+        for j in 0..20 {
+            assert!(reader.next().expect("Failed to move next"));
+            let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+            assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000000");
+            let expected_value = format!("hudi-value-000000000_{}", j);
+            assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+        }
+
+        // Next entry should be key 1
+        assert!(reader.next().expect("Failed to move next"));
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000001");
+    }
+
+    #[test]
+    fn test_non_unique_keys_seek() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Seek to a key - should find the first occurrence
+        // (duplicates exist, so landing on the primary cell matters).
+        let lookup = Utf8Key::new("hudi-key-000000005");
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(kv.key().content_as_str().unwrap(), "hudi-key-000000005");
+        // First occurrence has the base value
+        assert_eq!(
+            std::str::from_utf8(kv.value()).unwrap(),
+            "hudi-value-000000005"
+        );
+    }
+
+    #[test]
+    fn test_read_fake_first_key() {
+        // File with fake first keys in meta index block
+        // Keys have suffix "-abcdefghij"
+        // Opening must tolerate index keys that are not actual cell keys.
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert_eq!(reader.num_entries(), 20000);
+    }
+
+    #[test]
+    fn test_fake_first_key_sequential_read() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Read first 10 entries - keys have "-abcdefghij" suffix
+        // Sequential reads return the real cell keys, not the fake index keys.
+        for i in 0..10 {
+            let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+            let expected_key = format!("hudi-key-{:09}-abcdefghij", i);
+            let expected_value = format!("hudi-value-{:09}", i);
+
+            assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+            assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+            if i < 9 {
+                assert!(reader.next().expect("Failed to move next"));
+            }
+        }
+    }
+
+    #[test]
+    fn test_fake_first_key_seek_exact() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Seek to exact key with suffix
+        // Exact matches must still succeed even though the index holds fake keys.
+        let lookup = Utf8Key::new("hudi-key-000000099-abcdefghij");
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(
+            kv.key().content_as_str().unwrap(),
+            "hudi-key-000000099-abcdefghij"
+        );
+    }
+
+    #[test]
+    fn test_fake_first_key_seek_before_block_first() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // First, move to a known position
+        let lookup = Utf8Key::new("hudi-key-000000469-abcdefghij");
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        // Now seek to a key that falls between fake first key and actual first key of next block
+        // Block 2 has fake first key "hudi-key-00000047" but actual first key "hudi-key-000000470-abcdefghij"
+        // This is the only SeekResult variant specific to fake-first-key files.
+        let lookup = Utf8Key::new("hudi-key-000000470");
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        // This should return BeforeBlockFirstKey since the lookup key is >= fake first key
+        // but < actual first key
+        assert_eq!(result, SeekResult::BeforeBlockFirstKey);
+    }
+
+    // Priority 3: Multi-level Index
+
+    #[test]
+    fn test_read_large_keys_2level_index() {
+        // Large keys (>100 bytes), 2-level data block index
+        // Opening must walk the root index even when it is not leaf-level.
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert_eq!(reader.num_entries(), 20000);
+    }
+
+    #[test]
+    fn test_large_keys_2level_sequential_read() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Fixture keys are "hudi-key-<100 a's>-<9-digit index>".
+        let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+            aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Read first 5 entries
+        for i in 0..5 {
+            let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+            let expected_key = format!("{}{:09}", large_key_prefix, i);
+            let expected_value = format!("hudi-value-{:09}", i);
+
+            assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+            assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+            if i < 4 {
+                assert!(reader.next().expect("Failed to move next"));
+            }
+        }
+    }
+
+    #[test]
+    fn test_large_keys_2level_seek() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+            aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Seek to a key deep in the file
+        // Forces a root -> leaf index traversal before scanning the data block.
+        let lookup_key = format!("{}000005340", large_key_prefix);
+        let lookup = Utf8Key::new(&lookup_key);
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(kv.key().content_as_str().unwrap(), lookup_key);
+    }
+
+    #[test]
+    fn test_large_keys_2level_iterate_all() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Full scan must cross every data block and terminate cleanly at EOF.
+        let mut count = 0;
+        for result in reader.iter().expect("Failed to create iterator") {
+            let _ = result.expect("Failed to read kv");
+            count += 1;
+        }
+
+        assert_eq!(count, 20000);
+    }
+
+    #[test]
+    fn test_read_large_keys_3level_index() {
+        // Large keys, 3-level deep data block index
+        // Deepest-index fixture: verifies multi-level index parsing at open time.
+        let bytes =
+            read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        assert_eq!(reader.num_entries(), 10000);
+    }
+
+    #[test]
+    fn test_large_keys_3level_sequential_read() {
+        let bytes =
+            read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Same padded large-key pattern as the 2-level fixture.
+        let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+            aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Read first 5 entries
+        for i in 0..5 {
+            let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+            let expected_key = format!("{}{:09}", large_key_prefix, i);
+            let expected_value = format!("hudi-value-{:09}", i);
+
+            assert_eq!(kv.key().content_as_str().unwrap(), expected_key);
+            assert_eq!(std::str::from_utf8(kv.value()).unwrap(), expected_value);
+
+            if i < 4 {
+                assert!(reader.next().expect("Failed to move next"));
+            }
+        }
+    }
+
+    #[test]
+    fn test_large_keys_3level_seek() {
+        let bytes =
+            read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+            aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Seek to a key deep in the file
+        // Requires descending through all three index levels.
+        let lookup_key = format!("{}000005340", large_key_prefix);
+        let lookup = Utf8Key::new(&lookup_key);
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(kv.key().content_as_str().unwrap(), lookup_key);
+    }
+
+    #[test]
+    fn test_large_keys_3level_iterate_all() {
+        let bytes =
+            read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Full scan across a 3-level index: every block must be reachable.
+        let mut count = 0;
+        for result in reader.iter().expect("Failed to create iterator") {
+            let _ = result.expect("Failed to read kv");
+            count += 1;
+        }
+
+        assert_eq!(count, 10000);
+    }
+
+    #[test]
+    fn test_large_keys_3level_last_key() {
+        let bytes =
+            read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+            aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Seek to last key
+        // Boundary case: the very last cell of the last data block.
+        let lookup_key = format!("{}000009999", large_key_prefix);
+        let lookup = Utf8Key::new(&lookup_key);
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+        assert_eq!(kv.key().content_as_str().unwrap(), lookup_key);
+        assert_eq!(
+            std::str::from_utf8(kv.value()).unwrap(),
+            "hudi-value-000009999"
+        );
+
+        // Next should return false (EOF)
+        assert!(!reader.next().expect("Failed to move next"));
+    }
+
+    #[test]
+    fn test_large_keys_3level_seek_eof() {
+        let bytes =
+            read_test_hfile("hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        let large_key_prefix = "hudi-key-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+            aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-";
+
+        assert!(reader.seek_to_first().expect("Failed to seek"));
+
+        // Seek past last key
+        // A lookup greater than every key in the file must report Eof, not error.
+        let lookup_key = format!("{}000009999a", large_key_prefix);
+        let lookup = Utf8Key::new(&lookup_key);
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Eof);
+    }
+
+    // ================== Additional Coverage Tests ==================
+
+    #[test]
+    fn test_is_seeked() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // is_seeked() flips from false to true after the first seek.
+        assert!(!reader.is_seeked());
+        reader.seek_to_first().expect("Failed to seek");
+        assert!(reader.is_seeked());
+    }
+
+    #[test]
+    fn test_get_key_value_not_seeked() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Not seeked yet
+        // Calling before any seek is a no-op returning Ok(None), not an error.
+        let result = reader.get_key_value().expect("Failed to get kv");
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn test_next_not_seeked() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // next() without seek should return false
+        // (advance is only defined relative to an established cursor).
+        assert!(!reader.next().expect("Failed to next"));
+    }
+
+    #[test]
+    fn test_seek_before_first_key() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        reader.seek_to_first().expect("Failed to seek");
+
+        // Seek to a key before the first key in the file
+        // Distinct from Eof: the lookup sorts before every key in the file.
+        let lookup = Utf8Key::new("aaa"); // Before "hudi-key-000000000"
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::BeforeFileFirstKey);
+    }
+
+    #[test]
+    fn test_seek_in_range() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        reader.seek_to_first().expect("Failed to seek");
+
+        // First move past the first key to avoid BeforeBlockFirstKey result
+        let lookup = Utf8Key::new("hudi-key-000000100");
+        reader.seek_to(&lookup).expect("Failed to seek");
+
+        // Now seek to a key that doesn't exist but is in range (between 100 and 101)
+        // InRange means: positioned on the greatest key <= lookup, no exact match.
+        let lookup = Utf8Key::new("hudi-key-000000100a");
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::InRange);
+    }
+
+    #[test]
+    fn test_lookup_records_empty_keys() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Empty key slice must yield an empty result, not an error.
+        let results = reader.lookup_records(&[]).expect("Failed to lookup");
+        assert!(results.is_empty());
+    }
+
+    #[test]
+    fn test_lookup_records_not_found() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // A miss is reported as (key, None) — the key is still echoed back.
+        let results = reader
+            .lookup_records(&["nonexistent-key"])
+            .expect("Failed to lookup");
+        assert_eq!(results.len(), 1);
+        assert_eq!(results[0].0, "nonexistent-key");
+        assert!(results[0].1.is_none());
+    }
+
+    #[test]
+    fn test_lookup_records_found() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Two adjacent existing keys: both must be found in a single pass.
+        let results = reader
+            .lookup_records(&["hudi-key-000000000", "hudi-key-000000001"])
+            .expect("Failed to lookup");
+
+        assert_eq!(results.len(), 2);
+        assert_eq!(results[0].0, "hudi-key-000000000");
+        assert!(results[0].1.is_some());
+        assert_eq!(results[1].0, "hudi-key-000000001");
+        assert!(results[1].1.is_some());
+    }
+
+    #[test]
+    fn test_collect_records_by_prefix_no_matches() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // A prefix matching nothing must produce an empty Vec, not an error.
+        let records = reader
+            .collect_records_by_prefix("nonexistent-prefix-")
+            .expect("Failed to collect");
+        assert!(records.is_empty());
+    }
+
+    #[test]
+    fn test_collect_records_by_prefix_found() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // All keys start with "hudi-key-00000000" for keys 0-9
+        let records = reader
+            .collect_records_by_prefix("hudi-key-00000000")
+            .expect("Failed to collect");
+        // Keys 0-9 match this prefix
+        // (the 8-digit prefix covers exactly the first ten 9-digit keys).
+        assert_eq!(records.len(), 10);
+    }
+
+    #[test]
+    fn test_trailer_info() {
+        let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Smoke-checks that trailer-derived metadata is exposed on the reader.
+        assert_eq!(reader.num_entries(), 5000);
+        // Just verify we can access trailer info
+        assert!(reader.num_entries() > 0);
+    }
+
+    #[test]
+    fn test_seek_result_enum() {
+        // Test SeekResult values
+        // Discriminants are part of the API contract; pin them explicitly.
+        assert_eq!(SeekResult::BeforeBlockFirstKey as i32, -2);
+        assert_eq!(SeekResult::BeforeFileFirstKey as i32, -1);
+        assert_eq!(SeekResult::Found as i32, 0);
+        assert_eq!(SeekResult::InRange as i32, 1);
+        assert_eq!(SeekResult::Eof as i32, 2);
+
+        // Test Debug implementation
+        let _ = format!("{:?}", SeekResult::Found);
+    }
+}
diff --git a/crates/core/src/hfile/record.rs b/crates/core/src/hfile/record.rs
new file mode 100644
index 0000000..4ee6364
--- /dev/null
+++ b/crates/core/src/hfile/record.rs
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile record types for MDT (Metadata Table) operations.
+//!
+//! This module provides simple, owned record types for HFile key-value pairs.
+//! These are designed for use in MDT operations where:
+//! - Records need to be passed around and stored
+//! - Key-based lookups and merging are primary operations
+//! - Values are Avro-serialized payloads decoded on demand
+//!
+//! Unlike the `KeyValue` type which references into file bytes,
+//! `HFileRecord` owns its data and can be freely moved.
+
+use std::cmp::Ordering;
+
+/// An owned HFile record with key and value.
+///
+/// This is a simple struct designed for MDT operations. The key is the
+/// UTF-8 record key (content only, without HFile key structure), and
+/// the value is the raw bytes (typically Avro-serialized payload).
+///
+/// # Example
+/// ```ignore
+/// let record = HFileRecord::new("my-key".into(), value_bytes);
+/// println!("Key: {}", record.key_as_str());
+/// // Decode value on demand
+/// let payload = decode_avro(&record.value);
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct HFileRecord {
+    /// Record key (UTF-8 string content only, no length prefix)
+    // Kept as raw bytes rather than String: keys are usually UTF-8 but
+    // HFile does not guarantee it; key_as_str() handles the fallible view.
+    pub key: Vec<u8>,
+    /// Record value (raw bytes, typically Avro-serialized)
+    pub value: Vec<u8>,
+}
+
+impl HFileRecord {
+    /// Create a new HFile record.
+    pub fn new(key: Vec<u8>, value: Vec<u8>) -> Self {
+        Self { key, value }
+    }
+
+    /// Create a record from string key and value bytes.
+    pub fn from_str_key(key: &str, value: Vec<u8>) -> Self {
+        Self {
+            key: key.as_bytes().to_vec(),
+            value,
+        }
+    }
+
+    /// Returns the key as a UTF-8 string.
+    ///
+    /// Returns `None` if the key is not valid UTF-8.
+    pub fn key_as_str(&self) -> Option<&str> {
+        std::str::from_utf8(&self.key).ok()
+    }
+
+    /// Returns the key as bytes.
+    // Accessor mirrors the public `key` field for callers that prefer
+    // method syntax; both expose the same borrowed slice.
+    pub fn key(&self) -> &[u8] {
+        &self.key
+    }
+
+    /// Returns the value as bytes.
+    pub fn value(&self) -> &[u8] {
+        &self.value
+    }
+
+    /// Returns whether this record represents a deletion.
+    ///
+    /// In MDT, a deleted record has an empty value.
+    // NOTE(review): an empty value is indistinguishable from a legitimately
+    // empty payload here — confirm MDT never writes zero-length live values.
+    pub fn is_deleted(&self) -> bool {
+        self.value.is_empty()
+    }
+}
+
+impl PartialOrd for HFileRecord {
+    // Delegates to the total order defined by Ord (key-based comparison).
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for HFileRecord {
+    // Orders records by key only; the value does not participate.
+    // NOTE(review): the derived PartialEq compares key AND value, so two
+    // records can be cmp()-Equal yet not ==. That violates Ord's documented
+    // consistency requirement with PartialEq. The in-file test
+    // test_hfile_record_ordering pins the key-only behavior, so either make
+    // PartialEq key-only or tie-break cmp on value — confirm intent before
+    // using these records in sorted/deduplicating collections.
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.key.cmp(&other.key)
+    }
+}
+
+impl std::fmt::Display for HFileRecord {
+    // Human-readable summary: shows the key verbatim when it is valid UTF-8,
+    // otherwise falls back to a byte-count placeholder; the value is never
+    // printed (it is typically opaque Avro bytes), only its length.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self.key_as_str() {
+            Some(key) => write!(
+                f,
+                "HFileRecord{{key={}, value_len={}}}",
+                key,
+                self.value.len()
+            ),
+            None => write!(
+                f,
+                "HFileRecord{{key=<binary {} bytes>, value_len={}}}",
+                self.key.len(),
+                self.value.len()
+            ),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_hfile_record_creation() {
+        let record = HFileRecord::new(b"test-key".to_vec(), b"test-value".to_vec());
+        assert_eq!(record.key_as_str(), Some("test-key"));
+        assert_eq!(record.value(), b"test-value");
+        assert!(!record.is_deleted());
+    }
+
+    #[test]
+    fn test_hfile_record_from_str() {
+        let record = HFileRecord::from_str_key("my-key", b"my-value".to_vec());
+        assert_eq!(record.key_as_str(), Some("my-key"));
+        assert_eq!(record.value(), b"my-value");
+    }
+
+    #[test]
+    fn test_hfile_record_deleted() {
+        // MDT convention: an empty value marks a tombstone.
+        let record = HFileRecord::new(b"deleted-key".to_vec(), vec![]);
+        assert!(record.is_deleted());
+    }
+
+    #[test]
+    fn test_hfile_record_ordering() {
+        let r1 = HFileRecord::from_str_key("aaa", vec![1]);
+        let r2 = HFileRecord::from_str_key("bbb", vec![2]);
+        let r3 = HFileRecord::from_str_key("aaa", vec![3]);
+
+        // Pins the key-only Ord: equal keys compare Equal even though the
+        // records differ by value (and are != under derived PartialEq).
+        assert!(r1 < r2);
+        assert_eq!(r1.cmp(&r3), Ordering::Equal); // Same key, ordering only by key
+    }
+
+    #[test]
+    fn test_hfile_record_display() {
+        let record = HFileRecord::from_str_key("test", b"value".to_vec());
+        let display = format!("{}", record);
+        assert!(display.contains("test"));
+        assert!(display.contains("value_len=5"));
+    }
+}
diff --git a/crates/core/src/hfile/trailer.rs b/crates/core/src/hfile/trailer.rs
new file mode 100644
index 0000000..1c8d786
--- /dev/null
+++ b/crates/core/src/hfile/trailer.rs
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! HFile trailer parsing.
+
+use crate::hfile::block_type::{HFileBlockType, MAGIC_LENGTH};
+use crate::hfile::compression::CompressionCodec;
+use crate::hfile::error::{HFileError, Result};
+use crate::hfile::proto::TrailerProto;
+use prost::Message;
+
/// HFile trailer size in bytes (fixed at 4096 for HFile v3).
///
/// The trailer always occupies the final `TRAILER_SIZE` bytes of the file;
/// readers locate it by seeking `file_len - TRAILER_SIZE`.
pub const TRAILER_SIZE: usize = 4096;
+
/// HFile trailer containing file metadata.
///
/// Populated from the length-delimited protobuf payload embedded in the
/// fixed-size trailer block at the end of the file; absent protobuf fields
/// fall back to defaults when parsed.
#[derive(Debug, Clone)]
// Some fields are parsed for completeness but not yet consumed by the reader.
#[allow(dead_code)]
pub struct HFileTrailer {
    /// Major version (should be 3 for HFile v3)
    pub major_version: u32,
    /// Minor version
    pub minor_version: u32,
    /// Offset to file info block
    pub file_info_offset: u64,
    /// Offset to load-on-open section
    pub load_on_open_data_offset: u64,
    /// Total uncompressed size of data block index
    pub uncompressed_data_index_size: u64,
    /// Total uncompressed bytes in file
    pub total_uncompressed_bytes: u64,
    /// Number of entries in data block index
    pub data_index_count: u32,
    /// Number of entries in meta block index
    pub meta_index_count: u32,
    /// Number of key-value entries in file
    pub entry_count: u64,
    /// Number of levels in data block index (defaults to 1 when unset)
    pub num_data_index_levels: u32,
    /// Offset to first data block
    pub first_data_block_offset: u64,
    /// Offset past the last data block
    pub last_data_block_offset: u64,
    /// Comparator class name (not used by Hudi)
    pub comparator_class_name: String,
    /// Compression codec used for blocks
    pub compression_codec: CompressionCodec,
}
+
+impl HFileTrailer {
+    /// Read and parse the trailer from file bytes.
+    ///
+    /// The trailer is always at the end of the file with fixed size.
+    pub fn read(file_bytes: &[u8]) -> Result<Self> {
+        let file_size = file_bytes.len();
+        if file_size < TRAILER_SIZE {
+            return Err(HFileError::InvalidFormat(format!(
+                "File too small for HFile trailer: {} bytes, need at least {}",
+                file_size, TRAILER_SIZE
+            )));
+        }
+
+        let trailer_start = file_size - TRAILER_SIZE;
+        let trailer_bytes = &file_bytes[trailer_start..];
+
+        // Verify trailer magic
+        HFileBlockType::Trailer.check_magic(trailer_bytes)?;
+
+        // Read version from last 4 bytes
+        // Format: [minor_version (1 byte)] [major_version (3 bytes)]
+        let version_bytes = &trailer_bytes[TRAILER_SIZE - 4..];
+        let minor_version = version_bytes[0] as u32;
+        let major_version = ((version_bytes[1] as u32) << 16)
+            | ((version_bytes[2] as u32) << 8)
+            | (version_bytes[3] as u32);
+
+        if major_version != 3 {
+            return Err(HFileError::UnsupportedVersion {
+                major: major_version,
+                minor: minor_version,
+            });
+        }
+
+        // Parse protobuf content (after magic, before version)
+        let proto_start = MAGIC_LENGTH;
+        let proto_end = TRAILER_SIZE - 4;
+        let proto_bytes = &trailer_bytes[proto_start..proto_end];
+
+        // Protobuf is length-delimited
+        let proto = Self::parse_length_delimited_proto(proto_bytes)?;
+
+        let compression_codec = proto
+            .compression_codec
+            .map(CompressionCodec::from_id)
+            .transpose()?
+            .unwrap_or_default();
+
+        Ok(Self {
+            major_version,
+            minor_version,
+            file_info_offset: proto.file_info_offset.unwrap_or(0),
+            load_on_open_data_offset: proto.load_on_open_data_offset.unwrap_or(0),
+            uncompressed_data_index_size: proto.uncompressed_data_index_size.unwrap_or(0),
+            total_uncompressed_bytes: proto.total_uncompressed_bytes.unwrap_or(0),
+            data_index_count: proto.data_index_count.unwrap_or(0),
+            meta_index_count: proto.meta_index_count.unwrap_or(0),
+            entry_count: proto.entry_count.unwrap_or(0),
+            num_data_index_levels: proto.num_data_index_levels.unwrap_or(1),
+            first_data_block_offset: proto.first_data_block_offset.unwrap_or(0),
+            last_data_block_offset: proto.last_data_block_offset.unwrap_or(0),
+            comparator_class_name: proto.comparator_class_name.unwrap_or_default(),
+            compression_codec,
+        })
+    }
+
+    /// Parse a length-delimited protobuf message.
+    fn parse_length_delimited_proto(bytes: &[u8]) -> Result<TrailerProto> {
+        // Read varint length
+        let (length, consumed) = read_varint(bytes);
+        let proto_bytes = &bytes[consumed..consumed + length as usize];
+        TrailerProto::decode(proto_bytes).map_err(HFileError::from)
+    }
+}
+
/// Read an unsigned LEB128 varint from `bytes`. Returns `(value, bytes_consumed)`.
///
/// A valid u64 varint is at most 10 bytes long. A malformed or overlong
/// varint stops at the 64-bit boundary instead of panicking on shift
/// overflow, and an empty slice yields `(0, 0)`; callers validate the
/// decoded value downstream.
fn read_varint(bytes: &[u8]) -> (u64, usize) {
    let mut result: u64 = 0;
    let mut shift = 0u32;
    let mut pos = 0;

    while pos < bytes.len() {
        let b = bytes[pos] as u64;
        pos += 1;
        result |= (b & 0x7F) << shift;
        if b & 0x80 == 0 {
            break;
        }
        shift += 7;
        // Guard against malformed input: `(b & 0x7F) << shift` with
        // shift >= 64 would panic in debug builds (shift 63 is the last
        // valid position, reached on the 10th byte).
        if shift >= 64 {
            break;
        }
    }

    (result, pos)
}
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 9438db5..5b1d891 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -48,6 +48,7 @@
 pub mod error;
 pub mod expr;
 pub mod file_group;
+pub mod hfile;
 pub mod merge;
 pub mod metadata;
 mod record;
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile
new file mode 100644
index 0000000..4825637
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_16KB_GZ_20000_fake_first_key.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile
new file mode 100644
index 0000000..03c7960
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_10000_large_keys_deep_index.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile
new file mode 100644
index 0000000..bacd245
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_13_1KB_GZ_20000_large_keys.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile
new file mode 100644
index 0000000..243eb66
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile
new file mode 100644
index 0000000..6c86b01
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_GZ_200_20_non_unique.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile
new file mode 100644
index 0000000..c12188d
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile
new file mode 100644
index 0000000..4d35a3e
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile
new file mode 100644
index 0000000..923bb84
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_64KB_NONE_5000.hfile
Binary files differ
diff --git a/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_no_entry.hfile b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_no_entry.hfile
new file mode 100644
index 0000000..0471f98
--- /dev/null
+++ b/crates/core/tests/data/hfile/hudi_1_0_hbase_2_4_9_no_entry.hfile
Binary files differ