| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License a |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include <gtest/gtest.h> |
| |
| #include <random> |
| |
| #include "common/record.h" |
| #include "common/schema.h" |
| #include "common/tablet.h" |
| #include "file/tsfile_io_writer.h" |
| #include "file/write_file.h" |
| #include "reader/filter/tag_filter.h" |
| #include "reader/table_result_set.h" |
| #include "reader/tsfile_reader.h" |
| #include "writer/chunk_writer.h" |
| #include "writer/tsfile_table_writer.h" |
| |
| using namespace storage; |
| using namespace common; |
| |
| class TsFileTableReaderTest : public ::testing::Test { |
| protected: |
| void SetUp() override { |
| libtsfile_init(); |
| file_name_ = std::string("tsfile_writer_table_test_") + |
| generate_random_string(10) + std::string(".tsfile"); |
| remove(file_name_.c_str()); |
| int flags = O_WRONLY | O_CREAT | O_TRUNC; |
| #ifdef _WIN32 |
| flags |= O_BINARY; |
| #endif |
| mode_t mode = 0666; |
| write_file_.create(file_name_, flags, mode); |
| } |
| void TearDown() override { |
| remove(file_name_.c_str()); |
| libtsfile_destroy(); |
| } |
| std::string file_name_; |
| WriteFile write_file_; |
| |
| public: |
| static std::string generate_random_string(int length) { |
| std::random_device rd; |
| std::mt19937 gen(rd()); |
| std::uniform_int_distribution<> dis(0, 61); |
| |
| const std::string chars = |
| "0123456789" |
| "abcdefghijklmnopqrstuvwxyz" |
| "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; |
| |
| std::string random_string; |
| |
| for (int i = 0; i < length; ++i) { |
| random_string += chars[dis(gen)]; |
| } |
| |
| return random_string; |
| } |
| |
| static TableSchema* gen_table_schema(int table_num) { |
| std::vector<MeasurementSchema*> measurement_schemas; |
| std::vector<ColumnCategory> column_categories; |
| int id_schema_num = 5; |
| int measurement_schema_num = 5; |
| for (int i = 0; i < id_schema_num; i++) { |
| measurement_schemas.emplace_back(new MeasurementSchema( |
| "id" + std::to_string(i), TSDataType::STRING, TSEncoding::PLAIN, |
| CompressionType::UNCOMPRESSED)); |
| column_categories.emplace_back(ColumnCategory::TAG); |
| } |
| for (int i = 0; i < measurement_schema_num; i++) { |
| measurement_schemas.emplace_back(new MeasurementSchema( |
| "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, |
| CompressionType::UNCOMPRESSED)); |
| column_categories.emplace_back(ColumnCategory::FIELD); |
| } |
| return new TableSchema("testTable" + std::to_string(table_num), |
| measurement_schemas, column_categories); |
| } |
| |
| static storage::Tablet gen_tablet(TableSchema* table_schema, int offset, |
| int device_num, |
| int num_timestamp_per_device = 10) { |
| storage::Tablet tablet(table_schema->get_table_name(), |
| table_schema->get_measurement_names(), |
| table_schema->get_data_types(), |
| table_schema->get_column_categories(), |
| device_num * num_timestamp_per_device); |
| |
| char* literal = new char[std::strlen("device_id") + 1]; |
| std::strcpy(literal, "device_id"); |
| String literal_str(literal, std::strlen("device_id")); |
| for (int i = 0; i < device_num; i++) { |
| for (int l = 0; l < num_timestamp_per_device; l++) { |
| int row_index = i * num_timestamp_per_device + l; |
| tablet.add_timestamp(row_index, offset + l); |
| auto column_schemas = table_schema->get_measurement_schemas(); |
| for (const auto& column_schema : column_schemas) { |
| switch (column_schema->data_type_) { |
| case TSDataType::INT64: |
| tablet.add_value(row_index, |
| column_schema->measurement_name_, |
| static_cast<int64_t>(i)); |
| break; |
| case TSDataType::STRING: |
| tablet.add_value(row_index, |
| column_schema->measurement_name_, |
| literal_str); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| } |
| delete[] literal; |
| return tablet; |
| } |
| |
| void test_table_model_query(uint32_t points_per_device = 10, |
| uint32_t device_num = 1, |
| int64_t end_time = 1000000000000) { |
| auto table_schema = gen_table_schema(0); |
| auto tsfile_table_writer_ = |
| std::make_shared<TsFileTableWriter>(&write_file_, table_schema); |
| |
| auto tablet = |
| gen_tablet(table_schema, 0, device_num, points_per_device); |
| ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); |
| ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); |
| ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| |
| ResultSet* tmp_result_set = nullptr; |
| Filter* tag_filter = |
| TagFilterBuilder(table_schema).eq("id0", "device_id"); |
| ret = reader.query(table_schema->get_table_name(), |
| table_schema->get_measurement_names(), 0, end_time, |
| tmp_result_set, tag_filter); |
| auto* table_result_set = (TableResultSet*)tmp_result_set; |
| char* literal = new char[std::strlen("device_id") + 1]; |
| std::strcpy(literal, "device_id"); |
| String literal_str(literal, std::strlen("device_id")); |
| bool has_next = false; |
| int64_t row_num = 0; |
| while (IS_SUCC(table_result_set->next(has_next)) && has_next) { |
| auto column_schemas = table_schema->get_measurement_schemas(); |
| for (const auto& column_schema : column_schemas) { |
| switch (column_schema->data_type_) { |
| case TSDataType::INT64: |
| ASSERT_EQ(table_result_set->get_value<int64_t>( |
| column_schema->measurement_name_), |
| (row_num / points_per_device) % device_num); |
| break; |
| case TSDataType::STRING: |
| ASSERT_EQ(table_result_set |
| ->get_value<common::String*>( |
| column_schema->measurement_name_) |
| ->compare(literal_str), |
| 0); |
| break; |
| default: |
| break; |
| } |
| } |
| for (int i = 2; i <= 6; i++) { |
| ASSERT_EQ( |
| table_result_set->get_value<common::String*>(i)->compare( |
| literal_str), |
| 0); |
| } |
| for (int i = 7; i <= 11; i++) { |
| ASSERT_EQ(table_result_set->get_value<int64_t>(i), |
| (row_num / points_per_device) % device_num); |
| } |
| ASSERT_EQ(table_result_set->get_value<int64_t>(1), |
| row_num % points_per_device); |
| row_num++; |
| } |
| ASSERT_EQ(row_num, std::min<int64_t>(points_per_device * device_num, |
| end_time + 1)); |
| reader.destroy_query_data_set(table_result_set); |
| delete[] literal; |
| ASSERT_EQ(reader.close(), common::E_OK); |
| delete table_schema; |
| delete tag_filter; |
| } |
| }; |
| |
| TEST_F(TsFileTableReaderTest, TableModelQuery) { test_table_model_query(); } |
| |
| TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) { |
| int prev_config = g_config_value_.page_writer_max_point_num_; |
| g_config_value_.page_writer_max_point_num_ = 5; |
| test_table_model_query(g_config_value_.page_writer_max_point_num_); |
| g_config_value_.page_writer_max_point_num_ = prev_config; |
| } |
| |
| TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) { |
| int prev_config = g_config_value_.page_writer_max_point_num_; |
| g_config_value_.page_writer_max_point_num_ = 10000; |
| test_table_model_query(g_config_value_.page_writer_max_point_num_); |
| g_config_value_.page_writer_max_point_num_ = prev_config; |
| } |
| |
| TEST_F(TsFileTableReaderTest, TableModelQueryMultiLargePage) { |
| int prev_config = g_config_value_.page_writer_max_point_num_; |
| g_config_value_.page_writer_max_point_num_ = 10000; |
| test_table_model_query(1000000); |
| g_config_value_.page_writer_max_point_num_ = prev_config; |
| } |
| |
| TEST_F(TsFileTableReaderTest, TableModelQueryMultiDevices) { |
| int prev_config = g_config_value_.page_writer_max_point_num_; |
| g_config_value_.page_writer_max_point_num_ = 10000; |
| test_table_model_query(g_config_value_.page_writer_max_point_num_, 10); |
| g_config_value_.page_writer_max_point_num_ = prev_config; |
| } |
| |
| TEST_F(TsFileTableReaderTest, TableModelQueryWithTimeFilter) { |
| test_table_model_query(10, 1, 2); |
| } |
| |
| TEST_F(TsFileTableReaderTest, TableModelResultMetadata) { |
| auto table_schema = gen_table_schema(0); |
| auto tsfile_table_writer_ = |
| std::make_shared<TsFileTableWriter>(&write_file_, table_schema); |
| |
| auto tablet = gen_tablet(table_schema, 0, 1); |
| ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); |
| ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); |
| ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| |
| ResultSet* tmp_result_set = nullptr; |
| ret = reader.query(table_schema->get_table_name(), |
| table_schema->get_measurement_names(), 0, 1000000000000, |
| tmp_result_set); |
| auto* table_result_set = (TableResultSet*)tmp_result_set; |
| auto result_set_metadata = table_result_set->get_metadata(); |
| ASSERT_EQ(result_set_metadata->get_column_count(), 11); |
| ASSERT_EQ(result_set_metadata->get_column_name(1), "time"); |
| ASSERT_EQ(result_set_metadata->get_column_type(1), INT64); |
| for (int i = 2; i <= 6; i++) { |
| ASSERT_EQ(result_set_metadata->get_column_name(i), |
| "id" + std::to_string(i - 2)); |
| ASSERT_EQ(result_set_metadata->get_column_type(i), TSDataType::STRING); |
| } |
| for (int i = 7; i <= 11; i++) { |
| ASSERT_EQ(result_set_metadata->get_column_name(i), |
| "s" + std::to_string(i - 7)); |
| ASSERT_EQ(result_set_metadata->get_column_type(i), TSDataType::INT64); |
| } |
| reader.destroy_query_data_set(table_result_set); |
| ASSERT_EQ(reader.close(), common::E_OK); |
| delete table_schema; |
| } |
| |
| TEST_F(TsFileTableReaderTest, TableModelGetSchema) { |
| auto tmp_table_schema = gen_table_schema(0); |
| auto tsfile_table_writer_ = |
| std::make_shared<TsFileTableWriter>(&write_file_, tmp_table_schema); |
| auto tmp_tablet = gen_tablet(tmp_table_schema, 0, 1); |
| ASSERT_EQ(tsfile_table_writer_->write_table(tmp_tablet), common::E_OK); |
| for (int i = 1; i < 10; i++) { |
| auto table_schema = gen_table_schema(i); |
| auto tablet = gen_tablet(table_schema, 0, 1); |
| auto table_schema_ptr = std::shared_ptr<TableSchema>(table_schema); |
| tsfile_table_writer_->register_table(table_schema_ptr); |
| ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); |
| } |
| ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); |
| ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| |
| auto table_schemas = reader.get_all_table_schemas(); |
| std::sort(table_schemas.begin(), table_schemas.end(), |
| [](const std::shared_ptr<TableSchema>& a, |
| const std::shared_ptr<TableSchema>& b) { |
| return a->get_table_name() < b->get_table_name(); |
| }); // The table_schema returned is not guaranteed to be sorted |
| // by table_name |
| ASSERT_EQ(table_schemas.size(), 10); |
| for (int i = 0; i < 10; i++) { |
| ASSERT_EQ(table_schemas[i]->get_table_name(), |
| "testtable" + std::to_string(i)); |
| for (int j = 0; j < 5; j++) { |
| ASSERT_EQ(table_schemas[i]->get_data_types()[j], |
| TSDataType::STRING); |
| ASSERT_EQ(table_schemas[i]->get_column_categories()[j], |
| ColumnCategory::TAG); |
| } |
| for (int j = 5; j < 10; j++) { |
| ASSERT_EQ(table_schemas[i]->get_data_types()[j], TSDataType::INT64); |
| ASSERT_EQ(table_schemas[i]->get_column_categories()[j], |
| ColumnCategory::FIELD); |
| } |
| } |
| |
| auto table_schema = reader.get_table_schema("testtable0"); |
| ASSERT_EQ(table_schema->get_table_name(), "testtable0"); |
| for (int i = 0; i < 5; i++) { |
| ASSERT_EQ(table_schema->get_data_types()[i], TSDataType::STRING); |
| ASSERT_EQ(table_schema->get_column_categories()[i], |
| ColumnCategory::TAG); |
| } |
| for (int i = 5; i < 10; i++) { |
| ASSERT_EQ(table_schema->get_data_types()[i], TSDataType::INT64); |
| ASSERT_EQ(table_schema->get_column_categories()[i], |
| ColumnCategory::FIELD); |
| } |
| |
| ASSERT_EQ(reader.close(), common::E_OK); |
| delete tmp_table_schema; |
| } |
| |
| TEST_F(TsFileTableReaderTest, TableModelQueryWithMultiTabletsMultiFlush) { |
| auto tmp_table_schema = gen_table_schema(0); |
| auto tsfile_table_writer_ = |
| std::make_shared<TsFileTableWriter>(&write_file_, tmp_table_schema); |
| int max_rows = 100000; |
| int tablet_size = 10000; |
| int cur_row = 0; |
| for (; cur_row < max_rows;) { |
| if (cur_row + tablet_size > max_rows) { |
| tablet_size = max_rows - cur_row; |
| } |
| auto tablet = gen_tablet(tmp_table_schema, cur_row, 1, tablet_size); |
| ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); |
| cur_row += tablet_size; |
| } |
| ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); |
| ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); |
| common::init_config_value(); |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| storage::ResultSet* tmp_result_set = nullptr; |
| ret = reader.query("testtable0", tmp_table_schema->get_measurement_names(), |
| 0, 1000000000000, tmp_result_set); |
| auto* table_result_set = (storage::TableResultSet*)tmp_result_set; |
| bool has_next = false; |
| char* literal = new char[std::strlen("device_id") + 1]; |
| std::strcpy(literal, "device_id"); |
| String literal_str(literal, std::strlen("device_id")); |
| while (IS_SUCC(table_result_set->next(has_next)) && has_next) { |
| for (int i = 0; i < 1; i++) { |
| auto column_schemas = tmp_table_schema->get_measurement_schemas(); |
| for (size_t j = 0; j < column_schemas.size(); j++) { |
| switch (column_schemas[j]->data_type_) { |
| case TSDataType::INT64: |
| ASSERT_EQ(table_result_set->get_value<int64_t>(j + 2), |
| i); |
| break; |
| case TSDataType::STRING: |
| ASSERT_EQ( |
| table_result_set->get_value<common::String*>(j + 2) |
| ->compare(literal_str), |
| 0); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| } |
| reader.destroy_query_data_set(table_result_set); |
| delete[] literal; |
| delete tmp_table_schema; |
| } |
| |
| TEST_F(TsFileTableReaderTest, ReadNonExistColumn) { |
| std::vector<MeasurementSchema*> measurement_schemas; |
| std::vector<ColumnCategory> column_categories; |
| measurement_schemas.resize(2); |
| measurement_schemas[0] = new MeasurementSchema("device", STRING); |
| measurement_schemas[1] = new MeasurementSchema("value", DOUBLE); |
| column_categories.emplace_back(ColumnCategory::TAG); |
| column_categories.emplace_back(ColumnCategory::FIELD); |
| TableSchema* table_schema = |
| new TableSchema("test_table", measurement_schemas, column_categories); |
| auto tsfile_table_writer = |
| std::make_shared<TsFileTableWriter>(&write_file_, table_schema); |
| Tablet tablet = Tablet(table_schema->get_measurement_names(), |
| table_schema->get_data_types()); |
| tablet.set_table_name("test_table"); |
| for (int i = 0; i < 100; i++) { |
| tablet.add_timestamp(i, static_cast<int64_t>(i)); |
| tablet.add_value(i, "device", |
| std::string("device" + std::to_string(i)).c_str()); |
| tablet.add_value(i, "value", i * 1.1); |
| } |
| tsfile_table_writer->write_table(tablet); |
| tsfile_table_writer->flush(); |
| tsfile_table_writer->close(); |
| |
| TsFileReader reader = TsFileReader(); |
| reader.open(write_file_.get_file_path()); |
| ResultSet* ret = nullptr; |
| std::vector<std::string> column_names = {"non-exist-column"}; |
| int ret_value = reader.query("test_table", column_names, 0, 50, ret); |
| ASSERT_NE(common::E_OK, ret_value); |
| ASSERT_EQ(ret, nullptr); |
| reader.close(); |
| delete table_schema; |
| } |
| |
| TEST_F(TsFileTableReaderTest, TestDecoder) { |
| std::vector<ColumnSchema> column_schema; |
| column_schema.emplace_back("value1", TSDataType::INT32); |
| auto* schema = new TableSchema("test_table", column_schema); |
| auto tsfile_table_writer_ = |
| std::make_shared<TsFileTableWriter>(&write_file_, schema); |
| std::vector<std::string> columns = {"value1"}; |
| std::vector<TSDataType> datatypes = {TSDataType::INT32}; |
| storage::Tablet tablet("test_table", &columns, &datatypes, 5000); |
| std::random_device rd; |
| std::mt19937 gen(rd()); |
| std::uniform_int_distribution<> dis(1, 200); |
| int64_t timestamp = 0; |
| for (int i = 0; i < 5000; i++) { |
| // Time will bigger than value after encoding and compression. |
| timestamp += dis(gen); |
| tablet.add_timestamp(i, timestamp); |
| tablet.add_value(i, 0, (int32_t)i); |
| } |
| int ret_ = tsfile_table_writer_->write_table(tablet); |
| ASSERT_EQ(ret_, common::E_OK); |
| ret_ = tsfile_table_writer_->flush(); |
| ASSERT_EQ(ret_, common::E_OK); |
| ret_ = tsfile_table_writer_->close(); |
| ASSERT_EQ(ret_, common::E_OK); |
| TsFileReader reader = TsFileReader(); |
| reader.open(write_file_.get_file_path()); |
| ResultSet* ret = nullptr; |
| int ret_value = |
| reader.query("test_table", columns, INT64_MIN, INT64_MAX, ret); |
| ASSERT_EQ(ret_value, E_OK); |
| auto* table_result_set = (storage::TableResultSet*)ret; |
| bool has_next = false; |
| int cur_lin = 0; |
| int64_t prev_time = 0; |
| while (IS_SUCC(table_result_set->next(has_next)) && has_next) { |
| auto t = table_result_set->get_value<int64_t>(1); |
| ASSERT_TRUE(t - prev_time <= 200); |
| prev_time = t; |
| auto value = table_result_set->get_value<int32_t>(2); |
| ASSERT_EQ(value, cur_lin); |
| cur_lin++; |
| } |
| ASSERT_EQ(cur_lin, 5000); |
| delete schema; |
| reader.destroy_query_data_set(table_result_set); |
| reader.close(); |
| } |
| |
| void test_null_table(WriteFile* write_file, int max_rows, |
| std::function<void(Tablet*, int)> insert_data_into_tablet, |
| std::function<void(TableResultSet*, int)> check) { |
| std::string table_name = "t1"; |
| auto* schema = new storage::TableSchema( |
| table_name, |
| { |
| common::ColumnSchema("id1", common::TSDataType::STRING, |
| common::CompressionType::UNCOMPRESSED, |
| common::TSEncoding::PLAIN, |
| common::ColumnCategory::TAG), |
| common::ColumnSchema("id2", common::TSDataType::STRING, |
| common::CompressionType::UNCOMPRESSED, |
| common::TSEncoding::PLAIN, |
| common::ColumnCategory::TAG), |
| common::ColumnSchema("s1", common::TSDataType::INT64, |
| common::CompressionType::UNCOMPRESSED, |
| common::TSEncoding::PLAIN, |
| common::ColumnCategory::FIELD), |
| common::ColumnSchema("s2", common::TSDataType::INT32, |
| common::CompressionType::UNCOMPRESSED, |
| common::TSEncoding::PLAIN, |
| common::ColumnCategory::FIELD), |
| common::ColumnSchema("s3", common::TSDataType::FLOAT, |
| common::CompressionType::UNCOMPRESSED, |
| common::TSEncoding::PLAIN, |
| common::ColumnCategory::FIELD), |
| common::ColumnSchema("s4", common::TSDataType::DOUBLE, |
| common::CompressionType::UNCOMPRESSED, |
| common::TSEncoding::PLAIN, |
| common::ColumnCategory::FIELD), |
| common::ColumnSchema("s5", common::TSDataType::STRING, |
| common::CompressionType::UNCOMPRESSED, |
| common::TSEncoding::PLAIN, |
| common::ColumnCategory::FIELD), |
| }); |
| uint64_t memory_threshold = 128 * 1024 * 1024; |
| auto* writer = |
| new storage::TsFileTableWriter(write_file, schema, memory_threshold); |
| storage::Tablet tablet( |
| { |
| "id1", |
| "id2", |
| "s1", |
| "s2", |
| "s3", |
| "s4", |
| "s5", |
| }, |
| { |
| common::TSDataType::STRING, |
| common::TSDataType::STRING, |
| common::TSDataType::INT64, |
| common::TSDataType::INT32, |
| common::TSDataType::FLOAT, |
| common::TSDataType::DOUBLE, |
| common::TSDataType::STRING, |
| }, |
| max_rows); |
| insert_data_into_tablet(&tablet, max_rows); |
| writer->write_table(tablet); |
| writer->flush(); |
| writer->close(); |
| delete writer; |
| delete schema; |
| storage::TsFileReader reader; |
| reader.open(write_file->get_file_path()); |
| std::vector<std::string> columns; |
| std::int64_t start_time = INT64_MIN; |
| std::int64_t end_time = INT64_MAX; |
| storage::ResultSet* temp_ret = nullptr; |
| reader.query(table_name, {"id1", "id2", "s1", "s2", "s3", "s4", "s5"}, |
| start_time, end_time, temp_ret); |
| auto ret = dynamic_cast<storage::TableResultSet*>(temp_ret); |
| std::cout << std::endl; |
| check(ret, max_rows); |
| ret->close(); |
| reader.destroy_query_data_set(ret); |
| reader.close(); |
| } |
| |
| TEST_F(TsFileTableReaderTest, TestNullInTable) { |
| // 1. In some rows, all FIELD columns are empty. |
| test_null_table( |
| &write_file_, 10, |
| [](Tablet* tablet, int max_rows) { |
| for (int row = 0; row < max_rows; row++) { |
| int64_t timestamp = row; |
| tablet->add_timestamp(row, timestamp); |
| tablet->add_value(row, "id1", "id1"); |
| tablet->add_value(row, "id2", "id2"); |
| if (row % 2 == 0) { |
| tablet->add_value(row, "s1", static_cast<int64_t>(row)); |
| tablet->add_value(row, "s2", 1); |
| tablet->add_value(row, "s3", 1.1f); |
| tablet->add_value(row, "s4", 1.2); |
| tablet->add_value(row, "s5", "test"); |
| } |
| } |
| }, |
| [](TableResultSet* result, int max_rows) { |
| bool has_next = false; |
| int line = 0; |
| while ((result->next(has_next)) == common::E_OK && has_next) { |
| line++; |
| if (result->get_value<int64_t>(1) % 2 != 0) { |
| ASSERT_TRUE(result->is_null("s1")); |
| ASSERT_TRUE(result->is_null("s2")); |
| ASSERT_TRUE(result->is_null("s3")); |
| ASSERT_TRUE(result->is_null("s4")); |
| ASSERT_TRUE(result->is_null("s5")); |
| } |
| ASSERT_FALSE(result->is_null("id1")); |
| ASSERT_FALSE(result->is_null("id2")); |
| } |
| ASSERT_EQ(line, max_rows); |
| }); |
| } |
| |
| TEST_F(TsFileTableReaderTest, TestNullInTable2) { |
| // 2. In some rows, the TAG column is entirely empty, |
| // and in some rows, all FIELD columns are empty. |
| test_null_table( |
| &write_file_, 10, |
| [](Tablet* tablet, int max_rows) { |
| for (int row = 0; row < max_rows; row++) { |
| int64_t timestamp = row; |
| tablet->add_timestamp(row, timestamp); |
| if (row % 2 == 0) { |
| tablet->add_value(row, "id1", "id1"); |
| tablet->add_value(row, "id2", "id2"); |
| } else { |
| tablet->add_value(row, "s1", static_cast<int64_t>(row)); |
| tablet->add_value(row, "s2", 1); |
| tablet->add_value(row, "s3", 1.1f); |
| tablet->add_value(row, "s4", 1.2); |
| tablet->add_value(row, "s5", "test"); |
| } |
| } |
| }, |
| [](TableResultSet* result, int max_rows) { |
| bool has_next = false; |
| int line = 0; |
| while ((result->next(has_next)) == common::E_OK && has_next) { |
| line++; |
| bool even = result->get_value<int64_t>(1) % 2 == 0; |
| ASSERT_EQ(result->is_null("s1"), even); |
| ASSERT_EQ(result->is_null("s2"), even); |
| ASSERT_EQ(result->is_null("s3"), even); |
| ASSERT_EQ(result->is_null("s4"), even); |
| ASSERT_EQ(result->is_null("s5"), even); |
| ASSERT_EQ(result->is_null("id1"), !even); |
| ASSERT_EQ(result->is_null("id2"), !even); |
| } |
| ASSERT_EQ(line, max_rows); |
| }); |
| } |
| |
| TEST_F(TsFileTableReaderTest, TestNullInTable3) { |
| // 3. In some rows, the TAG and Field columns are entirely empty, |
| test_null_table( |
| &write_file_, 10, |
| [](Tablet* tablet, int max_rows) { |
| for (int row = 0; row < max_rows; row++) { |
| int64_t timestamp = row; |
| tablet->add_timestamp(row, timestamp); |
| if (row % 2 == 0) { |
| tablet->add_value(row, "id1", "id1"); |
| tablet->add_value(row, "id2", "id2"); |
| tablet->add_value(row, "s1", static_cast<int64_t>(row)); |
| tablet->add_value(row, "s2", 1); |
| tablet->add_value(row, "s3", 1.1f); |
| tablet->add_value(row, "s4", 1.2); |
| tablet->add_value(row, "s5", "test"); |
| } |
| } |
| }, |
| [](TableResultSet* result, int max_rows) { |
| bool has_next = false; |
| int line = 0; |
| while ((result->next(has_next)) == common::E_OK && has_next) { |
| line++; |
| bool odd = result->get_value<int64_t>(1) % 2 != 0; |
| ASSERT_EQ(result->is_null("s1"), odd); |
| ASSERT_EQ(result->is_null("s2"), odd); |
| ASSERT_EQ(result->is_null("s3"), odd); |
| ASSERT_EQ(result->is_null("s4"), odd); |
| ASSERT_EQ(result->is_null("s5"), odd); |
| ASSERT_EQ(result->is_null("id1"), odd); |
| ASSERT_EQ(result->is_null("id2"), odd); |
| } |
| ASSERT_EQ(line, max_rows); |
| }); |
| } |
| |
| TEST_F(TsFileTableReaderTest, TestNullInTable4) { |
| // 3. In some rows, the TAG and Field columns are entirely empty, |
| test_null_table( |
| &write_file_, 1000000, |
| [](Tablet* tablet, int max_rows) { |
| for (int row = 0; row < max_rows; row++) { |
| int64_t timestamp = row; |
| tablet->add_timestamp(row, timestamp); |
| tablet->add_value(row, "id1", "id1"); |
| tablet->add_value(row, "id2", "id2"); |
| if (row < 10) { |
| tablet->add_value(row, "s1", static_cast<int64_t>(row)); |
| tablet->add_value(row, "s2", 1); |
| tablet->add_value(row, "s3", 1.1f); |
| tablet->add_value(row, "s4", 1.2); |
| tablet->add_value(row, "s5", "test"); |
| } |
| } |
| }, |
| [](TableResultSet* result, int max_rows) { |
| bool has_next = false; |
| int line = 0; |
| while ((result->next(has_next)) == common::E_OK && has_next) { |
| line++; |
| bool available = result->get_value<int64_t>(1) < 10; |
| ASSERT_EQ(!result->is_null("s1"), available); |
| ASSERT_EQ(!result->is_null("s2"), available); |
| ASSERT_EQ(!result->is_null("s3"), available); |
| ASSERT_EQ(!result->is_null("s4"), available); |
| ASSERT_EQ(!result->is_null("s5"), available); |
| } |
| ASSERT_EQ(line, max_rows); |
| }); |
| } |