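Fix format; decode aligned pages row by row (drop batch decode helpers) and pass min_time_hint/row_offset/row_limit through AlignedChunkReader.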
diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h
index c7744f0..859ad39 100644
--- a/cpp/src/common/tsblock/tsblock.h
+++ b/cpp/src/common/tsblock/tsblock.h
@@ -144,12 +144,6 @@
         ASSERT(tsblock_->row_count_ > 0);
         tsblock_->row_count_--;
     }
-    FORCE_INLINE uint32_t remaining() const {
-        return tsblock_->max_row_count_ - tsblock_->row_count_;
-    }
-    FORCE_INLINE void add_rows(uint32_t count) {
-        tsblock_->row_count_ += count;
-    }
 
     FORCE_INLINE void append(uint32_t slot_index, const char* value,
                              uint32_t len) {
diff --git a/cpp/src/encoding/decoder.h b/cpp/src/encoding/decoder.h
index f85ccf1..c290b57 100644
--- a/cpp/src/encoding/decoder.h
+++ b/cpp/src/encoding/decoder.h
@@ -21,7 +21,6 @@
 #define ENCODING_DECODER_H
 
 #include "common/allocator/byte_stream.h"
-#include "common/db_common.h"
 
 namespace storage {
 
@@ -38,102 +37,6 @@
     virtual int read_double(double& ret_value, common::ByteStream& in) = 0;
     virtual int read_String(common::String& ret_value, common::PageArena& pa,
                             common::ByteStream& in) = 0;
-
-    virtual int read_batch_int32(int32_t* out, int capacity, int& actual,
-                                 common::ByteStream& in) {
-        actual = 0;
-        int ret = common::E_OK;
-        int32_t val;
-        while (actual < capacity && has_remaining(in)) {
-            ret = read_int32(val, in);
-            if (ret != common::E_OK) return ret;
-            out[actual++] = val;
-        }
-        return common::E_OK;
-    }
-
-    virtual int read_batch_int64(int64_t* out, int capacity, int& actual,
-                                 common::ByteStream& in) {
-        actual = 0;
-        int ret = common::E_OK;
-        int64_t val;
-        while (actual < capacity && has_remaining(in)) {
-            ret = read_int64(val, in);
-            if (ret != common::E_OK) return ret;
-            out[actual++] = val;
-        }
-        return common::E_OK;
-    }
-
-    virtual int read_batch_float(float* out, int capacity, int& actual,
-                                 common::ByteStream& in) {
-        actual = 0;
-        int ret = common::E_OK;
-        float val;
-        while (actual < capacity && has_remaining(in)) {
-            ret = read_float(val, in);
-            if (ret != common::E_OK) return ret;
-            out[actual++] = val;
-        }
-        return common::E_OK;
-    }
-
-    virtual int read_batch_double(double* out, int capacity, int& actual,
-                                  common::ByteStream& in) {
-        actual = 0;
-        int ret = common::E_OK;
-        double val;
-        while (actual < capacity && has_remaining(in)) {
-            ret = read_double(val, in);
-            if (ret != common::E_OK) return ret;
-            out[actual++] = val;
-        }
-        return common::E_OK;
-    }
-
-    virtual int skip_int32(int count, int& skipped, common::ByteStream& in) {
-        skipped = 0;
-        int32_t dummy;
-        while (skipped < count && has_remaining(in)) {
-            int ret = read_int32(dummy, in);
-            if (ret != common::E_OK) return ret;
-            ++skipped;
-        }
-        return common::E_OK;
-    }
-
-    virtual int skip_int64(int count, int& skipped, common::ByteStream& in) {
-        skipped = 0;
-        int64_t dummy;
-        while (skipped < count && has_remaining(in)) {
-            int ret = read_int64(dummy, in);
-            if (ret != common::E_OK) return ret;
-            ++skipped;
-        }
-        return common::E_OK;
-    }
-
-    virtual int skip_float(int count, int& skipped, common::ByteStream& in) {
-        skipped = 0;
-        float dummy;
-        while (skipped < count && has_remaining(in)) {
-            int ret = read_float(dummy, in);
-            if (ret != common::E_OK) return ret;
-            ++skipped;
-        }
-        return common::E_OK;
-    }
-
-    virtual int skip_double(int count, int& skipped, common::ByteStream& in) {
-        skipped = 0;
-        double dummy;
-        while (skipped < count && has_remaining(in)) {
-            int ret = read_double(dummy, in);
-            if (ret != common::E_OK) return ret;
-            ++skipped;
-        }
-        return common::E_OK;
-    }
 };
 
 }  // end namespace storage
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index 1150e40..7688ed1 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -168,7 +168,9 @@
 
 int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
                                       Filter* oneshoot_filter, PageArena& pa) {
-    return get_next_page_multi(ret_tsblock, oneshoot_filter, pa);
+    return get_next_page_multi(ret_tsblock, oneshoot_filter, pa,
+                               std::numeric_limits<int64_t>::min(), nullptr,
+                               nullptr);
 }
 
 int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta,
@@ -332,7 +334,8 @@
     if (row_limit == 0) {
         return E_NO_MORE_DATA;
     }
-    return get_next_page_multi(ret_tsblock, oneshoot_filter, pa);
+    return get_next_page_multi(ret_tsblock, oneshoot_filter, pa, min_time_hint,
+                               &row_offset, &row_limit);
 }
 
 int AlignedChunkReader::load_by_aligned_meta_multi(
@@ -448,15 +451,20 @@
 
 int AlignedChunkReader::get_next_page_multi(TsBlock* ret_tsblock,
                                             Filter* oneshoot_filter,
-                                            PageArena& pa) {
+                                            PageArena& pa,
+                                            int64_t min_time_hint,
+                                            int* row_offset, int* row_limit) {
     int ret = E_OK;
     Filter* filter =
         (oneshoot_filter != nullptr ? oneshoot_filter : time_filter_);
 
+    if (row_limit != nullptr && *row_limit == 0) return E_NO_MORE_DATA;
+
     // Resume chunk-level scatter from previous E_OVERFLOW.
     if (chunk_level_active_) {
         RowAppender row_appender(ret_tsblock);
-        ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa);
+        ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa,
+                                  row_offset, row_limit);
         if (ret != E_OVERFLOW) {
             cleanup_chunk_decode();
         } else {
@@ -482,7 +490,8 @@
 
         chunk_level_active_ = true;
         RowAppender row_appender(ret_tsblock);
-        ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa);
+        ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa,
+                                  row_offset, row_limit);
         if (ret != E_OVERFLOW) {
             cleanup_chunk_decode();
         } else {
@@ -493,18 +502,21 @@
 #endif
 
     // Serial fallback.
-    return get_next_page_multi_serial(ret_tsblock, filter, pa);
+    return get_next_page_multi_serial(ret_tsblock, filter, pa, min_time_hint,
+                                      row_offset);
 }
 
 int AlignedChunkReader::get_next_page_multi_serial(TsBlock* ret_tsblock,
                                                    Filter* filter,
-                                                   PageArena& pa) {
+                                                   PageArena& pa,
+                                                   int64_t min_time_hint,
+                                                   int* row_offset) {
     int ret = E_OK;
     bool pt = prev_time_page_not_finish();
     bool pv = prev_any_value_page_not_finish_multi();
     if (pt && pv) {
-        ret =
-            decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa);
+        ret = decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa,
+                                                       row_offset, nullptr);
         return ret;
     }
     if (!pt && !pv) {
@@ -523,8 +535,25 @@
                 }
             }
             if (IS_FAIL(ret)) break;
-            if (cur_page_statisify_filter_multi(filter)) break;
-            if (RET_FAIL(skip_cur_page_multi())) break;
+            if (!cur_page_statisify_filter_multi(filter)) {
+                if (RET_FAIL(skip_cur_page_multi())) break;
+            } else if (min_time_hint != std::numeric_limits<int64_t>::min() &&
+                       cur_time_page_header_.statistic_ != nullptr &&
+                       cur_time_page_header_.statistic_->end_time_ <
+                           min_time_hint) {
+                // Skip page whose time range is entirely before hint.
+                if (RET_FAIL(skip_cur_page_multi())) break;
+            } else if (row_offset != nullptr && *row_offset > 0 &&
+                       cur_time_page_header_.statistic_ != nullptr &&
+                       cur_time_page_header_.statistic_->count_ > 0 &&
+                       *row_offset >=
+                           cur_time_page_header_.statistic_->count_) {
+                // Skip entire page by offset.
+                *row_offset -= cur_time_page_header_.statistic_->count_;
+                if (RET_FAIL(skip_cur_page_multi())) break;
+            } else {
+                break;
+            }
             if (!has_more_data()) {
                 ret = E_NO_MORE_DATA;
                 break;
@@ -536,8 +565,8 @@
         }
     }
     if (IS_SUCC(ret)) {
-        ret =
-            decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa);
+        ret = decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa,
+                                                       row_offset, nullptr);
     }
     return ret;
 }
@@ -657,10 +686,12 @@
 }
 
 int AlignedChunkReader::decode_time_value_buf_into_tsblock_multi(
-    TsBlock*& ret_tsblock, Filter* filter, PageArena* pa) {
+    TsBlock*& ret_tsblock, Filter* filter, PageArena* pa, int* row_offset,
+    int* row_limit) {
     int ret = E_OK;
     RowAppender row_appender(ret_tsblock);
-    ret = multi_DECODE_TV_BATCH(ret_tsblock, row_appender, filter, pa);
+    ret = multi_decode_tv_row_by_row(ret_tsblock, row_appender, filter, pa,
+                                     row_offset, row_limit);
 
     // Release uncompressed buffers if pages are done
     if (ret != E_OVERFLOW) {
@@ -689,224 +720,165 @@
     return ret;
 }
 
-int AlignedChunkReader::multi_DECODE_TV_BATCH(TsBlock* ret_tsblock,
-                                              RowAppender& row_appender,
-                                              Filter* filter, PageArena* pa) {
+int AlignedChunkReader::multi_decode_tv_row_by_row(
+    TsBlock* ret_tsblock, RowAppender& row_appender, Filter* filter,
+    PageArena* pa, int* row_offset, int* row_limit) {
     int ret = E_OK;
-    const int BATCH = 129;
-    int64_t times[BATCH];
     const uint32_t null_mask_base = 1 << 7;
     const uint32_t num_cols = value_columns_.size();
+    int64_t time = 0;
+
+    auto skip_value = [](ValueColumnState* col, common::PageArena* pa) {
+        switch (col->chunk_header.data_type_) {
+            case common::BOOLEAN: {
+                bool d;
+                col->decoder->read_boolean(d, col->in);
+                break;
+            }
+            case common::INT32:
+            case common::DATE: {
+                int32_t d;
+                col->decoder->read_int32(d, col->in);
+                break;
+            }
+            case common::INT64:
+            case common::TIMESTAMP: {
+                int64_t d;
+                col->decoder->read_int64(d, col->in);
+                break;
+            }
+            case common::FLOAT: {
+                float d;
+                col->decoder->read_float(d, col->in);
+                break;
+            }
+            case common::DOUBLE: {
+                double d;
+                col->decoder->read_double(d, col->in);
+                break;
+            }
+            case common::STRING:
+            case common::TEXT:
+            case common::BLOB: {
+                common::String d;
+                col->decoder->read_String(d, *pa, col->in);
+                break;
+            }
+            default:
+                break;
+        }
+    };
+
+    auto read_and_append_value = [&](ValueColumnState* col, uint32_t slot,
+                                     RowAppender& ra, common::PageArena* pa) {
+        switch (col->chunk_header.data_type_) {
+            case common::BOOLEAN: {
+                bool v;
+                col->decoder->read_boolean(v, col->in);
+                ra.append(slot, (char*)&v, sizeof(v));
+                break;
+            }
+            case common::INT32:
+            case common::DATE: {
+                int32_t v;
+                col->decoder->read_int32(v, col->in);
+                ra.append(slot, (char*)&v, sizeof(v));
+                break;
+            }
+            case common::INT64:
+            case common::TIMESTAMP: {
+                int64_t v;
+                col->decoder->read_int64(v, col->in);
+                ra.append(slot, (char*)&v, sizeof(v));
+                break;
+            }
+            case common::FLOAT: {
+                float v;
+                col->decoder->read_float(v, col->in);
+                ra.append(slot, (char*)&v, sizeof(v));
+                break;
+            }
+            case common::DOUBLE: {
+                double v;
+                col->decoder->read_double(v, col->in);
+                ra.append(slot, (char*)&v, sizeof(v));
+                break;
+            }
+            case common::STRING:
+            case common::TEXT:
+            case common::BLOB: {
+                common::String v;
+                col->decoder->read_String(v, *pa, col->in);
+                ra.append(slot, v.buf_, v.len_);
+                break;
+            }
+            default:
+                ra.append_null(slot);
+                break;
+        }
+    };
 
     while (time_decoder_->has_remaining(time_in_)) {
-        if (row_appender.remaining() < (uint32_t)BATCH) {
+        if (row_limit != nullptr && *row_limit == 0) break;
+
+        // Check capacity BEFORE consuming a timestamp, so overflow loses no row
+        if (UNLIKELY(!row_appender.add_row())) {
             ret = E_OVERFLOW;
             break;
         }
 
-        // ── Phase 1: Decode a batch of timestamps ──
-        int time_count = 0;
-        if (RET_FAIL(time_decoder_->read_batch_int64(times, BATCH, time_count,
-                                                     time_in_))) {
+        ret = time_decoder_->read_int64(time, time_in_);
+        if (ret != E_OK) {
+            row_appender.backoff_add_row();
             break;
         }
-        if (time_count == 0) break;
 
-        // ── Phase 2: Apply time filter ──
-        bool time_mask[BATCH];
-        bool block_all_pass = (filter == nullptr);
-        int pass_count = time_count;
-        if (!block_all_pass) {
-            pass_count =
-                filter->satisfy_batch_time(times, time_count, time_mask);
-        }
-
-        // ── Phase 3: Per-column null check + value decode ──
-        struct ColBatch {
-            bool is_null[BATCH];
-            int nonnull_count;
-            char val_buf[BATCH * 8];
-            int val_count;
-        };
-        std::vector<ColBatch> col_batches(num_cols);
-
+        // Advance value index for all columns
         for (uint32_t c = 0; c < num_cols; c++) {
-            auto* col = value_columns_[c];
-            auto& cb = col_batches[c];
-            cb.nonnull_count = 0;
-            cb.val_count = 0;
-            for (int i = 0; i < time_count; i++) {
-                int vi = col->cur_value_index + 1 + i;
-                if (col->notnull_bitmap.empty() ||
-                    ((col->notnull_bitmap[vi / 8] & 0xFF) &
-                     (null_mask_base >> (vi % 8))) == 0) {
-                    cb.is_null[i] = true;
-                } else {
-                    cb.is_null[i] = false;
-                    cb.nonnull_count++;
-                }
-            }
-
-            // Skip values if no rows pass time filter
-            if (pass_count == 0 && cb.nonnull_count > 0) {
-                switch (col->chunk_header.data_type_) {
-                    case common::BOOLEAN: {
-                        for (int s = 0; s < cb.nonnull_count; s++) {
-                            bool dummy;
-                            col->decoder->read_boolean(dummy, col->in);
-                        }
-                        break;
-                    }
-                    case common::INT32:
-                    case common::DATE: {
-                        int sk = 0;
-                        col->decoder->skip_int32(cb.nonnull_count, sk, col->in);
-                        break;
-                    }
-                    case common::INT64:
-                    case common::TIMESTAMP: {
-                        int sk = 0;
-                        col->decoder->skip_int64(cb.nonnull_count, sk, col->in);
-                        break;
-                    }
-                    case common::FLOAT: {
-                        int sk = 0;
-                        col->decoder->skip_float(cb.nonnull_count, sk, col->in);
-                        break;
-                    }
-                    case common::DOUBLE: {
-                        int sk = 0;
-                        col->decoder->skip_double(cb.nonnull_count, sk,
-                                                  col->in);
-                        break;
-                    }
-                    default:
-                        break;
-                }
-                cb.nonnull_count = 0;
-            }
-
-            // Decode non-null values
-            if (cb.nonnull_count > 0) {
-                switch (col->chunk_header.data_type_) {
-                    case common::BOOLEAN: {
-                        bool* out = reinterpret_cast<bool*>(cb.val_buf);
-                        cb.val_count = 0;
-                        for (int s = 0; s < cb.nonnull_count; s++) {
-                            bool v;
-                            if (col->decoder->read_boolean(v, col->in) !=
-                                common::E_OK)
-                                break;
-                            out[cb.val_count++] = v;
-                        }
-                        break;
-                    }
-                    case common::INT32:
-                    case common::DATE:
-                        col->decoder->read_batch_int32(
-                            reinterpret_cast<int32_t*>(cb.val_buf),
-                            cb.nonnull_count, cb.val_count, col->in);
-                        break;
-                    case common::INT64:
-                    case common::TIMESTAMP:
-                        col->decoder->read_batch_int64(
-                            reinterpret_cast<int64_t*>(cb.val_buf),
-                            cb.nonnull_count, cb.val_count, col->in);
-                        break;
-                    case common::FLOAT:
-                        col->decoder->read_batch_float(
-                            reinterpret_cast<float*>(cb.val_buf),
-                            cb.nonnull_count, cb.val_count, col->in);
-                        break;
-                    case common::DOUBLE:
-                        col->decoder->read_batch_double(
-                            reinterpret_cast<double*>(cb.val_buf),
-                            cb.nonnull_count, cb.val_count, col->in);
-                        break;
-                    default:
-                        break;
-                }
-            }
+            value_columns_[c]->cur_value_index++;
         }
 
-        // ── Phase 4: Skip if no rows pass ──
-        if (pass_count == 0) {
+        // Time filter — skip row
+        bool skip_row =
+            (filter != nullptr && !filter->satisfy_start_end_time(time, time));
+
+        // Offset skip: drop a filter-passing row, decrementing row_offset
+        if (!skip_row && row_offset != nullptr && *row_offset > 0) {
+            (*row_offset)--;
+            skip_row = true;
+        }
+
+        if (skip_row) {
+            row_appender.backoff_add_row();
             for (uint32_t c = 0; c < num_cols; c++) {
-                value_columns_[c]->cur_value_index += time_count;
+                auto* col = value_columns_[c];
+                int vi = col->cur_value_index;
+                bool is_nonnull = !col->notnull_bitmap.empty() &&
+                                  ((col->notnull_bitmap[vi / 8] & 0xFF) &
+                                   (null_mask_base >> (vi % 8))) != 0;
+                if (is_nonnull && col->decoder->has_remaining(col->in)) {
+                    skip_value(col, pa);
+                }
             }
             continue;
         }
 
-        // ── Phase 5: Scatter into TsBlock ──
+        row_appender.append(0, (char*)&time, sizeof(time));
 
-        // Fast path: all rows pass filter AND all columns have no nulls
-        if (pass_count == time_count) {
-            bool all_nonnull = true;
-            for (uint32_t c = 0; c < num_cols; c++) {
-                if (col_batches[c].nonnull_count != time_count) {
-                    all_nonnull = false;
-                    break;
-                }
-            }
-            if (all_nonnull) {
-                common::Vector* time_vec = ret_tsblock->get_vector(0);
-                time_vec->get_value_data().append_fixed_value(
-                    (const char*)times,
-                    static_cast<uint32_t>(time_count) * sizeof(int64_t));
-                for (uint32_t c = 0; c < num_cols; c++) {
-                    auto& cb = col_batches[c];
-                    auto* col = value_columns_[c];
-                    uint32_t elem_size = common::get_data_type_size(
-                        col->chunk_header.data_type_);
-                    common::Vector* vec = ret_tsblock->get_vector(c + 1);
-                    vec->get_value_data().append_fixed_value(
-                        cb.val_buf,
-                        static_cast<uint32_t>(cb.val_count) * elem_size);
-                    col->cur_value_index += time_count;
-                }
-                row_appender.add_rows(static_cast<uint32_t>(time_count));
-                continue;
+        for (uint32_t c = 0; c < num_cols; c++) {
+            auto* col = value_columns_[c];
+            int vi = col->cur_value_index;
+            bool is_nonnull = !col->notnull_bitmap.empty() &&
+                              ((col->notnull_bitmap[vi / 8] & 0xFF) &
+                               (null_mask_base >> (vi % 8))) != 0;
+
+            if (!is_nonnull || !col->decoder->has_remaining(col->in)) {
+                row_appender.append_null(c + 1);
+            } else {
+                read_and_append_value(col, c + 1, row_appender, pa);
             }
         }
-
-        // Slow path: per-row scatter
-        std::vector<int> val_idx(num_cols, 0);
-
-        for (int i = 0; i < time_count; i++) {
-            bool passes = block_all_pass || time_mask[i];
-
-            if (!passes) {
-                for (uint32_t c = 0; c < num_cols; c++) {
-                    value_columns_[c]->cur_value_index++;
-                    if (!col_batches[c].is_null[i]) val_idx[c]++;
-                }
-                continue;
-            }
-
-            if (UNLIKELY(!row_appender.add_row())) {
-                ret = E_OVERFLOW;
-                break;
-            }
-
-            row_appender.append(0, (char*)&times[i], sizeof(int64_t));
-
-            for (uint32_t c = 0; c < num_cols; c++) {
-                value_columns_[c]->cur_value_index++;
-                auto& cb = col_batches[c];
-                auto* col = value_columns_[c];
-
-                if (cb.is_null[i]) {
-                    row_appender.append_null(c + 1);
-                } else {
-                    uint32_t elem_size = common::get_data_type_size(
-                        col->chunk_header.data_type_);
-                    row_appender.append(
-                        c + 1, cb.val_buf + val_idx[c] * elem_size, elem_size);
-                    val_idx[c]++;
-                }
-            }
-        }
-        if (ret != E_OK) break;
+        if (row_limit != nullptr) (*row_limit)--;
     }
     return ret;
 }
@@ -1063,13 +1035,10 @@
         ts_in.wrap_from(ub, us);
         time_decoder_->reset();
         td.times.clear();
-        const int BS = 1024;
-        int64_t buf[BS];
         while (time_decoder_->has_remaining(ts_in)) {
-            int actual = 0;
-            time_decoder_->read_batch_int64(buf, BS, actual, ts_in);
-            if (actual == 0) break;
-            td.times.insert(td.times.end(), buf, buf + actual);
+            int64_t t;
+            if (time_decoder_->read_int64(t, ts_in) != E_OK) break;
+            td.times.push_back(t);
         }
         td.count = (int)td.times.size();
         time_compressor_->after_uncompress(ub);
@@ -1161,27 +1130,43 @@
                     break;
                 }
                 case common::INT32:
-                case common::DATE:
-                    col->decoder->read_batch_int32(
-                        reinterpret_cast<int32_t*>(cp.values.data()), nn,
-                        cp.nonnull_count, vi);
+                case common::DATE: {
+                    int32_t* out = reinterpret_cast<int32_t*>(cp.values.data());
+                    for (int s = 0; s < nn; s++) {
+                        int32_t v;
+                        if (col->decoder->read_int32(v, vi) != E_OK) break;
+                        out[cp.nonnull_count++] = v;
+                    }
                     break;
+                }
                 case common::INT64:
-                case common::TIMESTAMP:
-                    col->decoder->read_batch_int64(
-                        reinterpret_cast<int64_t*>(cp.values.data()), nn,
-                        cp.nonnull_count, vi);
+                case common::TIMESTAMP: {
+                    int64_t* out = reinterpret_cast<int64_t*>(cp.values.data());
+                    for (int s = 0; s < nn; s++) {
+                        int64_t v;
+                        if (col->decoder->read_int64(v, vi) != E_OK) break;
+                        out[cp.nonnull_count++] = v;
+                    }
                     break;
-                case common::FLOAT:
-                    col->decoder->read_batch_float(
-                        reinterpret_cast<float*>(cp.values.data()), nn,
-                        cp.nonnull_count, vi);
+                }
+                case common::FLOAT: {
+                    float* out = reinterpret_cast<float*>(cp.values.data());
+                    for (int s = 0; s < nn; s++) {
+                        float v;
+                        if (col->decoder->read_float(v, vi) != E_OK) break;
+                        out[cp.nonnull_count++] = v;
+                    }
                     break;
-                case common::DOUBLE:
-                    col->decoder->read_batch_double(
-                        reinterpret_cast<double*>(cp.values.data()), nn,
-                        cp.nonnull_count, vi);
+                }
+                case common::DOUBLE: {
+                    double* out = reinterpret_cast<double*>(cp.values.data());
+                    for (int s = 0; s < nn; s++) {
+                        double v;
+                        if (col->decoder->read_double(v, vi) != E_OK) break;
+                        out[cp.nonnull_count++] = v;
+                    }
                     break;
+                }
                 default:
                     break;
             }
@@ -1207,18 +1192,36 @@
 
 int AlignedChunkReader::scatter_chunk_pages(TsBlock* ret_tsblock,
                                             RowAppender& row_appender,
-                                            Filter* filter, PageArena* pa) {
+                                            Filter* filter, PageArena* pa,
+                                            int* row_offset, int* row_limit) {
     int ret = E_OK;
     const uint32_t null_mask_base = 1 << 7;
     const uint32_t num_cols = value_columns_.size();
     const size_t np = chunk_pages_.size();
 
     while ((size_t)chunk_page_cursor_ < np) {
+        if (row_limit != nullptr && *row_limit == 0) break;
+
         auto& td = chunk_times_[chunk_page_cursor_];
         if (td.cursor >= td.count) {
             chunk_page_cursor_++;
             continue;
         }
+
+        // Page-level offset skip: skip entire pre-decoded page. NOTE(review): also counts rows the time filter would reject — confirm.
+        if (row_offset != nullptr && *row_offset > 0 &&
+            *row_offset >= (td.count - td.cursor)) {
+            *row_offset -= (td.count - td.cursor);
+            // Advance read_pos for all columns
+            for (uint32_t c = 0; c < num_cols; c++) {
+                auto& cp = chunk_cols_[c][chunk_page_cursor_];
+                cp.read_pos = cp.nonnull_count;  // fully consumed
+            }
+            td.cursor = td.count;
+            chunk_page_cursor_++;
+            continue;
+        }
+
         auto& info = chunk_pages_[chunk_page_cursor_];
 
         bool need_filter = (info.pass_type == PagePassType::BOUNDARY);
@@ -1239,33 +1242,38 @@
 
         if (can_bulk) {
             while (td.cursor < td.count) {
-                int avail = (int)row_appender.remaining();
-                if (avail <= 0) return E_OVERFLOW;
-                int batch = std::min(td.count - td.cursor, avail);
-
-                ret_tsblock->get_vector(0)->get_value_data().append_fixed_value(
-                    (const char*)&td.times[td.cursor],
-                    static_cast<uint32_t>(batch) * sizeof(int64_t));
+                if (row_limit != nullptr && *row_limit == 0) break;
+                // Row-level offset skip
+                if (row_offset != nullptr && *row_offset > 0) {
+                    (*row_offset)--;
+                    for (uint32_t c = 0; c < num_cols; c++)
+                        chunk_cols_[c][chunk_page_cursor_].read_pos++;
+                    td.cursor++;
+                    continue;
+                }
+                if (UNLIKELY(!row_appender.add_row())) return E_OVERFLOW;
+                int64_t t = td.times[td.cursor];
+                row_appender.append(0, (char*)&t, sizeof(int64_t));
                 for (uint32_t c = 0; c < num_cols; c++) {
                     auto& cp = chunk_cols_[c][chunk_page_cursor_];
                     uint32_t es = common::get_data_type_size(
                         value_columns_[c]->chunk_header.data_type_);
-                    ret_tsblock->get_vector(c + 1)
-                        ->get_value_data()
-                        .append_fixed_value(
-                            cp.values.data() +
-                                static_cast<size_t>(cp.read_pos) * es,
-                            static_cast<uint32_t>(batch) * es);
-                    cp.read_pos += batch;
+                    row_appender.append(
+                        c + 1,
+                        cp.values.data() +
+                            static_cast<size_t>(cp.read_pos) * es,
+                        es);
+                    cp.read_pos++;
                 }
-                row_appender.add_rows(static_cast<uint32_t>(batch));
-                td.cursor += batch;
+                td.cursor++;
+                if (row_limit != nullptr) (*row_limit)--;
             }
         } else {
             while (td.cursor < td.count) {
-                if (row_appender.remaining() == 0) return E_OVERFLOW;
+                if (row_limit != nullptr && *row_limit == 0) break;
                 int64_t t = td.times[td.cursor];
 
+                // Filter skip
                 if (need_filter && filter != nullptr &&
                     !filter->satisfy_start_end_time(t, t)) {
                     for (uint32_t c = 0; c < num_cols; c++) {
@@ -1281,6 +1289,22 @@
                     continue;
                 }
 
+                // Offset skip
+                if (row_offset != nullptr && *row_offset > 0) {
+                    (*row_offset)--;
+                    for (uint32_t c = 0; c < num_cols; c++) {
+                        auto& cp = chunk_cols_[c][chunk_page_cursor_];
+                        if (cp.data_num > 0 && !cp.bitmap.empty()) {
+                            int vi = td.cursor;
+                            if ((cp.bitmap[vi / 8] & 0xFF) &
+                                (null_mask_base >> (vi % 8)))
+                                cp.read_pos++;
+                        }
+                    }
+                    td.cursor++;
+                    continue;
+                }
+
                 if (UNLIKELY(!row_appender.add_row())) return E_OVERFLOW;
                 row_appender.append(0, (char*)&t, sizeof(int64_t));
 
@@ -1306,6 +1330,7 @@
                     }
                 }
                 td.cursor++;
+                if (row_limit != nullptr) (*row_limit)--;
             }
         }
         chunk_page_cursor_++;
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h
index 9b74c5e..9b6f474 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -81,11 +81,6 @@
     PageHeader cur_page_header;
     std::vector<uint8_t> notnull_bitmap;
     int32_t cur_value_index = -1;
-
-    std::vector<char> predecoded_values;
-    int predecoded_count = 0;
-    int predecoded_read_pos = 0;
-    bool predecoded = false;
 };
 
 class AlignedChunkReader : public IChunkReader {
@@ -173,29 +168,34 @@
     bool has_more_data_multi() const;
     bool prev_any_value_page_not_finish_multi() const;
     int get_next_page_multi(common::TsBlock* ret_tsblock,
-                            Filter* oneshoot_filter, common::PageArena& pa);
+                            Filter* oneshoot_filter, common::PageArena& pa,
+                            int64_t min_time_hint, int* row_offset,
+                            int* row_limit);
     int get_next_page_multi_serial(common::TsBlock* ret_tsblock, Filter* filter,
-                                   common::PageArena& pa);
+                                   common::PageArena& pa, int64_t min_time_hint,
+                                   int* row_offset);
     int skip_cur_page_multi();
     bool cur_page_statisify_filter_multi(Filter* filter);
     int decode_cur_value_pages_multi();
-    int decode_cur_value_page_data_for(ValueColumnState& col);
     int ensure_value_page_loaded(ValueColumnState& col);
     static int decompress_and_parse_value_page(ValueColumnState& col);
-    void predecode_all_timestamps();
     int decode_time_value_buf_into_tsblock_multi(common::TsBlock*& ret_tsblock,
                                                  Filter* filter,
-                                                 common::PageArena* pa);
-    int multi_DECODE_TV_BATCH(common::TsBlock* ret_tsblock,
-                              common::RowAppender& row_appender, Filter* filter,
-                              common::PageArena* pa);
+                                                 common::PageArena* pa,
+                                                 int* row_offset,
+                                                 int* row_limit);
+    int multi_decode_tv_row_by_row(common::TsBlock* ret_tsblock,
+                                   common::RowAppender& row_appender,
+                                   Filter* filter, common::PageArena* pa,
+                                   int* row_offset, int* row_limit);
 
     // ── Chunk-level parallel decode methods ─────────────────────────────
     int scan_chunk_pages(Filter* filter);
     int decode_chunk_pages();
     int scatter_chunk_pages(common::TsBlock* tsblock,
                             common::RowAppender& row_appender, Filter* filter,
-                            common::PageArena* pa);
+                            common::PageArena* pa, int* row_offset,
+                            int* row_limit);
     void cleanup_chunk_decode();
 
    private:
diff --git a/cpp/src/reader/filter/filter.h b/cpp/src/reader/filter/filter.h
index 1146e46..f39dddb 100644
--- a/cpp/src/reader/filter/filter.h
+++ b/cpp/src/reader/filter/filter.h
@@ -63,19 +63,6 @@
         ASSERT(false);
         return nullptr;
     }
-
-    // Batch time filter: evaluate time filter on an array of timestamps.
-    // Writes true/false into @mask for each element.
-    // Returns the number of elements that passed (mask[i] == true).
-    virtual int satisfy_batch_time(const int64_t* times, int count,
-                                   bool* mask) {
-        int pass = 0;
-        for (int i = 0; i < count; ++i) {
-            mask[i] = satisfy_start_end_time(times[i], times[i]);
-            if (mask[i]) ++pass;
-        }
-        return pass;
-    }
 };
 
 }  // namespace storage