| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License a |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include "writer/tsfile_writer.h" |
| |
| #include <gtest/gtest.h> |
| |
| #include <random> |
| |
| #include "common/path.h" |
| #include "common/record.h" |
| #include "common/schema.h" |
| #include "common/tablet.h" |
| #include "file/tsfile_io_writer.h" |
| #include "file/write_file.h" |
| #include "reader/qds_without_timegenerator.h" |
| #include "reader/tsfile_reader.h" |
| #include "writer/chunk_writer.h" |
| using namespace storage; |
| using namespace common; |
| |
| class TsFileWriterTest : public ::testing::Test { |
| protected: |
| void SetUp() override { |
| libtsfile_init(); |
| tsfile_writer_ = new TsFileWriter(); |
| file_name_ = std::string("tsfile_writer_test_") + |
| generate_random_string(10) + std::string(".tsfile"); |
| remove(file_name_.c_str()); |
| int flags = O_WRONLY | O_CREAT | O_TRUNC; |
| #ifdef _WIN32 |
| flags |= O_BINARY; |
| #endif |
| mode_t mode = 0666; |
| ASSERT_EQ(tsfile_writer_->open(file_name_, flags, mode), common::E_OK); |
| } |
| void TearDown() override { |
| delete tsfile_writer_; |
| int ret = remove(file_name_.c_str()); |
| ASSERT_EQ(0, ret); |
| } |
| |
| std::string file_name_; |
| TsFileWriter *tsfile_writer_ = nullptr; |
| |
| public: |
| static std::string generate_random_string(int length) { |
| std::mt19937 gen(static_cast<unsigned int>( |
| std::chrono::system_clock::now().time_since_epoch().count())); |
| std::uniform_int_distribution<> dis(0, 61); |
| |
| const std::string chars = |
| "0123456789" |
| "abcdefghijklmnopqrstuvwxyz" |
| "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; |
| |
| std::string random_string; |
| |
| for (int i = 0; i < length; ++i) { |
| random_string += chars[dis(gen)]; |
| } |
| return random_string; |
| } |
| |
| static std::string field_to_string(storage::Field *value) { |
| if (value->type_ == common::TEXT || value->type_ == STRING || |
| value->type_ == BLOB) { |
| return std::string(value->value_.sval_); |
| } else { |
| std::stringstream ss; |
| switch (value->type_) { |
| case common::BOOLEAN: |
| ss << (value->value_.bval_ ? "true" : "false"); |
| break; |
| case common::INT32: |
| ss << value->value_.ival_; |
| break; |
| case common::INT64: |
| case common::TIMESTAMP: |
| ss << value->value_.lval_; |
| break; |
| case common::FLOAT: |
| ss << value->value_.fval_; |
| break; |
| case common::DOUBLE: |
| ss << value->value_.dval_; |
| break; |
| case common::NULL_TYPE: |
| ss << "NULL"; |
| break; |
| default: |
| ASSERT(false); |
| break; |
| } |
| return ss.str(); |
| } |
| } |
| }; |
| |
| class TsFileWriterTestSimple : public ::testing::Test {}; |
| |
| TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) { |
| TsFileWriter writer; |
| ASSERT_EQ(writer.init(nullptr), E_INVALID_ARG); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteDiffDataType) { |
| std::string device_name = "test_table"; |
| common::TSEncoding encoding = common::TSEncoding::PLAIN; |
| common::CompressionType compression_type = |
| common::CompressionType::UNCOMPRESSED; |
| std::vector<std::string> measurement_names = { |
| "level", "num", "bools", "double", "id", "ts", "text", "blob", "date"}; |
| std::vector<common::TSDataType> data_types = { |
| FLOAT, INT64, BOOLEAN, DOUBLE, STRING, TIMESTAMP, TEXT, BLOB, DATE}; |
| for (uint32_t i = 0; i < measurement_names.size(); i++) { |
| std::string measurement_name = measurement_names[i]; |
| common::TSDataType data_type = data_types[i]; |
| tsfile_writer_->register_timeseries( |
| device_name, |
| storage::MeasurementSchema(measurement_name, data_type, encoding, |
| compression_type)); |
| } |
| |
| char *literal = new char[std::strlen("device_id") + 1]; |
| std::strcpy(literal, "device_id"); |
| String literal_str(literal, std::strlen("device_id")); |
| |
| std::time_t now = std::time(nullptr); |
| std::tm *local_time = std::localtime(&now); |
| std::tm today = {}; |
| today.tm_year = local_time->tm_year; |
| today.tm_mon = local_time->tm_mon; |
| today.tm_mday = local_time->tm_mday; |
| |
| int row_num = 100000; |
| for (int i = 0; i < row_num; ++i) { |
| TsRecord record(1622505600000 + i * 100, device_name); |
| for (uint32_t j = 0; j < measurement_names.size(); j++) { |
| std::string measurement_name = measurement_names[j]; |
| common::TSDataType data_type = data_types[j]; |
| switch (data_type) { |
| case BOOLEAN: |
| record.add_point(measurement_name, true); |
| break; |
| case INT64: |
| record.add_point(measurement_name, (int64_t)415412); |
| break; |
| case FLOAT: |
| record.add_point(measurement_name, (float)1.0); |
| break; |
| case DOUBLE: |
| record.add_point(measurement_name, (double)2.0); |
| break; |
| case STRING: |
| record.add_point(measurement_name, literal_str); |
| break; |
| case TEXT: |
| record.add_point(measurement_name, literal_str); |
| break; |
| case BLOB: |
| record.add_point(measurement_name, literal_str); |
| break; |
| case TIMESTAMP: |
| record.add_point(measurement_name, (int64_t)415412); |
| break; |
| case DATE: |
| record.add_point(measurement_name, today); |
| default: |
| break; |
| } |
| } |
| ASSERT_EQ(tsfile_writer_->write_record(record), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| |
| std::vector<std::string> select_list; |
| select_list.reserve(measurement_names.size()); |
| for (uint32_t i = 0; i < measurement_names.size(); ++i) { |
| std::string measurement_name = measurement_names[i]; |
| std::string path_name = device_name + "." + measurement_name; |
| select_list.emplace_back(path_name); |
| } |
| |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| storage::ResultSet *tmp_qds = nullptr; |
| ret = reader.query(select_list, 1622505600000, |
| 1622505600000 + row_num * 100, tmp_qds); |
| auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; |
| |
| int64_t cur_record_num = 0; |
| bool has_next = false; |
| do { |
| if (IS_FAIL(qds->next(has_next)) || !has_next) { |
| break; |
| } |
| cur_record_num++; |
| ASSERT_EQ(qds->get_value<float>(2), (float)1.0); |
| ASSERT_EQ(qds->get_value<int64_t>(3), (int64_t)415412); |
| ASSERT_EQ(qds->get_value<bool>(4), true); |
| ASSERT_EQ(qds->get_value<double>(5), (double)2.0); |
| ASSERT_EQ(qds->get_value<common::String *>(6)->compare(literal_str), 0); |
| ASSERT_EQ(qds->get_value<int64_t>(7), (int64_t)415412); |
| ASSERT_EQ(qds->get_value<common::String *>(8)->compare(literal_str), 0); |
| ASSERT_EQ(qds->get_value<common::String *>(9)->compare(literal_str), 0); |
| ASSERT_TRUE( |
| DateConverter::is_tm_ymd_equal(qds->get_value<std::tm>(10), today)); |
| |
| ASSERT_EQ(qds->get_value<float>(measurement_names[0]), (float)1.0); |
| ASSERT_EQ(qds->get_value<int64_t>(measurement_names[1]), |
| (int64_t)415412); |
| ASSERT_EQ(qds->get_value<bool>(measurement_names[2]), true); |
| ASSERT_EQ(qds->get_value<double>(measurement_names[3]), (double)2.0); |
| ASSERT_EQ(qds->get_value<common::String *>(measurement_names[4]) |
| ->compare(literal_str), |
| 0); |
| ASSERT_EQ(qds->get_value<int64_t>(measurement_names[5]), |
| (int64_t)415412); |
| ASSERT_EQ(qds->get_value<common::String *>(measurement_names[6]) |
| ->compare(literal_str), |
| 0); |
| ASSERT_EQ(qds->get_value<common::String *>(measurement_names[7]) |
| ->compare(literal_str), |
| 0); |
| ASSERT_TRUE(DateConverter::is_tm_ymd_equal( |
| qds->get_value<std::tm>(measurement_names[8]), today)); |
| } while (true); |
| delete[] literal; |
| EXPECT_EQ(cur_record_num, row_num); |
| reader.destroy_query_data_set(qds); |
| ASSERT_EQ(reader.close(), E_OK); |
| } |
| |
| TEST_F(TsFileWriterTest, RegisterTimeSeries) { |
| std::string device_path = "device1"; |
| std::string measurement_name = "temperature"; |
| common::TSDataType data_type = common::TSDataType::INT32; |
| common::TSEncoding encoding = common::TSEncoding::PLAIN; |
| common::CompressionType compression_type = |
| common::CompressionType::UNCOMPRESSED; |
| |
| ASSERT_EQ(tsfile_writer_->register_timeseries( |
| device_path, |
| storage::MeasurementSchema(measurement_name, data_type, |
| encoding, compression_type)), |
| E_OK); |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteMultipleRecords) { |
| std::string device_path = "device1"; |
| std::string measurement_name = "temperature"; |
| common::TSDataType data_type = common::TSDataType::INT32; |
| common::TSEncoding encoding = common::TSEncoding::PLAIN; |
| common::CompressionType compression_type = |
| common::CompressionType::UNCOMPRESSED; |
| tsfile_writer_->register_timeseries( |
| device_path, storage::MeasurementSchema(measurement_name, data_type, |
| encoding, compression_type)); |
| |
| for (int i = 0; i < 50000; ++i) { |
| TsRecord record(1622505600000 + i * 1000, device_path); |
| record.add_point(measurement_name, (int32_t)i); |
| ASSERT_EQ(tsfile_writer_->write_record(record), E_OK); |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteDiffrentTypeCombination) { |
| std::string device_path = "device1"; |
| std::string measurement_name = "temperature"; |
| std::vector<TSDataType> data_types = {TSDataType::INT32, TSDataType::INT64, |
| TSDataType::FLOAT, |
| TSDataType::DOUBLE}; |
| std::vector<TSEncoding> encodings = {TSEncoding::PLAIN, |
| TSEncoding::TS_2DIFF}; |
| std::vector<CompressionType> compression_types = { |
| CompressionType::UNCOMPRESSED, CompressionType::SNAPPY, |
| CompressionType::GZIP, CompressionType::LZ4}; |
| |
| std::vector<MeasurementSchema> schema_vecs; |
| schema_vecs.reserve(data_types.size() * encodings.size() * |
| compression_types.size()); |
| int idx = 0; |
| for (auto data_type : data_types) { |
| for (auto encoding_type : encodings) { |
| for (auto compression_type : compression_types) { |
| schema_vecs.emplace_back(MeasurementSchema( |
| measurement_name + std::to_string(idx), data_type, |
| encoding_type, compression_type)); |
| tsfile_writer_->register_timeseries(device_path, |
| schema_vecs[idx++]); |
| } |
| } |
| } |
| |
| char *literal = new char[std::strlen("literal") + 1]; |
| std::strcpy(literal, "literal"); |
| String literal_str(literal, std::strlen("literal")); |
| |
| for (int i = 0; i < schema_vecs.size(); ++i) { |
| TsRecord record(1622505600000 + i * 1000, device_path); |
| if (schema_vecs[i].data_type_ == TSDataType::INT32) { |
| record.add_point(schema_vecs[i].measurement_name_, (int32_t)i); |
| } else if (schema_vecs[i].data_type_ == TSDataType::FLOAT) { |
| record.add_point(schema_vecs[i].measurement_name_, 3.14); |
| } else if (schema_vecs[i].data_type_ == TSDataType::DOUBLE) { |
| record.add_point(schema_vecs[i].measurement_name_, 3.1415926); |
| } else if (schema_vecs[i].data_type_ == TSDataType::BOOLEAN) { |
| record.add_point(schema_vecs[i].measurement_name_, true); |
| } else if (schema_vecs[i].data_type_ == TSDataType::STRING) { |
| record.add_point(schema_vecs[i].measurement_name_, literal_str); |
| } |
| ASSERT_EQ(tsfile_writer_->write_record(record), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| delete[] literal; |
| } |
| |
| TEST_F(TsFileWriterTest, WriteMultipleTabletsMultiFlush) { |
| common::config_set_max_degree_of_index_node(3); |
| const int device_num = 20; |
| const int measurement_num = 20; |
| int max_tablet_num = 100; |
| std::vector<std::vector<MeasurementSchema>> schema_vecs( |
| device_num, std::vector<MeasurementSchema>(measurement_num)); |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| for (int j = 0; j < measurement_num; j++) { |
| std::string measure_name = "measurement" + std::to_string(j); |
| schema_vecs[i][j] = |
| MeasurementSchema(measure_name, common::TSDataType::INT32, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED); |
| tsfile_writer_->register_timeseries( |
| device_name, storage::MeasurementSchema( |
| measure_name, common::TSDataType::INT32, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| } |
| } |
| |
| for (int tablet_num = 0; tablet_num < max_tablet_num; tablet_num++) { |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| storage::Tablet tablet( |
| device_name, |
| std::make_shared<std::vector<MeasurementSchema>>( |
| schema_vecs[i]), |
| 1); |
| for (int j = 0; j < measurement_num; j++) { |
| tablet.add_timestamp(0, 16225600000 + tablet_num * 100); |
| tablet.add_value(0, j, static_cast<int32_t>(tablet_num)); |
| } |
| ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| |
| std::vector<storage::Path> select_list; |
| for (int i = 0; i < device_num; i++) { |
| for (int j = 0; j < measurement_num; ++j) { |
| std::string device_name = "test_device" + std::to_string(i); |
| std::string measure_name = "measurement" + std::to_string(j); |
| storage::Path path(device_name, measure_name); |
| select_list.push_back(path); |
| } |
| } |
| storage::QueryExpression *query_expr = |
| storage::QueryExpression::create(select_list, nullptr); |
| |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| storage::ResultSet *tmp_qds = nullptr; |
| |
| ret = reader.query(query_expr, tmp_qds); |
| auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; |
| |
| storage::RowRecord *record; |
| int max_rows = max_tablet_num * 1; |
| bool has_next = false; |
| for (int cur_row = 0; cur_row < max_rows; cur_row++) { |
| if (IS_FAIL(qds->next(has_next)) || !has_next) { |
| break; |
| } |
| record = qds->get_row_record(); |
| int size = record->get_fields()->size(); |
| for (int i = 0; i < size; ++i) { |
| if (i == 0) { |
| EXPECT_EQ(std::to_string(record->get_timestamp()), |
| field_to_string(record->get_field(i))); |
| continue; |
| } |
| EXPECT_EQ(std::to_string(cur_row), |
| field_to_string(record->get_field(i))); |
| } |
| } |
| reader.destroy_query_data_set(qds); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteMultipleTabletsAlignedMultiFlush) { |
| common::config_set_max_degree_of_index_node(3); |
| const int device_num = 20; |
| const int measurement_num = 20; |
| int max_tablet_num = 100; |
| std::vector<std::vector<MeasurementSchema>> schema_vecs( |
| device_num, std::vector<MeasurementSchema>(measurement_num)); |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| for (int j = 0; j < measurement_num; j++) { |
| std::string measure_name = "measurement" + std::to_string(j); |
| schema_vecs[i][j] = |
| MeasurementSchema(measure_name, common::TSDataType::INT32, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED); |
| tsfile_writer_->register_aligned_timeseries( |
| device_name, storage::MeasurementSchema( |
| measure_name, common::TSDataType::INT32, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| } |
| } |
| |
| for (int tablet_num = 0; tablet_num < max_tablet_num; tablet_num++) { |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| storage::Tablet tablet( |
| device_name, |
| std::make_shared<std::vector<MeasurementSchema>>( |
| schema_vecs[i]), |
| 1); |
| for (int j = 0; j < measurement_num; j++) { |
| tablet.add_timestamp(0, 16225600000 + tablet_num * 100); |
| tablet.add_value(0, j, static_cast<int32_t>(tablet_num)); |
| } |
| ASSERT_EQ(tsfile_writer_->write_tablet_aligned(tablet), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| |
| std::vector<storage::Path> select_list; |
| for (int i = 0; i < device_num; i++) { |
| for (int j = 0; j < measurement_num; ++j) { |
| std::string device_name = "test_device" + std::to_string(i); |
| std::string measure_name = "measurement" + std::to_string(j); |
| storage::Path path(device_name, measure_name); |
| select_list.push_back(path); |
| } |
| } |
| storage::QueryExpression *query_expr = |
| storage::QueryExpression::create(select_list, nullptr); |
| |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| storage::ResultSet *tmp_qds = nullptr; |
| |
| ret = reader.query(query_expr, tmp_qds); |
| ASSERT_EQ(ret, common::E_OK); |
| auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; |
| |
| storage::RowRecord *record; |
| int max_rows = max_tablet_num * 1; |
| bool has_next = false; |
| for (int cur_row = 0; cur_row < max_rows; cur_row++) { |
| if (IS_FAIL(qds->next(has_next)) || !has_next) { |
| break; |
| } |
| record = qds->get_row_record(); |
| int size = record->get_fields()->size(); |
| for (int i = 0; i < size; ++i) { |
| if (i == 0) { |
| ASSERT_EQ(field_to_string(record->get_field(0)), |
| std::to_string(record->get_timestamp())); |
| continue; |
| } |
| EXPECT_EQ(std::to_string(cur_row), |
| field_to_string(record->get_field(i))); |
| } |
| } |
| reader.destroy_query_data_set(qds); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteMultipleTabletsInt64) { |
| const int device_num = 50; |
| const int measurement_num = 50; |
| std::vector<MeasurementSchema> schema_vec[50]; |
| |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| for (int j = 0; j < measurement_num; j++) { |
| std::string measure_name = "measurement" + std::to_string(j); |
| schema_vec[i].push_back( |
| MeasurementSchema(measure_name, common::TSDataType::INT64, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| tsfile_writer_->register_timeseries( |
| device_name, storage::MeasurementSchema( |
| measure_name, common::TSDataType::INT64, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| } |
| } |
| |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| int max_rows = 100; |
| storage::Tablet tablet( |
| device_name, |
| std::make_shared<std::vector<MeasurementSchema>>(schema_vec[i]), |
| max_rows); |
| for (int j = 0; j < measurement_num; j++) { |
| for (int row = 0; row < max_rows; row++) { |
| tablet.add_timestamp(row, 16225600 + row); |
| } |
| for (int row = 0; row < max_rows; row++) { |
| tablet.add_value(row, j, static_cast<int64_t>(row)); |
| } |
| } |
| ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); |
| } |
| |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteMultipleTabletsDouble) { |
| const int device_num = 50; |
| const int measurement_num = 50; |
| std::vector<MeasurementSchema> schema_vec[50]; |
| |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| for (int j = 0; j < measurement_num; j++) { |
| std::string measure_name = "measurement" + std::to_string(j); |
| schema_vec[i].push_back( |
| MeasurementSchema(measure_name, common::TSDataType::DOUBLE, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| tsfile_writer_->register_timeseries( |
| device_name, storage::MeasurementSchema( |
| measure_name, common::TSDataType::DOUBLE, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| } |
| } |
| |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| int max_rows = 200; |
| storage::Tablet tablet( |
| device_name, |
| std::make_shared<std::vector<MeasurementSchema>>(schema_vec[i]), |
| max_rows); |
| for (int j = 0; j < measurement_num; j++) { |
| for (int row = 0; row < max_rows; row++) { |
| tablet.add_timestamp(row, 16225600 + row); |
| } |
| for (int row = 0; row < max_rows; row++) { |
| tablet.add_value(row, j, static_cast<double>(row) + 1.0); |
| } |
| } |
| ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); |
| } |
| |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| } |
| |
| TEST_F(TsFileWriterTest, FlushMultipleDevice) { |
| const int device_num = 50; |
| const int measurement_num = 50; |
| const int max_rows = 100; |
| std::vector<MeasurementSchema> schema_vec[50]; |
| |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| for (int j = 0; j < measurement_num; j++) { |
| std::string measure_name = "measurement" + std::to_string(j); |
| schema_vec[i].emplace_back(measure_name, common::TSDataType::INT64, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED); |
| tsfile_writer_->register_timeseries( |
| device_name, |
| MeasurementSchema(measure_name, common::TSDataType::INT64, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| } |
| } |
| |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| storage::Tablet tablet( |
| device_name, |
| std::make_shared<std::vector<MeasurementSchema>>(schema_vec[i]), |
| max_rows); |
| for (int j = 0; j < measurement_num; j++) { |
| for (int row = 0; row < max_rows; row++) { |
| tablet.add_timestamp(row, 16225600 + row); |
| } |
| for (int row = 0; row < max_rows; row++) { |
| tablet.add_value(row, j, static_cast<int64_t>(row)); |
| } |
| } |
| ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); |
| // flush after write tablet to check whether write empty chunk |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| |
| std::vector<storage::Path> select_list; |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| for (int j = 0; j < measurement_num; j++) { |
| std::string measurement_name = "measurement" + std::to_string(j); |
| storage::Path path(device_name, measurement_name); |
| select_list.push_back(path); |
| } |
| } |
| storage::QueryExpression *query_expr = |
| storage::QueryExpression::create(select_list, nullptr); |
| |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| storage::ResultSet *tmp_qds = nullptr; |
| |
| ret = reader.query(query_expr, tmp_qds); |
| auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; |
| |
| storage::RowRecord *record; |
| int64_t cur_record_num = 0; |
| bool has_next = false; |
| do { |
| if (IS_FAIL(qds->next(has_next)) || !has_next) { |
| break; |
| } |
| record = qds->get_row_record(); |
| // if empty chunk is writen, the timestamp should be NULL |
| if (!record) { |
| break; |
| } |
| EXPECT_EQ(record->get_timestamp(), 16225600 + cur_record_num); |
| cur_record_num++; |
| } while (true); |
| EXPECT_EQ(cur_record_num, max_rows); |
| reader.destroy_query_data_set(qds); |
| } |
| |
| TEST_F(TsFileWriterTest, AnalyzeTsfileForload) { |
| const int device_num = 50; |
| const int measurement_num = 50; |
| const int max_rows = 100; |
| std::vector<MeasurementSchema> schema_vec[50]; |
| |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| for (int j = 0; j < measurement_num; j++) { |
| std::string measure_name = "measurement" + std::to_string(j); |
| schema_vec[i].push_back( |
| MeasurementSchema(measure_name, common::TSDataType::INT64, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| tsfile_writer_->register_timeseries( |
| device_name, |
| MeasurementSchema(measure_name, common::TSDataType::INT64, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| } |
| } |
| |
| for (int i = 0; i < device_num; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| storage::Tablet tablet( |
| device_name, |
| std::make_shared<std::vector<MeasurementSchema>>(schema_vec[i]), |
| max_rows); |
| for (int j = 0; j < measurement_num; j++) { |
| for (int row = 0; row < max_rows; row++) { |
| tablet.add_timestamp(row, 16225600 + row); |
| } |
| for (int row = 0; row < max_rows; row++) { |
| tablet.add_value(row, j, static_cast<int64_t>(row)); |
| } |
| } |
| ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); |
| } |
| auto schemas = tsfile_writer_->get_schema_group_map(); |
| ASSERT_EQ(schemas->size(), 50); |
| for (const auto &device_iter : *schemas) { |
| for (const auto &chunk_iter : |
| device_iter.second->measurement_schema_map_) { |
| ASSERT_NE(chunk_iter.second->chunk_writer_, nullptr); |
| ASSERT_TRUE(chunk_iter.second->chunk_writer_->hasData()); |
| } |
| } |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| } |
| TEST_F(TsFileWriterTest, FlushWithoutWriteAfterRegisterTS) { |
| std::string device_path = "device1"; |
| std::string measurement_name = "temperature"; |
| common::TSDataType data_type = common::TSDataType::INT32; |
| common::TSEncoding encoding = common::TSEncoding::PLAIN; |
| common::CompressionType compression_type = |
| common::CompressionType::UNCOMPRESSED; |
| |
| ASSERT_EQ(tsfile_writer_->register_timeseries( |
| device_path, |
| storage::MeasurementSchema(measurement_name, data_type, |
| encoding, compression_type)), |
| E_OK); |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteAlignedTimeseries) { |
| int measurement_num = 100, row_num = 150; |
| std::string device_name = "device"; |
| std::vector<std::string> measurement_names; |
| for (int i = 0; i < measurement_num; i++) { |
| measurement_names.emplace_back("temperature" + std::to_string(i)); |
| } |
| |
| common::TSDataType data_type = common::TSDataType::INT32; |
| common::TSEncoding encoding = common::TSEncoding::PLAIN; |
| common::CompressionType compression_type = |
| common::CompressionType::UNCOMPRESSED; |
| std::vector<MeasurementSchema *> measurement_schema_vec; |
| for (const auto &measurement_name : measurement_names) { |
| auto *ms = new MeasurementSchema(measurement_name, data_type, encoding, |
| compression_type); |
| measurement_schema_vec.push_back(ms); |
| } |
| tsfile_writer_->register_aligned_timeseries(device_name, |
| measurement_schema_vec); |
| |
| for (int i = 0; i < row_num; ++i) { |
| TsRecord record(1622505600000 + i * 1000, device_name); |
| for (const auto &measurement_name : measurement_names) { |
| record.add_point(measurement_name, (int32_t)i); |
| } |
| ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); |
| } |
| |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| |
| std::vector<storage::Path> select_list; |
| for (int i = 0; i < measurement_num; ++i) { |
| std::string measurement_name = "temperature" + std::to_string(i); |
| storage::Path path(device_name, measurement_name); |
| select_list.push_back(path); |
| } |
| storage::QueryExpression *query_expr = |
| storage::QueryExpression::create(select_list, nullptr); |
| |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| storage::ResultSet *tmp_qds = nullptr; |
| |
| ret = reader.query(query_expr, tmp_qds); |
| auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; |
| |
| storage::RowRecord *record; |
| bool has_next = false; |
| for (int cur_row = 0; cur_row < row_num; cur_row++) { |
| if (IS_FAIL(qds->next(has_next)) || !has_next) { |
| break; |
| } |
| record = qds->get_row_record(); |
| int size = record->get_fields()->size(); |
| for (int i = 0; i < size; ++i) { |
| if (i == 0) { |
| EXPECT_EQ(std::to_string(record->get_timestamp()), |
| field_to_string(record->get_field(i))); |
| continue; |
| } |
| EXPECT_EQ(std::to_string(cur_row), |
| field_to_string(record->get_field(i))); |
| } |
| } |
| reader.destroy_query_data_set(qds); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteAlignedMultiFlush) { |
| int measurement_num = 100, row_num = 100; |
| std::string device_name = "device"; |
| std::vector<std::string> measurement_names; |
| for (int i = 0; i < measurement_num; i++) { |
| measurement_names.emplace_back("temperature" + std::to_string(i)); |
| } |
| |
| common::TSDataType data_type = common::TSDataType::INT32; |
| common::TSEncoding encoding = common::TSEncoding::PLAIN; |
| common::CompressionType compression_type = |
| common::CompressionType::UNCOMPRESSED; |
| std::vector<MeasurementSchema *> measurement_schema_vec; |
| for (const auto &measurement_name : measurement_names) { |
| auto *ms = new MeasurementSchema(measurement_name, data_type, encoding, |
| compression_type); |
| measurement_schema_vec.push_back(ms); |
| } |
| tsfile_writer_->register_aligned_timeseries(device_name, |
| measurement_schema_vec); |
| |
| for (int i = 0; i < row_num; ++i) { |
| TsRecord record(1622505600000 + i * 1000, device_name); |
| for (const auto &measurement_name : measurement_names) { |
| record.add_point(measurement_name, (int32_t)i); |
| } |
| ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| } |
| |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| |
| std::vector<storage::Path> select_list; |
| for (int i = 0; i < measurement_num; ++i) { |
| std::string measurement_name = "temperature" + std::to_string(i); |
| storage::Path path(device_name, measurement_name); |
| select_list.push_back(path); |
| } |
| storage::QueryExpression *query_expr = |
| storage::QueryExpression::create(select_list, nullptr); |
| |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| storage::ResultSet *tmp_qds = nullptr; |
| |
| ret = reader.query(query_expr, tmp_qds); |
| auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; |
| |
| storage::RowRecord *record; |
| bool has_next = false; |
| for (int cur_row = 0; cur_row < row_num; cur_row++) { |
| if (IS_FAIL(qds->next(has_next)) || !has_next) { |
| break; |
| } |
| record = qds->get_row_record(); |
| int size = record->get_fields()->size(); |
| for (int i = 0; i < size; ++i) { |
| if (i == 0) { |
| EXPECT_EQ(std::to_string(record->get_timestamp()), |
| field_to_string(record->get_field(i))); |
| continue; |
| } |
| EXPECT_EQ(std::to_string(cur_row), |
| field_to_string(record->get_field(i))); |
| } |
| } |
| reader.destroy_query_data_set(qds); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteAlignedPartialData) { |
| int measurement_num = 100, row_num = 200; |
| std::string device_name = "device"; |
| std::vector<std::string> measurement_names; |
| for (int i = 0; i < measurement_num; i++) { |
| measurement_names.emplace_back("temperature" + std::to_string(i)); |
| } |
| |
| common::TSDataType data_type = common::TSDataType::INT32; |
| common::TSEncoding encoding = common::TSEncoding::PLAIN; |
| common::CompressionType compression_type = |
| common::CompressionType::UNCOMPRESSED; |
| std::vector<MeasurementSchema *> measurement_schema_vec; |
| for (const auto &measurement_name : measurement_names) { |
| auto *ms = new MeasurementSchema(measurement_name, data_type, encoding, |
| compression_type); |
| measurement_schema_vec.push_back(ms); |
| } |
| tsfile_writer_->register_aligned_timeseries(device_name, |
| measurement_schema_vec); |
| |
| for (int i = 0; i < row_num; ++i) { |
| TsRecord record(1622505600000 + i * 1000, device_name); |
| for (const auto &measurement_name : measurement_names) { |
| record.add_point(measurement_name, (int32_t)i); |
| } |
| ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); |
| } |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| |
| std::vector<storage::Path> select_list; |
| for (int i = 0; i < measurement_num; ++i) { |
| std::string measurement_name = "temperature" + std::to_string(i); |
| storage::Path path(device_name, measurement_name); |
| select_list.push_back(path); |
| } |
| storage::QueryExpression *query_expr = |
| storage::QueryExpression::create(select_list, nullptr); |
| |
| storage::TsFileReader reader; |
| int ret = reader.open(file_name_); |
| ASSERT_EQ(ret, common::E_OK); |
| storage::ResultSet *tmp_qds = nullptr; |
| |
| ret = reader.query(query_expr, tmp_qds); |
| auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; |
| |
| storage::RowRecord *record; |
| int64_t cur_row = 0; |
| bool has_next = false; |
| do { |
| if (IS_FAIL(qds->next(has_next)) || !has_next) { |
| break; |
| } |
| record = qds->get_row_record(); |
| int size = record->get_fields()->size(); |
| for (int i = 0; i < size; ++i) { |
| if (i == 0) { |
| EXPECT_EQ(std::to_string(record->get_timestamp()), |
| field_to_string(record->get_field(i))); |
| continue; |
| } |
| EXPECT_EQ(std::to_string(cur_row), |
| field_to_string(record->get_field(i))); |
| } |
| cur_row++; |
| } while (true); |
| reader.destroy_query_data_set(qds); |
| } |
| |
| TEST_F(TsFileWriterTest, WriteTabletDataTypeMismatch) { |
| for (int i = 0; i < 2; i++) { |
| std::string device_name = "test_device" + std::to_string(i); |
| for (int j = 0; j < 3; j++) { |
| std::string measure_name = "measurement" + std::to_string(j); |
| tsfile_writer_->register_timeseries( |
| device_name, storage::MeasurementSchema( |
| measure_name, common::TSDataType::INT32, |
| common::TSEncoding::PLAIN, |
| common::CompressionType::UNCOMPRESSED)); |
| } |
| } |
| |
| std::vector<TSDataType> measurement_types{ |
| TSDataType::INT32, TSDataType::INT64, TSDataType::INT32}; |
| std::vector<std::string> measurement_names{"measurement0", "measurement1", |
| "measurement2"}; |
| |
| Tablet tablet("test_device0", &measurement_names, &measurement_types); |
| for (int row = 0; row < 100; row++) { |
| tablet.add_timestamp(row, row); |
| for (int col = 0; col < 3; col++) { |
| switch (measurement_types[col]) { |
| case TSDataType::INT32: |
| tablet.add_value(row, col, static_cast<int32_t>(row)); |
| break; |
| case TSDataType::INT64: |
| tablet.add_value(row, col, static_cast<int64_t>(row)); |
| break; |
| default:; |
| } |
| } |
| } |
| ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_writer_->write_tablet(tablet)); |
| std::vector<MeasurementSchema *> measurement_schemas; |
| for (int i = 0; i < 3; i++) { |
| measurement_schemas.push_back(new MeasurementSchema( |
| "measurement" + std::to_string(i), TSDataType::INT32)); |
| } |
| |
| tsfile_writer_->register_aligned_timeseries("device3", measurement_schemas); |
| tablet.set_table_name("device3"); |
| ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_writer_->write_tablet_aligned(tablet)); |
| ASSERT_EQ(tsfile_writer_->flush(), E_OK); |
| ASSERT_EQ(tsfile_writer_->close(), E_OK); |
| } |