blob: c281de413dfca095867cff4efb3b220cb98a4595 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License a
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <random>
#include "common/record.h"
#include "common/schema.h"
#include "common/tablet.h"
#include "file/tsfile_io_writer.h"
#include "file/write_file.h"
#include "reader/filter/tag_filter.h"
#include "reader/table_result_set.h"
#include "reader/tsfile_reader.h"
#include "writer/chunk_writer.h"
#include "writer/tsfile_table_writer.h"
using namespace storage;
using namespace common;
class TsFileTableReaderTest : public ::testing::Test {
protected:
void SetUp() override {
libtsfile_init();
file_name_ = std::string("tsfile_writer_table_test_") +
generate_random_string(10) + std::string(".tsfile");
remove(file_name_.c_str());
int flags = O_WRONLY | O_CREAT | O_TRUNC;
#ifdef _WIN32
flags |= O_BINARY;
#endif
mode_t mode = 0666;
write_file_.create(file_name_, flags, mode);
}
void TearDown() override {
remove(file_name_.c_str());
libtsfile_destroy();
}
std::string file_name_;
WriteFile write_file_;
public:
static std::string generate_random_string(int length) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 61);
const std::string chars =
"0123456789"
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
std::string random_string;
for (int i = 0; i < length; ++i) {
random_string += chars[dis(gen)];
}
return random_string;
}
static TableSchema* gen_table_schema(int table_num) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
int id_schema_num = 5;
int measurement_schema_num = 5;
for (int i = 0; i < id_schema_num; i++) {
measurement_schemas.emplace_back(new MeasurementSchema(
"id" + std::to_string(i), TSDataType::STRING, TSEncoding::PLAIN,
CompressionType::UNCOMPRESSED));
column_categories.emplace_back(ColumnCategory::TAG);
}
for (int i = 0; i < measurement_schema_num; i++) {
measurement_schemas.emplace_back(new MeasurementSchema(
"s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN,
CompressionType::UNCOMPRESSED));
column_categories.emplace_back(ColumnCategory::FIELD);
}
return new TableSchema("testTable" + std::to_string(table_num),
measurement_schemas, column_categories);
}
static storage::Tablet gen_tablet(TableSchema* table_schema, int offset,
int device_num,
int num_timestamp_per_device = 10) {
storage::Tablet tablet(table_schema->get_table_name(),
table_schema->get_measurement_names(),
table_schema->get_data_types(),
table_schema->get_column_categories(),
device_num * num_timestamp_per_device);
char* literal = new char[std::strlen("device_id") + 1];
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
for (int i = 0; i < device_num; i++) {
for (int l = 0; l < num_timestamp_per_device; l++) {
int row_index = i * num_timestamp_per_device + l;
tablet.add_timestamp(row_index, offset + l);
auto column_schemas = table_schema->get_measurement_schemas();
for (const auto& column_schema : column_schemas) {
switch (column_schema->data_type_) {
case TSDataType::INT64:
tablet.add_value(row_index,
column_schema->measurement_name_,
static_cast<int64_t>(i));
break;
case TSDataType::STRING:
tablet.add_value(row_index,
column_schema->measurement_name_,
literal_str);
break;
default:
break;
}
}
}
}
delete[] literal;
return tablet;
}
void test_table_model_query(uint32_t points_per_device = 10,
uint32_t device_num = 1,
int64_t end_time = 1000000000000) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
auto tablet =
gen_tablet(table_schema, 0, device_num, points_per_device);
ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK);
ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
ResultSet* tmp_result_set = nullptr;
Filter* tag_filter =
TagFilterBuilder(table_schema).eq("id0", "device_id");
ret = reader.query(table_schema->get_table_name(),
table_schema->get_measurement_names(), 0, end_time,
tmp_result_set, tag_filter);
auto* table_result_set = (TableResultSet*)tmp_result_set;
char* literal = new char[std::strlen("device_id") + 1];
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
bool has_next = false;
int64_t row_num = 0;
while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
auto column_schemas = table_schema->get_measurement_schemas();
for (const auto& column_schema : column_schemas) {
switch (column_schema->data_type_) {
case TSDataType::INT64:
ASSERT_EQ(table_result_set->get_value<int64_t>(
column_schema->measurement_name_),
(row_num / points_per_device) % device_num);
break;
case TSDataType::STRING:
ASSERT_EQ(table_result_set
->get_value<common::String*>(
column_schema->measurement_name_)
->compare(literal_str),
0);
break;
default:
break;
}
}
for (int i = 2; i <= 6; i++) {
ASSERT_EQ(
table_result_set->get_value<common::String*>(i)->compare(
literal_str),
0);
}
for (int i = 7; i <= 11; i++) {
ASSERT_EQ(table_result_set->get_value<int64_t>(i),
(row_num / points_per_device) % device_num);
}
ASSERT_EQ(table_result_set->get_value<int64_t>(1),
row_num % points_per_device);
row_num++;
}
ASSERT_EQ(row_num, std::min<int64_t>(points_per_device * device_num,
end_time + 1));
reader.destroy_query_data_set(table_result_set);
delete[] literal;
ASSERT_EQ(reader.close(), common::E_OK);
delete table_schema;
delete tag_filter;
}
};
TEST_F(TsFileTableReaderTest, TableModelQuery) { test_table_model_query(); }
TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) {
int prev_config = g_config_value_.page_writer_max_point_num_;
g_config_value_.page_writer_max_point_num_ = 5;
test_table_model_query(g_config_value_.page_writer_max_point_num_);
g_config_value_.page_writer_max_point_num_ = prev_config;
}
TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) {
int prev_config = g_config_value_.page_writer_max_point_num_;
g_config_value_.page_writer_max_point_num_ = 10000;
test_table_model_query(g_config_value_.page_writer_max_point_num_);
g_config_value_.page_writer_max_point_num_ = prev_config;
}
TEST_F(TsFileTableReaderTest, TableModelQueryMultiLargePage) {
int prev_config = g_config_value_.page_writer_max_point_num_;
g_config_value_.page_writer_max_point_num_ = 10000;
test_table_model_query(1000000);
g_config_value_.page_writer_max_point_num_ = prev_config;
}
TEST_F(TsFileTableReaderTest, TableModelQueryMultiDevices) {
int prev_config = g_config_value_.page_writer_max_point_num_;
g_config_value_.page_writer_max_point_num_ = 10000;
test_table_model_query(g_config_value_.page_writer_max_point_num_, 10);
g_config_value_.page_writer_max_point_num_ = prev_config;
}
TEST_F(TsFileTableReaderTest, TableModelQueryWithTimeFilter) {
test_table_model_query(10, 1, 2);
}
TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
auto tablet = gen_tablet(table_schema, 0, 1);
ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK);
ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
ResultSet* tmp_result_set = nullptr;
ret = reader.query(table_schema->get_table_name(),
table_schema->get_measurement_names(), 0, 1000000000000,
tmp_result_set);
auto* table_result_set = (TableResultSet*)tmp_result_set;
auto result_set_metadata = table_result_set->get_metadata();
ASSERT_EQ(result_set_metadata->get_column_count(), 11);
ASSERT_EQ(result_set_metadata->get_column_name(1), "time");
ASSERT_EQ(result_set_metadata->get_column_type(1), INT64);
for (int i = 2; i <= 6; i++) {
ASSERT_EQ(result_set_metadata->get_column_name(i),
"id" + std::to_string(i - 2));
ASSERT_EQ(result_set_metadata->get_column_type(i), TSDataType::STRING);
}
for (int i = 7; i <= 11; i++) {
ASSERT_EQ(result_set_metadata->get_column_name(i),
"s" + std::to_string(i - 7));
ASSERT_EQ(result_set_metadata->get_column_type(i), TSDataType::INT64);
}
reader.destroy_query_data_set(table_result_set);
ASSERT_EQ(reader.close(), common::E_OK);
delete table_schema;
}
TEST_F(TsFileTableReaderTest, TableModelGetSchema) {
auto tmp_table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, tmp_table_schema);
auto tmp_tablet = gen_tablet(tmp_table_schema, 0, 1);
ASSERT_EQ(tsfile_table_writer_->write_table(tmp_tablet), common::E_OK);
for (int i = 1; i < 10; i++) {
auto table_schema = gen_table_schema(i);
auto tablet = gen_tablet(table_schema, 0, 1);
auto table_schema_ptr = std::shared_ptr<TableSchema>(table_schema);
tsfile_table_writer_->register_table(table_schema_ptr);
ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
}
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK);
ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
auto table_schemas = reader.get_all_table_schemas();
std::sort(table_schemas.begin(), table_schemas.end(),
[](const std::shared_ptr<TableSchema>& a,
const std::shared_ptr<TableSchema>& b) {
return a->get_table_name() < b->get_table_name();
}); // The table_schema returned is not guaranteed to be sorted
// by table_name
ASSERT_EQ(table_schemas.size(), 10);
for (int i = 0; i < 10; i++) {
ASSERT_EQ(table_schemas[i]->get_table_name(),
"testtable" + std::to_string(i));
for (int j = 0; j < 5; j++) {
ASSERT_EQ(table_schemas[i]->get_data_types()[j],
TSDataType::STRING);
ASSERT_EQ(table_schemas[i]->get_column_categories()[j],
ColumnCategory::TAG);
}
for (int j = 5; j < 10; j++) {
ASSERT_EQ(table_schemas[i]->get_data_types()[j], TSDataType::INT64);
ASSERT_EQ(table_schemas[i]->get_column_categories()[j],
ColumnCategory::FIELD);
}
}
auto table_schema = reader.get_table_schema("testtable0");
ASSERT_EQ(table_schema->get_table_name(), "testtable0");
for (int i = 0; i < 5; i++) {
ASSERT_EQ(table_schema->get_data_types()[i], TSDataType::STRING);
ASSERT_EQ(table_schema->get_column_categories()[i],
ColumnCategory::TAG);
}
for (int i = 5; i < 10; i++) {
ASSERT_EQ(table_schema->get_data_types()[i], TSDataType::INT64);
ASSERT_EQ(table_schema->get_column_categories()[i],
ColumnCategory::FIELD);
}
ASSERT_EQ(reader.close(), common::E_OK);
delete tmp_table_schema;
}
TEST_F(TsFileTableReaderTest, TableModelQueryWithMultiTabletsMultiFlush) {
auto tmp_table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, tmp_table_schema);
int max_rows = 100000;
int tablet_size = 10000;
int cur_row = 0;
for (; cur_row < max_rows;) {
if (cur_row + tablet_size > max_rows) {
tablet_size = max_rows - cur_row;
}
auto tablet = gen_tablet(tmp_table_schema, cur_row, 1, tablet_size);
ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
cur_row += tablet_size;
}
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK);
ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK);
common::init_config_value();
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet* tmp_result_set = nullptr;
ret = reader.query("testtable0", tmp_table_schema->get_measurement_names(),
0, 1000000000000, tmp_result_set);
auto* table_result_set = (storage::TableResultSet*)tmp_result_set;
bool has_next = false;
char* literal = new char[std::strlen("device_id") + 1];
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
for (int i = 0; i < 1; i++) {
auto column_schemas = tmp_table_schema->get_measurement_schemas();
for (size_t j = 0; j < column_schemas.size(); j++) {
switch (column_schemas[j]->data_type_) {
case TSDataType::INT64:
ASSERT_EQ(table_result_set->get_value<int64_t>(j + 2),
i);
break;
case TSDataType::STRING:
ASSERT_EQ(
table_result_set->get_value<common::String*>(j + 2)
->compare(literal_str),
0);
break;
default:
break;
}
}
}
}
reader.destroy_query_data_set(table_result_set);
delete[] literal;
delete tmp_table_schema;
}
TEST_F(TsFileTableReaderTest, ReadNonExistColumn) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
measurement_schemas.resize(2);
measurement_schemas[0] = new MeasurementSchema("device", STRING);
measurement_schemas[1] = new MeasurementSchema("value", DOUBLE);
column_categories.emplace_back(ColumnCategory::TAG);
column_categories.emplace_back(ColumnCategory::FIELD);
TableSchema* table_schema =
new TableSchema("test_table", measurement_schemas, column_categories);
auto tsfile_table_writer =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
Tablet tablet = Tablet(table_schema->get_measurement_names(),
table_schema->get_data_types());
tablet.set_table_name("test_table");
for (int i = 0; i < 100; i++) {
tablet.add_timestamp(i, static_cast<int64_t>(i));
tablet.add_value(i, "device",
std::string("device" + std::to_string(i)).c_str());
tablet.add_value(i, "value", i * 1.1);
}
tsfile_table_writer->write_table(tablet);
tsfile_table_writer->flush();
tsfile_table_writer->close();
TsFileReader reader = TsFileReader();
reader.open(write_file_.get_file_path());
ResultSet* ret = nullptr;
std::vector<std::string> column_names = {"non-exist-column"};
int ret_value = reader.query("test_table", column_names, 0, 50, ret);
ASSERT_NE(common::E_OK, ret_value);
ASSERT_EQ(ret, nullptr);
reader.close();
delete table_schema;
}
TEST_F(TsFileTableReaderTest, TestDecoder) {
std::vector<ColumnSchema> column_schema;
column_schema.emplace_back("value1", TSDataType::INT32);
auto* schema = new TableSchema("test_table", column_schema);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, schema);
std::vector<std::string> columns = {"value1"};
std::vector<TSDataType> datatypes = {TSDataType::INT32};
storage::Tablet tablet("test_table", &columns, &datatypes, 5000);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 200);
int64_t timestamp = 0;
for (int i = 0; i < 5000; i++) {
// Time will bigger than value after encoding and compression.
timestamp += dis(gen);
tablet.add_timestamp(i, timestamp);
tablet.add_value(i, 0, (int32_t)i);
}
int ret_ = tsfile_table_writer_->write_table(tablet);
ASSERT_EQ(ret_, common::E_OK);
ret_ = tsfile_table_writer_->flush();
ASSERT_EQ(ret_, common::E_OK);
ret_ = tsfile_table_writer_->close();
ASSERT_EQ(ret_, common::E_OK);
TsFileReader reader = TsFileReader();
reader.open(write_file_.get_file_path());
ResultSet* ret = nullptr;
int ret_value =
reader.query("test_table", columns, INT64_MIN, INT64_MAX, ret);
ASSERT_EQ(ret_value, E_OK);
auto* table_result_set = (storage::TableResultSet*)ret;
bool has_next = false;
int cur_lin = 0;
int64_t prev_time = 0;
while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
auto t = table_result_set->get_value<int64_t>(1);
ASSERT_TRUE(t - prev_time <= 200);
prev_time = t;
auto value = table_result_set->get_value<int32_t>(2);
ASSERT_EQ(value, cur_lin);
cur_lin++;
}
ASSERT_EQ(cur_lin, 5000);
delete schema;
reader.destroy_query_data_set(table_result_set);
reader.close();
}
void test_null_table(WriteFile* write_file, int max_rows,
std::function<void(Tablet*, int)> insert_data_into_tablet,
std::function<void(TableResultSet*, int)> check) {
std::string table_name = "t1";
auto* schema = new storage::TableSchema(
table_name,
{
common::ColumnSchema("id1", common::TSDataType::STRING,
common::CompressionType::UNCOMPRESSED,
common::TSEncoding::PLAIN,
common::ColumnCategory::TAG),
common::ColumnSchema("id2", common::TSDataType::STRING,
common::CompressionType::UNCOMPRESSED,
common::TSEncoding::PLAIN,
common::ColumnCategory::TAG),
common::ColumnSchema("s1", common::TSDataType::INT64,
common::CompressionType::UNCOMPRESSED,
common::TSEncoding::PLAIN,
common::ColumnCategory::FIELD),
common::ColumnSchema("s2", common::TSDataType::INT32,
common::CompressionType::UNCOMPRESSED,
common::TSEncoding::PLAIN,
common::ColumnCategory::FIELD),
common::ColumnSchema("s3", common::TSDataType::FLOAT,
common::CompressionType::UNCOMPRESSED,
common::TSEncoding::PLAIN,
common::ColumnCategory::FIELD),
common::ColumnSchema("s4", common::TSDataType::DOUBLE,
common::CompressionType::UNCOMPRESSED,
common::TSEncoding::PLAIN,
common::ColumnCategory::FIELD),
common::ColumnSchema("s5", common::TSDataType::STRING,
common::CompressionType::UNCOMPRESSED,
common::TSEncoding::PLAIN,
common::ColumnCategory::FIELD),
});
uint64_t memory_threshold = 128 * 1024 * 1024;
auto* writer =
new storage::TsFileTableWriter(write_file, schema, memory_threshold);
storage::Tablet tablet(
{
"id1",
"id2",
"s1",
"s2",
"s3",
"s4",
"s5",
},
{
common::TSDataType::STRING,
common::TSDataType::STRING,
common::TSDataType::INT64,
common::TSDataType::INT32,
common::TSDataType::FLOAT,
common::TSDataType::DOUBLE,
common::TSDataType::STRING,
},
max_rows);
insert_data_into_tablet(&tablet, max_rows);
writer->write_table(tablet);
writer->flush();
writer->close();
delete writer;
delete schema;
storage::TsFileReader reader;
reader.open(write_file->get_file_path());
std::vector<std::string> columns;
std::int64_t start_time = INT64_MIN;
std::int64_t end_time = INT64_MAX;
storage::ResultSet* temp_ret = nullptr;
reader.query(table_name, {"id1", "id2", "s1", "s2", "s3", "s4", "s5"},
start_time, end_time, temp_ret);
auto ret = dynamic_cast<storage::TableResultSet*>(temp_ret);
std::cout << std::endl;
check(ret, max_rows);
ret->close();
reader.destroy_query_data_set(ret);
reader.close();
}
TEST_F(TsFileTableReaderTest, TestNullInTable) {
// 1. In some rows, all FIELD columns are empty.
test_null_table(
&write_file_, 10,
[](Tablet* tablet, int max_rows) {
for (int row = 0; row < max_rows; row++) {
int64_t timestamp = row;
tablet->add_timestamp(row, timestamp);
tablet->add_value(row, "id1", "id1");
tablet->add_value(row, "id2", "id2");
if (row % 2 == 0) {
tablet->add_value(row, "s1", static_cast<int64_t>(row));
tablet->add_value(row, "s2", 1);
tablet->add_value(row, "s3", 1.1f);
tablet->add_value(row, "s4", 1.2);
tablet->add_value(row, "s5", "test");
}
}
},
[](TableResultSet* result, int max_rows) {
bool has_next = false;
int line = 0;
while ((result->next(has_next)) == common::E_OK && has_next) {
line++;
if (result->get_value<int64_t>(1) % 2 != 0) {
ASSERT_TRUE(result->is_null("s1"));
ASSERT_TRUE(result->is_null("s2"));
ASSERT_TRUE(result->is_null("s3"));
ASSERT_TRUE(result->is_null("s4"));
ASSERT_TRUE(result->is_null("s5"));
}
ASSERT_FALSE(result->is_null("id1"));
ASSERT_FALSE(result->is_null("id2"));
}
ASSERT_EQ(line, max_rows);
});
}
TEST_F(TsFileTableReaderTest, TestNullInTable2) {
// 2. In some rows, the TAG column is entirely empty,
// and in some rows, all FIELD columns are empty.
test_null_table(
&write_file_, 10,
[](Tablet* tablet, int max_rows) {
for (int row = 0; row < max_rows; row++) {
int64_t timestamp = row;
tablet->add_timestamp(row, timestamp);
if (row % 2 == 0) {
tablet->add_value(row, "id1", "id1");
tablet->add_value(row, "id2", "id2");
} else {
tablet->add_value(row, "s1", static_cast<int64_t>(row));
tablet->add_value(row, "s2", 1);
tablet->add_value(row, "s3", 1.1f);
tablet->add_value(row, "s4", 1.2);
tablet->add_value(row, "s5", "test");
}
}
},
[](TableResultSet* result, int max_rows) {
bool has_next = false;
int line = 0;
while ((result->next(has_next)) == common::E_OK && has_next) {
line++;
bool even = result->get_value<int64_t>(1) % 2 == 0;
ASSERT_EQ(result->is_null("s1"), even);
ASSERT_EQ(result->is_null("s2"), even);
ASSERT_EQ(result->is_null("s3"), even);
ASSERT_EQ(result->is_null("s4"), even);
ASSERT_EQ(result->is_null("s5"), even);
ASSERT_EQ(result->is_null("id1"), !even);
ASSERT_EQ(result->is_null("id2"), !even);
}
ASSERT_EQ(line, max_rows);
});
}
TEST_F(TsFileTableReaderTest, TestNullInTable3) {
// 3. In some rows, the TAG and Field columns are entirely empty,
test_null_table(
&write_file_, 10,
[](Tablet* tablet, int max_rows) {
for (int row = 0; row < max_rows; row++) {
int64_t timestamp = row;
tablet->add_timestamp(row, timestamp);
if (row % 2 == 0) {
tablet->add_value(row, "id1", "id1");
tablet->add_value(row, "id2", "id2");
tablet->add_value(row, "s1", static_cast<int64_t>(row));
tablet->add_value(row, "s2", 1);
tablet->add_value(row, "s3", 1.1f);
tablet->add_value(row, "s4", 1.2);
tablet->add_value(row, "s5", "test");
}
}
},
[](TableResultSet* result, int max_rows) {
bool has_next = false;
int line = 0;
while ((result->next(has_next)) == common::E_OK && has_next) {
line++;
bool odd = result->get_value<int64_t>(1) % 2 != 0;
ASSERT_EQ(result->is_null("s1"), odd);
ASSERT_EQ(result->is_null("s2"), odd);
ASSERT_EQ(result->is_null("s3"), odd);
ASSERT_EQ(result->is_null("s4"), odd);
ASSERT_EQ(result->is_null("s5"), odd);
ASSERT_EQ(result->is_null("id1"), odd);
ASSERT_EQ(result->is_null("id2"), odd);
}
ASSERT_EQ(line, max_rows);
});
}
TEST_F(TsFileTableReaderTest, TestNullInTable4) {
// 3. In some rows, the TAG and Field columns are entirely empty,
test_null_table(
&write_file_, 1000000,
[](Tablet* tablet, int max_rows) {
for (int row = 0; row < max_rows; row++) {
int64_t timestamp = row;
tablet->add_timestamp(row, timestamp);
tablet->add_value(row, "id1", "id1");
tablet->add_value(row, "id2", "id2");
if (row < 10) {
tablet->add_value(row, "s1", static_cast<int64_t>(row));
tablet->add_value(row, "s2", 1);
tablet->add_value(row, "s3", 1.1f);
tablet->add_value(row, "s4", 1.2);
tablet->add_value(row, "s5", "test");
}
}
},
[](TableResultSet* result, int max_rows) {
bool has_next = false;
int line = 0;
while ((result->next(has_next)) == common::E_OK && has_next) {
line++;
bool available = result->get_value<int64_t>(1) < 10;
ASSERT_EQ(!result->is_null("s1"), available);
ASSERT_EQ(!result->is_null("s2"), available);
ASSERT_EQ(!result->is_null("s3"), available);
ASSERT_EQ(!result->is_null("s4"), available);
ASSERT_EQ(!result->is_null("s5"), available);
}
ASSERT_EQ(line, max_rows);
});
}