blob: 1e421eca74c50960468bafd0f6747a6307d8beac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License a
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "reader/tsfile_reader.h"
#include <gtest/gtest.h>
#include <random>
#include <vector>
#include "common/path.h"
#include "common/record.h"
#include "common/schema.h"
#include "common/tablet.h"
#include "file/tsfile_io_writer.h"
#include "file/write_file.h"
#include "reader/qds_without_timegenerator.h"
#include "writer/tsfile_writer.h"
using namespace storage;
using namespace common;
class TsFileReaderTest : public ::testing::Test {
protected:
void SetUp() override {
tsfile_writer_ = new TsFileWriter();
libtsfile_init();
file_name_ = std::string("tsfile_writer_test_") +
generate_random_string(10) + std::string(".tsfile");
remove(file_name_.c_str());
int flags = O_WRONLY | O_CREAT | O_TRUNC;
#ifdef _WIN32
flags |= O_BINARY;
#endif
mode_t mode = 0666;
EXPECT_EQ(tsfile_writer_->open(file_name_, flags, mode), common::E_OK);
}
void TearDown() override {
delete tsfile_writer_;
remove(file_name_.c_str());
}
std::string file_name_;
TsFileWriter* tsfile_writer_ = nullptr;
public:
static std::string generate_random_string(int length) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 61);
const std::string chars =
"0123456789"
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
std::string random_string;
for (int i = 0; i < length; ++i) {
random_string += chars[dis(gen)];
}
return random_string;
}
static std::string field_to_string(storage::Field* value) {
if (value->type_ == common::TEXT) {
return std::string(value->value_.sval_);
} else {
std::stringstream ss;
switch (value->type_) {
case common::BOOLEAN:
ss << (value->value_.bval_ ? "true" : "false");
break;
case common::INT32:
ss << value->value_.ival_;
break;
case common::INT64:
ss << value->value_.lval_;
break;
case common::FLOAT:
ss << value->value_.fval_;
break;
case common::DOUBLE:
ss << value->value_.dval_;
break;
case common::NULL_TYPE:
ss << "NULL";
break;
default:
ASSERT(false);
break;
}
return ss.str();
}
}
};
TEST_F(TsFileReaderTest, ResultSetMetadata) {
std::string device_path = "device1";
std::string measurement_name = "temperature";
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
tsfile_writer_->register_timeseries(
device_path, storage::MeasurementSchema(measurement_name, data_type,
encoding, compression_type));
for (int i = 0; i < 50000; ++i) {
TsRecord record(1622505600000 + i * 1000, device_path);
record.add_point(measurement_name, (int32_t)i);
ASSERT_EQ(tsfile_writer_->write_record(record), E_OK);
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
}
ASSERT_EQ(tsfile_writer_->close(), E_OK);
std::vector<std::string> select_list = {"device1.temperature"};
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::ResultSet* tmp_qds = nullptr;
ret = reader.query(select_list, 1622505600000, 1622505600000 + 50000 * 1000,
tmp_qds);
auto* qds = (QDSWithoutTimeGenerator*)tmp_qds;
ResultSetMetadata* result_set_metadaa = qds->get_metadata();
ASSERT_EQ(result_set_metadaa->get_column_type(0), data_type);
ASSERT_EQ(result_set_metadaa->get_column_name(0),
device_path + "." + measurement_name);
reader.destroy_query_data_set(qds);
reader.close();
}
TEST_F(TsFileReaderTest, GetAllDevice) {
std::string measurement_name = "temperature";
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
for (size_t i = 0; i < 1024; i++) {
tsfile_writer_->register_timeseries(
"device.ln" + to_string(i),
storage::MeasurementSchema(measurement_name, data_type, encoding,
compression_type));
}
for (size_t i = 0; i < 1024; i++) {
TsRecord record(1622505600000, "device.ln" + to_string(i));
record.add_point(measurement_name, (int32_t)0);
ASSERT_EQ(tsfile_writer_->write_record(record), E_OK);
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
auto devices = reader.get_all_devices("device");
ASSERT_EQ(devices.size(), 1024);
std::vector<std::shared_ptr<IDeviceID> > devices_name_expected;
for (size_t i = 0; i < 1024; i++) {
devices_name_expected.push_back(
std::make_shared<StringArrayDeviceID>(
"device.ln" + std::to_string(i)));
}
std::sort(devices_name_expected.begin(), devices_name_expected.end(),
[](const std::shared_ptr<IDeviceID>& left_str, const std::shared_ptr<IDeviceID>& right_str) {
return left_str->operator<(*right_str);
});
for (size_t i = 0; i < devices.size(); i++) {
ASSERT_TRUE(devices[i]->operator==(*devices_name_expected[i]));
}
}
TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
std::vector<std::string> device_path = {"device", "device.ln"};
std::vector<std::string> measurement_name = {"temperature", "humidity"};
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
tsfile_writer_->register_timeseries(
device_path[0],
storage::MeasurementSchema(measurement_name[0], data_type, encoding,
compression_type));
tsfile_writer_->register_timeseries(
device_path[1],
storage::MeasurementSchema(measurement_name[1], data_type, encoding,
compression_type));
TsRecord record_0(1622505600000, device_path[0]);
record_0.add_point(measurement_name[0], (int32_t)0);
TsRecord record_1(1622505600000, device_path[1]);
record_1.add_point(measurement_name[1], (int32_t)1);
ASSERT_EQ(tsfile_writer_->write_record(record_0), E_OK);
ASSERT_EQ(tsfile_writer_->write_record(record_1), E_OK);
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
std::vector<MeasurementSchema> measurement_schemas;
reader.get_timeseries_schema(
std::make_shared<StringArrayDeviceID>(device_path[0]),
measurement_schemas);
ASSERT_EQ(measurement_schemas[0].measurement_name_, measurement_name[0]);
ASSERT_EQ(measurement_schemas[0].data_type_, TSDataType::INT32);
reader.get_timeseries_schema(
std::make_shared<StringArrayDeviceID>(device_path[1]),
measurement_schemas);
ASSERT_EQ(measurement_schemas[1].measurement_name_, measurement_name[1]);
ASSERT_EQ(measurement_schemas[1].data_type_, TSDataType::INT32);
reader.close();
}