blob: be8be1f0407e74f7d9ffd6fb3df465a0c775e0c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License a
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "writer/tsfile_writer.h"
#include <gtest/gtest.h>
#include <random>
#include "common/path.h"
#include "common/record.h"
#include "common/schema.h"
#include "common/tablet.h"
#include "file/tsfile_io_writer.h"
#include "file/write_file.h"
#include "reader/qds_without_timegenerator.h"
#include "reader/tsfile_reader.h"
#include "writer/chunk_writer.h"
using namespace storage;
using namespace common;
class TsFileWriterTest : public ::testing::Test {
protected:
void SetUp() override {
libtsfile_init();
tsfile_writer_ = new TsFileWriter();
file_name_ = std::string("tsfile_writer_test_") +
generate_random_string(10) + std::string(".tsfile");
remove(file_name_.c_str());
int flags = O_WRONLY | O_CREAT | O_TRUNC;
#ifdef _WIN32
flags |= O_BINARY;
#endif
mode_t mode = 0666;
ASSERT_EQ(tsfile_writer_->open(file_name_, flags, mode), common::E_OK);
}
void TearDown() override {
delete tsfile_writer_;
int ret = remove(file_name_.c_str());
ASSERT_EQ(0, ret);
}
std::string file_name_;
TsFileWriter *tsfile_writer_ = nullptr;
public:
static std::string generate_random_string(int length) {
std::mt19937 gen(static_cast<unsigned int>(
std::chrono::system_clock::now().time_since_epoch().count()));
std::uniform_int_distribution<> dis(0, 61);
const std::string chars =
"0123456789"
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
std::string random_string;
for (int i = 0; i < length; ++i) {
random_string += chars[dis(gen)];
}
return random_string;
}
static std::string field_to_string(storage::Field *value) {
if (value->type_ == common::TEXT || value->type_ == STRING ||
value->type_ == BLOB) {
return std::string(value->value_.sval_);
} else {
std::stringstream ss;
switch (value->type_) {
case common::BOOLEAN:
ss << (value->value_.bval_ ? "true" : "false");
break;
case common::INT32:
ss << value->value_.ival_;
break;
case common::INT64:
case common::TIMESTAMP:
ss << value->value_.lval_;
break;
case common::FLOAT:
ss << value->value_.fval_;
break;
case common::DOUBLE:
ss << value->value_.dval_;
break;
case common::NULL_TYPE:
ss << "NULL";
break;
default:
ASSERT(false);
break;
}
return ss.str();
}
}
};
class TsFileWriterTestSimple : public ::testing::Test {};
TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) {
TsFileWriter writer;
ASSERT_EQ(writer.init(nullptr), E_INVALID_ARG);
}
TEST_F(TsFileWriterTest, WriteDiffDataType) {
std::string device_name = "test_table";
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
std::vector<std::string> measurement_names = {
"level", "num", "bools", "double", "id", "ts", "text", "blob", "date"};
std::vector<common::TSDataType> data_types = {
FLOAT, INT64, BOOLEAN, DOUBLE, STRING, TIMESTAMP, TEXT, BLOB, DATE};
for (uint32_t i = 0; i < measurement_names.size(); i++) {
std::string measurement_name = measurement_names[i];
common::TSDataType data_type = data_types[i];
tsfile_writer_->register_timeseries(
device_name,
storage::MeasurementSchema(measurement_name, data_type, encoding,
compression_type));
}
char *literal = new char[std::strlen("device_id") + 1];
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
std::time_t now = std::time(nullptr);
std::tm *local_time = std::localtime(&now);
std::tm today = {};
today.tm_year = local_time->tm_year;
today.tm_mon = local_time->tm_mon;
today.tm_mday = local_time->tm_mday;
int row_num = 100000;
for (int i = 0; i < row_num; ++i) {
TsRecord record(1622505600000 + i * 100, device_name);
for (uint32_t j = 0; j < measurement_names.size(); j++) {
std::string measurement_name = measurement_names[j];
common::TSDataType data_type = data_types[j];
switch (data_type) {
case BOOLEAN:
record.add_point(measurement_name, true);
break;
case INT64:
record.add_point(measurement_name, (int64_t)415412);
break;
case FLOAT:
record.add_point(measurement_name, (float)1.0);
break;
case DOUBLE:
record.add_point(measurement_name, (double)2.0);
break;
case STRING:
record.add_point(measurement_name, literal_str);
break;
case TEXT:
record.add_point(measurement_name, literal_str);
break;
case BLOB:
record.add_point(measurement_name, literal_str);
break;
case TIMESTAMP:
record.add_point(measurement_name, (int64_t)415412);
break;
case DATE:
record.add_point(measurement_name, today);
default:
break;
}
}
ASSERT_EQ(tsfile_writer_->write_record(record), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
std::vector<std::string> select_list;
select_list.reserve(measurement_names.size());
for (uint32_t i = 0; i < measurement_names.size(); ++i) {
std::string measurement_name = measurement_names[i];
std::string path_name = device_name + "." + measurement_name;
select_list.emplace_back(path_name);
}
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(select_list, 1622505600000,
1622505600000 + row_num * 100, tmp_qds);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
int64_t cur_record_num = 0;
bool has_next = false;
do {
if (IS_FAIL(qds->next(has_next)) || !has_next) {
break;
}
cur_record_num++;
ASSERT_EQ(qds->get_value<float>(2), (float)1.0);
ASSERT_EQ(qds->get_value<int64_t>(3), (int64_t)415412);
ASSERT_EQ(qds->get_value<bool>(4), true);
ASSERT_EQ(qds->get_value<double>(5), (double)2.0);
ASSERT_EQ(qds->get_value<common::String *>(6)->compare(literal_str), 0);
ASSERT_EQ(qds->get_value<int64_t>(7), (int64_t)415412);
ASSERT_EQ(qds->get_value<common::String *>(8)->compare(literal_str), 0);
ASSERT_EQ(qds->get_value<common::String *>(9)->compare(literal_str), 0);
ASSERT_TRUE(
DateConverter::is_tm_ymd_equal(qds->get_value<std::tm>(10), today));
ASSERT_EQ(qds->get_value<float>(measurement_names[0]), (float)1.0);
ASSERT_EQ(qds->get_value<int64_t>(measurement_names[1]),
(int64_t)415412);
ASSERT_EQ(qds->get_value<bool>(measurement_names[2]), true);
ASSERT_EQ(qds->get_value<double>(measurement_names[3]), (double)2.0);
ASSERT_EQ(qds->get_value<common::String *>(measurement_names[4])
->compare(literal_str),
0);
ASSERT_EQ(qds->get_value<int64_t>(measurement_names[5]),
(int64_t)415412);
ASSERT_EQ(qds->get_value<common::String *>(measurement_names[6])
->compare(literal_str),
0);
ASSERT_EQ(qds->get_value<common::String *>(measurement_names[7])
->compare(literal_str),
0);
ASSERT_TRUE(DateConverter::is_tm_ymd_equal(
qds->get_value<std::tm>(measurement_names[8]), today));
} while (true);
delete[] literal;
EXPECT_EQ(cur_record_num, row_num);
reader.destroy_query_data_set(qds);
ASSERT_EQ(reader.close(), E_OK);
}
TEST_F(TsFileWriterTest, RegisterTimeSeries) {
std::string device_path = "device1";
std::string measurement_name = "temperature";
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
ASSERT_EQ(tsfile_writer_->register_timeseries(
device_path,
storage::MeasurementSchema(measurement_name, data_type,
encoding, compression_type)),
E_OK);
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
TEST_F(TsFileWriterTest, WriteMultipleRecords) {
std::string device_path = "device1";
std::string measurement_name = "temperature";
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
tsfile_writer_->register_timeseries(
device_path, storage::MeasurementSchema(measurement_name, data_type,
encoding, compression_type));
for (int i = 0; i < 50000; ++i) {
TsRecord record(1622505600000 + i * 1000, device_path);
record.add_point(measurement_name, (int32_t)i);
ASSERT_EQ(tsfile_writer_->write_record(record), E_OK);
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
}
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
TEST_F(TsFileWriterTest, WriteDiffrentTypeCombination) {
std::string device_path = "device1";
std::string measurement_name = "temperature";
std::vector<TSDataType> data_types = {TSDataType::INT32, TSDataType::INT64,
TSDataType::FLOAT,
TSDataType::DOUBLE};
std::vector<TSEncoding> encodings = {TSEncoding::PLAIN,
TSEncoding::TS_2DIFF};
std::vector<CompressionType> compression_types = {
CompressionType::UNCOMPRESSED, CompressionType::SNAPPY,
CompressionType::GZIP, CompressionType::LZ4};
std::vector<MeasurementSchema> schema_vecs;
schema_vecs.reserve(data_types.size() * encodings.size() *
compression_types.size());
int idx = 0;
for (auto data_type : data_types) {
for (auto encoding_type : encodings) {
for (auto compression_type : compression_types) {
schema_vecs.emplace_back(MeasurementSchema(
measurement_name + std::to_string(idx), data_type,
encoding_type, compression_type));
tsfile_writer_->register_timeseries(device_path,
schema_vecs[idx++]);
}
}
}
char *literal = new char[std::strlen("literal") + 1];
std::strcpy(literal, "literal");
String literal_str(literal, std::strlen("literal"));
for (int i = 0; i < schema_vecs.size(); ++i) {
TsRecord record(1622505600000 + i * 1000, device_path);
if (schema_vecs[i].data_type_ == TSDataType::INT32) {
record.add_point(schema_vecs[i].measurement_name_, (int32_t)i);
} else if (schema_vecs[i].data_type_ == TSDataType::FLOAT) {
record.add_point(schema_vecs[i].measurement_name_, 3.14);
} else if (schema_vecs[i].data_type_ == TSDataType::DOUBLE) {
record.add_point(schema_vecs[i].measurement_name_, 3.1415926);
} else if (schema_vecs[i].data_type_ == TSDataType::BOOLEAN) {
record.add_point(schema_vecs[i].measurement_name_, true);
} else if (schema_vecs[i].data_type_ == TSDataType::STRING) {
record.add_point(schema_vecs[i].measurement_name_, literal_str);
}
ASSERT_EQ(tsfile_writer_->write_record(record), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
delete[] literal;
}
TEST_F(TsFileWriterTest, WriteMultipleTabletsMultiFlush) {
common::config_set_max_degree_of_index_node(3);
const int device_num = 20;
const int measurement_num = 20;
int max_tablet_num = 100;
std::vector<std::vector<MeasurementSchema>> schema_vecs(
device_num, std::vector<MeasurementSchema>(measurement_num));
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vecs[i][j] =
MeasurementSchema(measure_name, common::TSDataType::INT32,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED);
tsfile_writer_->register_timeseries(
device_name, storage::MeasurementSchema(
measure_name, common::TSDataType::INT32,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
}
}
for (int tablet_num = 0; tablet_num < max_tablet_num; tablet_num++) {
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
storage::Tablet tablet(
device_name,
std::make_shared<std::vector<MeasurementSchema>>(
schema_vecs[i]),
1);
for (int j = 0; j < measurement_num; j++) {
tablet.add_timestamp(0, 16225600000 + tablet_num * 100);
tablet.add_value(0, j, static_cast<int32_t>(tablet_num));
}
ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
}
ASSERT_EQ(tsfile_writer_->close(), E_OK);
std::vector<storage::Path> select_list;
for (int i = 0; i < device_num; i++) {
for (int j = 0; j < measurement_num; ++j) {
std::string device_name = "test_device" + std::to_string(i);
std::string measure_name = "measurement" + std::to_string(j);
storage::Path path(device_name, measure_name);
select_list.push_back(path);
}
}
storage::QueryExpression *query_expr =
storage::QueryExpression::create(select_list, nullptr);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(query_expr, tmp_qds);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
storage::RowRecord *record;
int max_rows = max_tablet_num * 1;
bool has_next = false;
for (int cur_row = 0; cur_row < max_rows; cur_row++) {
if (IS_FAIL(qds->next(has_next)) || !has_next) {
break;
}
record = qds->get_row_record();
int size = record->get_fields()->size();
for (int i = 0; i < size; ++i) {
if (i == 0) {
EXPECT_EQ(std::to_string(record->get_timestamp()),
field_to_string(record->get_field(i)));
continue;
}
EXPECT_EQ(std::to_string(cur_row),
field_to_string(record->get_field(i)));
}
}
reader.destroy_query_data_set(qds);
}
TEST_F(TsFileWriterTest, WriteMultipleTabletsAlignedMultiFlush) {
common::config_set_max_degree_of_index_node(3);
const int device_num = 20;
const int measurement_num = 20;
int max_tablet_num = 100;
std::vector<std::vector<MeasurementSchema>> schema_vecs(
device_num, std::vector<MeasurementSchema>(measurement_num));
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vecs[i][j] =
MeasurementSchema(measure_name, common::TSDataType::INT32,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED);
tsfile_writer_->register_aligned_timeseries(
device_name, storage::MeasurementSchema(
measure_name, common::TSDataType::INT32,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
}
}
for (int tablet_num = 0; tablet_num < max_tablet_num; tablet_num++) {
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
storage::Tablet tablet(
device_name,
std::make_shared<std::vector<MeasurementSchema>>(
schema_vecs[i]),
1);
for (int j = 0; j < measurement_num; j++) {
tablet.add_timestamp(0, 16225600000 + tablet_num * 100);
tablet.add_value(0, j, static_cast<int32_t>(tablet_num));
}
ASSERT_EQ(tsfile_writer_->write_tablet_aligned(tablet), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
}
ASSERT_EQ(tsfile_writer_->close(), E_OK);
std::vector<storage::Path> select_list;
for (int i = 0; i < device_num; i++) {
for (int j = 0; j < measurement_num; ++j) {
std::string device_name = "test_device" + std::to_string(i);
std::string measure_name = "measurement" + std::to_string(j);
storage::Path path(device_name, measure_name);
select_list.push_back(path);
}
}
storage::QueryExpression *query_expr =
storage::QueryExpression::create(select_list, nullptr);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(query_expr, tmp_qds);
ASSERT_EQ(ret, common::E_OK);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
storage::RowRecord *record;
int max_rows = max_tablet_num * 1;
bool has_next = false;
for (int cur_row = 0; cur_row < max_rows; cur_row++) {
if (IS_FAIL(qds->next(has_next)) || !has_next) {
break;
}
record = qds->get_row_record();
int size = record->get_fields()->size();
for (int i = 0; i < size; ++i) {
if (i == 0) {
ASSERT_EQ(field_to_string(record->get_field(0)),
std::to_string(record->get_timestamp()));
continue;
}
EXPECT_EQ(std::to_string(cur_row),
field_to_string(record->get_field(i)));
}
}
reader.destroy_query_data_set(qds);
}
TEST_F(TsFileWriterTest, WriteMultipleTabletsInt64) {
const int device_num = 50;
const int measurement_num = 50;
std::vector<MeasurementSchema> schema_vec[50];
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vec[i].push_back(
MeasurementSchema(measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
tsfile_writer_->register_timeseries(
device_name, storage::MeasurementSchema(
measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
}
}
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
int max_rows = 100;
storage::Tablet tablet(
device_name,
std::make_shared<std::vector<MeasurementSchema>>(schema_vec[i]),
max_rows);
for (int j = 0; j < measurement_num; j++) {
for (int row = 0; row < max_rows; row++) {
tablet.add_timestamp(row, 16225600 + row);
}
for (int row = 0; row < max_rows; row++) {
tablet.add_value(row, j, static_cast<int64_t>(row));
}
}
ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
TEST_F(TsFileWriterTest, WriteMultipleTabletsDouble) {
const int device_num = 50;
const int measurement_num = 50;
std::vector<MeasurementSchema> schema_vec[50];
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vec[i].push_back(
MeasurementSchema(measure_name, common::TSDataType::DOUBLE,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
tsfile_writer_->register_timeseries(
device_name, storage::MeasurementSchema(
measure_name, common::TSDataType::DOUBLE,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
}
}
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
int max_rows = 200;
storage::Tablet tablet(
device_name,
std::make_shared<std::vector<MeasurementSchema>>(schema_vec[i]),
max_rows);
for (int j = 0; j < measurement_num; j++) {
for (int row = 0; row < max_rows; row++) {
tablet.add_timestamp(row, 16225600 + row);
}
for (int row = 0; row < max_rows; row++) {
tablet.add_value(row, j, static_cast<double>(row) + 1.0);
}
}
ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
TEST_F(TsFileWriterTest, FlushMultipleDevice) {
const int device_num = 50;
const int measurement_num = 50;
const int max_rows = 100;
std::vector<MeasurementSchema> schema_vec[50];
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vec[i].emplace_back(measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED);
tsfile_writer_->register_timeseries(
device_name,
MeasurementSchema(measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
}
}
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
storage::Tablet tablet(
device_name,
std::make_shared<std::vector<MeasurementSchema>>(schema_vec[i]),
max_rows);
for (int j = 0; j < measurement_num; j++) {
for (int row = 0; row < max_rows; row++) {
tablet.add_timestamp(row, 16225600 + row);
}
for (int row = 0; row < max_rows; row++) {
tablet.add_value(row, j, static_cast<int64_t>(row));
}
}
ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
// flush after write tablet to check whether write empty chunk
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
}
ASSERT_EQ(tsfile_writer_->close(), E_OK);
std::vector<storage::Path> select_list;
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measurement_name = "measurement" + std::to_string(j);
storage::Path path(device_name, measurement_name);
select_list.push_back(path);
}
}
storage::QueryExpression *query_expr =
storage::QueryExpression::create(select_list, nullptr);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(query_expr, tmp_qds);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
storage::RowRecord *record;
int64_t cur_record_num = 0;
bool has_next = false;
do {
if (IS_FAIL(qds->next(has_next)) || !has_next) {
break;
}
record = qds->get_row_record();
// if empty chunk is writen, the timestamp should be NULL
if (!record) {
break;
}
EXPECT_EQ(record->get_timestamp(), 16225600 + cur_record_num);
cur_record_num++;
} while (true);
EXPECT_EQ(cur_record_num, max_rows);
reader.destroy_query_data_set(qds);
}
TEST_F(TsFileWriterTest, AnalyzeTsfileForload) {
const int device_num = 50;
const int measurement_num = 50;
const int max_rows = 100;
std::vector<MeasurementSchema> schema_vec[50];
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vec[i].push_back(
MeasurementSchema(measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
tsfile_writer_->register_timeseries(
device_name,
MeasurementSchema(measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
}
}
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
storage::Tablet tablet(
device_name,
std::make_shared<std::vector<MeasurementSchema>>(schema_vec[i]),
max_rows);
for (int j = 0; j < measurement_num; j++) {
for (int row = 0; row < max_rows; row++) {
tablet.add_timestamp(row, 16225600 + row);
}
for (int row = 0; row < max_rows; row++) {
tablet.add_value(row, j, static_cast<int64_t>(row));
}
}
ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
}
auto schemas = tsfile_writer_->get_schema_group_map();
ASSERT_EQ(schemas->size(), 50);
for (const auto &device_iter : *schemas) {
for (const auto &chunk_iter :
device_iter.second->measurement_schema_map_) {
ASSERT_NE(chunk_iter.second->chunk_writer_, nullptr);
ASSERT_TRUE(chunk_iter.second->chunk_writer_->hasData());
}
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
TEST_F(TsFileWriterTest, FlushWithoutWriteAfterRegisterTS) {
std::string device_path = "device1";
std::string measurement_name = "temperature";
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
ASSERT_EQ(tsfile_writer_->register_timeseries(
device_path,
storage::MeasurementSchema(measurement_name, data_type,
encoding, compression_type)),
E_OK);
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
TEST_F(TsFileWriterTest, WriteAlignedTimeseries) {
int measurement_num = 100, row_num = 150;
std::string device_name = "device";
std::vector<std::string> measurement_names;
for (int i = 0; i < measurement_num; i++) {
measurement_names.emplace_back("temperature" + std::to_string(i));
}
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
std::vector<MeasurementSchema *> measurement_schema_vec;
for (const auto &measurement_name : measurement_names) {
auto *ms = new MeasurementSchema(measurement_name, data_type, encoding,
compression_type);
measurement_schema_vec.push_back(ms);
}
tsfile_writer_->register_aligned_timeseries(device_name,
measurement_schema_vec);
for (int i = 0; i < row_num; ++i) {
TsRecord record(1622505600000 + i * 1000, device_name);
for (const auto &measurement_name : measurement_names) {
record.add_point(measurement_name, (int32_t)i);
}
ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
std::vector<storage::Path> select_list;
for (int i = 0; i < measurement_num; ++i) {
std::string measurement_name = "temperature" + std::to_string(i);
storage::Path path(device_name, measurement_name);
select_list.push_back(path);
}
storage::QueryExpression *query_expr =
storage::QueryExpression::create(select_list, nullptr);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(query_expr, tmp_qds);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
storage::RowRecord *record;
bool has_next = false;
for (int cur_row = 0; cur_row < row_num; cur_row++) {
if (IS_FAIL(qds->next(has_next)) || !has_next) {
break;
}
record = qds->get_row_record();
int size = record->get_fields()->size();
for (int i = 0; i < size; ++i) {
if (i == 0) {
EXPECT_EQ(std::to_string(record->get_timestamp()),
field_to_string(record->get_field(i)));
continue;
}
EXPECT_EQ(std::to_string(cur_row),
field_to_string(record->get_field(i)));
}
}
reader.destroy_query_data_set(qds);
}
TEST_F(TsFileWriterTest, WriteAlignedMultiFlush) {
int measurement_num = 100, row_num = 100;
std::string device_name = "device";
std::vector<std::string> measurement_names;
for (int i = 0; i < measurement_num; i++) {
measurement_names.emplace_back("temperature" + std::to_string(i));
}
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
std::vector<MeasurementSchema *> measurement_schema_vec;
for (const auto &measurement_name : measurement_names) {
auto *ms = new MeasurementSchema(measurement_name, data_type, encoding,
compression_type);
measurement_schema_vec.push_back(ms);
}
tsfile_writer_->register_aligned_timeseries(device_name,
measurement_schema_vec);
for (int i = 0; i < row_num; ++i) {
TsRecord record(1622505600000 + i * 1000, device_name);
for (const auto &measurement_name : measurement_names) {
record.add_point(measurement_name, (int32_t)i);
}
ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK);
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
}
ASSERT_EQ(tsfile_writer_->close(), E_OK);
std::vector<storage::Path> select_list;
for (int i = 0; i < measurement_num; ++i) {
std::string measurement_name = "temperature" + std::to_string(i);
storage::Path path(device_name, measurement_name);
select_list.push_back(path);
}
storage::QueryExpression *query_expr =
storage::QueryExpression::create(select_list, nullptr);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(query_expr, tmp_qds);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
storage::RowRecord *record;
bool has_next = false;
for (int cur_row = 0; cur_row < row_num; cur_row++) {
if (IS_FAIL(qds->next(has_next)) || !has_next) {
break;
}
record = qds->get_row_record();
int size = record->get_fields()->size();
for (int i = 0; i < size; ++i) {
if (i == 0) {
EXPECT_EQ(std::to_string(record->get_timestamp()),
field_to_string(record->get_field(i)));
continue;
}
EXPECT_EQ(std::to_string(cur_row),
field_to_string(record->get_field(i)));
}
}
reader.destroy_query_data_set(qds);
}
TEST_F(TsFileWriterTest, WriteAlignedPartialData) {
int measurement_num = 100, row_num = 200;
std::string device_name = "device";
std::vector<std::string> measurement_names;
for (int i = 0; i < measurement_num; i++) {
measurement_names.emplace_back("temperature" + std::to_string(i));
}
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
std::vector<MeasurementSchema *> measurement_schema_vec;
for (const auto &measurement_name : measurement_names) {
auto *ms = new MeasurementSchema(measurement_name, data_type, encoding,
compression_type);
measurement_schema_vec.push_back(ms);
}
tsfile_writer_->register_aligned_timeseries(device_name,
measurement_schema_vec);
for (int i = 0; i < row_num; ++i) {
TsRecord record(1622505600000 + i * 1000, device_name);
for (const auto &measurement_name : measurement_names) {
record.add_point(measurement_name, (int32_t)i);
}
ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
std::vector<storage::Path> select_list;
for (int i = 0; i < measurement_num; ++i) {
std::string measurement_name = "temperature" + std::to_string(i);
storage::Path path(device_name, measurement_name);
select_list.push_back(path);
}
storage::QueryExpression *query_expr =
storage::QueryExpression::create(select_list, nullptr);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(query_expr, tmp_qds);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
storage::RowRecord *record;
int64_t cur_row = 0;
bool has_next = false;
do {
if (IS_FAIL(qds->next(has_next)) || !has_next) {
break;
}
record = qds->get_row_record();
int size = record->get_fields()->size();
for (int i = 0; i < size; ++i) {
if (i == 0) {
EXPECT_EQ(std::to_string(record->get_timestamp()),
field_to_string(record->get_field(i)));
continue;
}
EXPECT_EQ(std::to_string(cur_row),
field_to_string(record->get_field(i)));
}
cur_row++;
} while (true);
reader.destroy_query_data_set(qds);
}
TEST_F(TsFileWriterTest, WriteTabletDataTypeMismatch) {
for (int i = 0; i < 2; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < 3; j++) {
std::string measure_name = "measurement" + std::to_string(j);
tsfile_writer_->register_timeseries(
device_name, storage::MeasurementSchema(
measure_name, common::TSDataType::INT32,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
}
}
std::vector<TSDataType> measurement_types{
TSDataType::INT32, TSDataType::INT64, TSDataType::INT32};
std::vector<std::string> measurement_names{"measurement0", "measurement1",
"measurement2"};
Tablet tablet("test_device0", &measurement_names, &measurement_types);
for (int row = 0; row < 100; row++) {
tablet.add_timestamp(row, row);
for (int col = 0; col < 3; col++) {
switch (measurement_types[col]) {
case TSDataType::INT32:
tablet.add_value(row, col, static_cast<int32_t>(row));
break;
case TSDataType::INT64:
tablet.add_value(row, col, static_cast<int64_t>(row));
break;
default:;
}
}
}
ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_writer_->write_tablet(tablet));
std::vector<MeasurementSchema *> measurement_schemas;
for (int i = 0; i < 3; i++) {
measurement_schemas.push_back(new MeasurementSchema(
"measurement" + std::to_string(i), TSDataType::INT32));
}
tsfile_writer_->register_aligned_timeseries("device3", measurement_schemas);
tablet.set_table_name("device3");
ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_writer_->write_tablet_aligned(tablet));
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}