tmp code.
diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h
index 47c5148..79efdfc 100644
--- a/cpp/src/common/allocator/byte_stream.h
+++ b/cpp/src/common/allocator/byte_stream.h
@@ -306,14 +306,14 @@
void clear_wrapped_buf() { wrapped_page_.buf_ = nullptr; }
/* ================ Part 1: basic ================ */
- FORCE_INLINE uint32_t remaining_size() const {
+ FORCE_INLINE int64_t remaining_size() const {
ASSERT(total_size_.load() >= read_pos_);
return total_size_.load() - read_pos_;
}
FORCE_INLINE bool has_remaining() const { return remaining_size() > 0; }
FORCE_INLINE void mark_read_pos() { marked_read_pos_ = read_pos_; }
- FORCE_INLINE uint32_t get_mark_len() const {
+ FORCE_INLINE int64_t get_mark_len() const {
ASSERT(marked_read_pos_ <= read_pos_);
return read_pos_ - marked_read_pos_;
}
@@ -346,8 +346,8 @@
this->total_size_.store(other.total_size_.load());
}
- FORCE_INLINE uint32_t total_size() const { return total_size_.load(); }
- FORCE_INLINE uint32_t read_pos() const { return read_pos_; };
+ FORCE_INLINE int64_t total_size() const { return total_size_.load(); }
+ FORCE_INLINE int64_t read_pos() const { return read_pos_; };
FORCE_INLINE void wrapped_buf_advance_read_pos(uint32_t size) {
if (size + read_pos_ > total_size_.load()) {
read_pos_ = total_size_.load();
@@ -527,7 +527,7 @@
// get tail position <tail_, total_size_> atomically
Page *host_end = nullptr;
- uint32_t host_total_size = 0;
+ int64_t host_total_size = 0;
while (true) {
host_end = host_.tail_.load();
host_total_size = host_.total_size_.load();
@@ -643,10 +643,10 @@
OptionalAtomic<Page *> head_;
OptionalAtomic<Page *> tail_;
Page *read_page_; // only one thread is allow to reader this ByteStream
- OptionalAtomic<uint32_t> total_size_; // total size in byte
- uint32_t read_pos_; // current reader position
- uint32_t marked_read_pos_; // current reader position
- uint32_t page_size_;
+ OptionalAtomic<int64_t> total_size_; // total size in bytes
+ int64_t read_pos_; // current reader position
+ int64_t marked_read_pos_; // reader position saved by mark_read_pos()
+ int64_t page_size_;
AllocModID mid_;
Page wrapped_page_;
};
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h
index 58898f7..d39dd40 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -76,6 +76,15 @@
int get_next_page(common::TsBlock *tsblock, Filter *oneshoot_filter,
common::PageArena &pa) override;
+ // Returns true only when `filter` is non-null and the time chunk's
+ // statistic is present and does not satisfy it; returns false otherwise.
+ bool should_skip(Filter *filter) override {
+     const bool can_check = filter != nullptr && time_chunk_meta_ != nullptr &&
+                            time_chunk_meta_->statistic_ != nullptr;
+     // Missing filter or statistic: nothing to test, so do not skip.
+     return can_check && !filter->satisfy(time_chunk_meta_->statistic_);
+ }
+
private:
FORCE_INLINE bool chunk_has_only_one_page(
const ChunkHeader &chunk_header) const {
diff --git a/cpp/src/reader/ichunk_reader.h b/cpp/src/reader/ichunk_reader.h
index b5f22b7..3352cfc 100644
--- a/cpp/src/reader/ichunk_reader.h
+++ b/cpp/src/reader/ichunk_reader.h
@@ -53,6 +53,7 @@
}
virtual ChunkHeader &get_chunk_header() { return chunk_header_; }
+ virtual bool should_skip(Filter* filter) { return false; }  // base default: always false
protected:
ChunkHeader chunk_header_;
diff --git a/cpp/src/reader/tsfile_series_scan_iterator.cc b/cpp/src/reader/tsfile_series_scan_iterator.cc
index 46f4125..c7c6b42 100644
--- a/cpp/src/reader/tsfile_series_scan_iterator.cc
+++ b/cpp/src/reader/tsfile_series_scan_iterator.cc
@@ -46,31 +46,13 @@
ret_tsblock = alloc_tsblock();
}
- if (chunk_reader_->has_more_data()) {
- ChunkMeta* cm = nullptr;
- ChunkMeta* time_cm = nullptr;
- ChunkMeta* value_cm = nullptr;
-
- if (!is_aligned_) {
- if (chunk_meta_cursor_ == nullptr) {
- cm = nullptr;
- } else {
- cm = get_current_chunk_meta();
- }
- } else {
- time_cm = time_chunk_meta_cursor_ == nullptr ? nullptr : time_chunk_meta_cursor_.get();
- cm = time_cm;
- }
-
- if (filter != nullptr && cm != nullptr && cm->statistic_ != nullptr && !filter->satisfy(cm->statistic_)) {
- chunk_reader_->reset();
- }
+ if (chunk_reader_->should_skip(filter)) {
+ chunk_reader_->reset();
}
while (true) {
if (!chunk_reader_->has_more_data()) {
while (true) {
- advance_to_next_chunk();
if (!has_next_chunk()) {
return E_NO_MORE_DATA;
}
@@ -84,6 +66,7 @@
value_cm = value_chunk_meta_cursor_.get();
cm = time_cm;
}
+ advance_to_next_chunk();
if (filter != nullptr && cm->statistic_ != nullptr && !filter->satisfy(cm->statistic_)) {
continue;
}
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 8a7bcf0..2d72fac 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -169,31 +169,32 @@
// tsfile_table_writer->flush();
// tsfile_table_writer->close();
- TsFileReader reader = TsFileReader();
- reader.open("/Users/colin/dev/tsfile/cpp/timebench_0.tsfile");
- ResultSet* ret = nullptr;
- auto start = std::chrono::high_resolution_clock::now();
- int ret_value = reader.query("timebench", {"value"}, 717840000, 717840000+1024, ret);
- ASSERT_EQ(common::E_OK, ret_value);
- auto* table_result_set = (TableResultSet*)ret;
- bool has_next = false;
- int cur_line = 0;
- while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
- cur_line++;
- }
- // 记录结束时间点
- auto end = std::chrono::high_resolution_clock::now();
-
- // 计算耗时(微秒)
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
-
- // 输出结果
- std::cout << "耗时: " << duration.count() * 1.0 /1000/1000 << " 秒" << std::endl;
- ASSERT_EQ(cur_line, 1025);
- table_result_set->close();
- reader.destroy_query_data_set(table_result_set);
-
- reader.close();
+ // TsFileReader reader = TsFileReader();
+ // reader.open("/Users/colin/dev/tsfile/cpp/timebench_0.tsfile");
+ // ResultSet* ret = nullptr;
+ // auto start = std::chrono::high_resolution_clock::now();
+ // int ret_value = reader.query("timebench", {"value"}, 0, 1+1024, ret);
+ // ASSERT_EQ(common::E_OK, ret_value);
+ // auto* table_result_set = (TableResultSet*)ret;
+ // bool has_next = false;
+ // int cur_line = 0;
+ // while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+ // cur_line++;
+ // std::cout<<table_result_set->get_value<int64_t>(1);
+ // }
+ // // Record the end time point
+ // auto end = std::chrono::high_resolution_clock::now();
+ //
+ // // Compute the elapsed time (microseconds)
+ // auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
+ //
+ // // Print the result
+ // std::cout << "耗时: " << duration.count() * 1.0 /1000/1000 << " 秒" << std::endl;
+ // ASSERT_EQ(cur_line, 1026);
+ // table_result_set->close();
+ // reader.destroy_query_data_set(table_result_set);
+ //
+ // reader.close();
}
TEST_F(TsFileWriterTableTest, WriteDisorderTest) {