| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "catch.hpp" |
| #include "Session.h" |
| #include "SessionBuilder.h" |
| |
| using namespace std; |
| |
| extern std::shared_ptr<Session> session; |
| |
| static vector<string> testTimeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"}; |
| |
| void prepareTimeseries() { |
| for (const string ×eries: testTimeseries) { |
| if (session->checkTimeseriesExists(timeseries)) { |
| session->deleteTimeseries(timeseries); |
| } |
| session->createTimeseries(timeseries, TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| } |
| |
// Counter that supplies the id printed by each CaseReporter; it grows by one
// for every reporter that is constructed.
static int global_test_id = 0;

/**
 * Scope guard that logs the start and the end of a test case on std::cout.
 *
 * The constructor takes the next value of global_test_id and prints
 * "Test <id>: <name>"; the destructor prints "Test <id>: <name> Done"
 * followed by an empty line.
 *
 * Only the pointer to the name is stored, so the string passed in must
 * outlive the reporter (string literals do).
 */
class CaseReporter
{
public:
    // explicit: a bare C string must not silently convert into a reporter.
    explicit CaseReporter(const char *caseNameArg) : caseName(caseNameArg), test_id(global_test_id++)
    {
        std::cout << "Test " << test_id << ": " << caseName << std::endl;
    }
    ~CaseReporter()
    {
        std::cout << "Test " << test_id << ": " << caseName << " Done" << std::endl << std::endl;
    }

    // A copy would print the "Done" line a second time, so copying is disabled.
    CaseReporter(const CaseReporter &) = delete;
    CaseReporter &operator=(const CaseReporter &) = delete;

private:
    const char *caseName;
    int test_id;
};
| |
| TEST_CASE("Create timeseries success", "[createTimeseries]") { |
| CaseReporter cr("createTimeseries"); |
| if (!session->checkTimeseriesExists("root.test.d1.s1")) { |
| session->createTimeseries("root.test.d1.s1", TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| REQUIRE(session->checkTimeseriesExists("root.test.d1.s1") == true); |
| session->deleteTimeseries("root.test.d1.s1"); |
| } |
| |
| TEST_CASE("Login Test - Authentication failed with error code 801", "[Authentication]") { |
| CaseReporter cr("Login Test"); |
| |
| try { |
| Session session("127.0.0.1", 6667, "root", "wrong-password"); |
| session.open(false); |
| FAIL("Expected authentication exception"); // Test fails if no exception |
| } catch (const std::exception& e) { |
| // Verify exception contains error code 801 |
| REQUIRE(std::string(e.what()).find("801") != std::string::npos); |
| } |
| } |
| |
| TEST_CASE("Test Session constructor with nodeUrls", "[SessionInitAndOperate]") { |
| CaseReporter cr("SessionInitWithNodeUrls"); |
| |
| std::vector<std::string> nodeUrls = {"127.0.0.1:6667"}; |
| std::shared_ptr<Session> localSession = std::make_shared<Session>(nodeUrls, "root", "root"); |
| localSession->open(); |
| if (!localSession->checkTimeseriesExists("root.test.d1.s1")) { |
| localSession->createTimeseries("root.test.d1.s1", TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| REQUIRE(localSession->checkTimeseriesExists("root.test.d1.s1") == true); |
| localSession->deleteTimeseries("root.test.d1.s1"); |
| localSession->close(); |
| } |
| |
| TEST_CASE("Test Session builder with nodeUrls", "[SessionBuilderInit]") { |
| CaseReporter cr("SessionInitWithNodeUrls"); |
| |
| std::vector<std::string> nodeUrls = {"127.0.0.1:6667"}; |
| auto builder = std::unique_ptr<SessionBuilder>(new SessionBuilder()); |
| std::shared_ptr<Session> session = |
| std::shared_ptr<Session>( |
| builder |
| ->username("root") |
| ->password("root") |
| ->nodeUrls(nodeUrls) |
| ->build() |
| ); |
| session->open(); |
| if (!session->checkTimeseriesExists("root.test.d1.s1")) { |
| session->createTimeseries("root.test.d1.s1", TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| REQUIRE(session->checkTimeseriesExists("root.test.d1.s1") == true); |
| session->deleteTimeseries("root.test.d1.s1"); |
| session->close(); |
| } |
| |
| TEST_CASE("Delete timeseries success", "[deleteTimeseries]") { |
| CaseReporter cr("deleteTimeseries"); |
| if (!session->checkTimeseriesExists("root.test.d1.s1")) { |
| session->createTimeseries("root.test.d1.s1", TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| REQUIRE(session->checkTimeseriesExists("root.test.d1.s1") == true); |
| session->deleteTimeseries("root.test.d1.s1"); |
| REQUIRE(session->checkTimeseriesExists("root.test.d1.s1") == false); |
| } |
| |
| TEST_CASE("Test insertRecord by string", "[testInsertRecord]") { |
| CaseReporter cr("testInsertRecord"); |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| |
| for (long time = 0; time < 100; time++) { |
| vector<string> values = {"1", "2", "3"}; |
| session->insertRecord(deviceId, time, measurements, values); |
| } |
| |
| session->executeNonQueryStatement("insert into root.test.d1(timestamp,s1, s2, s3) values(100, 1,2,3)"); |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| long index = 1; |
| count++; |
| auto fields = sessionDataSet->next()->fields; |
| for (const Field &f: fields) { |
| REQUIRE(f.longV.value() == index); |
| index++; |
| } |
| } |
| REQUIRE(count == 101); |
| } |
| |
| TEST_CASE("Test insertRecords ", "[testInsertRecords]") { |
| CaseReporter cr("testInsertRecords"); |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<string> deviceIds; |
| vector<vector<string>> measurementsList; |
| vector<vector<string>> valuesList; |
| vector<int64_t> timestamps; |
| |
| int64_t COUNT = 500; |
| for (int64_t time = 1; time <= COUNT; time++) { |
| vector<string> values = {"1", "2", "3"}; |
| |
| deviceIds.push_back(deviceId); |
| measurementsList.push_back(measurements); |
| valuesList.push_back(values); |
| timestamps.push_back(time); |
| if (time != 0 && time % 100 == 0) { |
| session->insertRecords(deviceIds, timestamps, measurementsList, valuesList); |
| deviceIds.clear(); |
| measurementsList.clear(); |
| valuesList.clear(); |
| timestamps.clear(); |
| } |
| } |
| |
| if (timestamps.size() > 0) { |
| session->insertRecords(deviceIds, timestamps, measurementsList, valuesList); |
| } |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| long index = 1; |
| count++; |
| auto fields = sessionDataSet->next()->fields; |
| for (const Field &f: fields) { |
| REQUIRE(f.longV.value() == index); |
| index++; |
| } |
| } |
| REQUIRE(count == COUNT); |
| } |
| |
| TEST_CASE("Test insertRecord with types ", "[testTypedInsertRecord]") { |
| CaseReporter cr("testTypedInsertRecord"); |
| vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64}; |
| |
| for (size_t i = 0; i < timeseries.size(); i++) { |
| if (session->checkTimeseriesExists(timeseries[i])) { |
| session->deleteTimeseries(timeseries[i]); |
| } |
| session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<int32_t> value1(100, 1); |
| vector<double> value2(100, 2.2); |
| vector<int64_t> value3(100, 3); |
| |
| for (long time = 0; time < 100; time++) { |
| vector<char *> values = {(char *) (&value1[time]), (char *) (&value2[time]), (char *) (&value3[time])}; |
| session->insertRecord(deviceId, time, measurements, types, values); |
| } |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| long count = 0; |
| while (sessionDataSet->hasNext()) { |
| sessionDataSet->next(); |
| count++; |
| } |
| REQUIRE(count == 100); |
| } |
| |
| |
| TEST_CASE("Test insertRecord with new datatypes ", "[testTypedInsertRecordNewDatatype]") { |
| CaseReporter cr("testTypedInsertRecordNewDatatype"); |
| vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3", "root.test.d1.s4"}; |
| std::vector<TSDataType::TSDataType> types = {TSDataType::TIMESTAMP, |
| TSDataType::DATE, TSDataType::BLOB, TSDataType::STRING}; |
| |
| for (size_t i = 0; i < timeseries.size(); i++) { |
| if (session->checkTimeseriesExists(timeseries[i])) { |
| session->deleteTimeseries(timeseries[i]); |
| } |
| session->createTimeseries(timeseries[i], types[i], TSEncoding::PLAIN, CompressionType::SNAPPY); |
| } |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3", "s4"}; |
| int64_t value1 = 20250507; |
| boost::gregorian::date value2 = boost::gregorian::date(2025, 5, 7); |
| string value3 = "20250507"; |
| string value4 = "20250507"; |
| |
| for (long time = 0; time < 100; time++) { |
| vector<char *> values = {(char *) (&value1), (char *) (&value2), |
| const_cast<char*>(value3.c_str()), const_cast<char*>(value4.c_str())}; |
| session->insertRecord(deviceId, time, measurements, types, values); |
| } |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3,s4 from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| long count = 0; |
| while (sessionDataSet->hasNext()) { |
| auto record = sessionDataSet->next(); |
| REQUIRE(record->fields.size() == 4); |
| for (int i = 0; i < 4; i++) { |
| REQUIRE(types[i] == record->fields[i].dataType); |
| } |
| REQUIRE(record->fields[0].longV.value() == value1); |
| REQUIRE(record->fields[1].dateV.value() == value2); |
| REQUIRE(record->fields[2].stringV.value() == value3); |
| REQUIRE(record->fields[3].stringV.value() == value4); |
| count++; |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test insertRecords with types ", "[testTypedInsertRecords]") { |
| CaseReporter cr("testTypedInsertRecords"); |
| vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64}; |
| |
| for (size_t i = 0; i < timeseries.size(); i++) { |
| if (session->checkTimeseriesExists(timeseries[i])) { |
| session->deleteTimeseries(timeseries[i]); |
| } |
| session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<string> deviceIds; |
| vector<vector<string>> measurementsList; |
| vector<vector<TSDataType::TSDataType>> typesList; |
| vector<vector<char *>> valuesList; |
| vector<int64_t> timestamps; |
| vector<int32_t> value1(100, 1); |
| vector<double> value2(100, 2.2); |
| vector<int64_t> value3(100, 3); |
| |
| for (int64_t time = 0; time < 100; time++) { |
| vector<char *> values = {(char *) (&value1[time]), (char *) (&value2[time]), (char *) (&value3[time])}; |
| deviceIds.push_back(deviceId); |
| measurementsList.push_back(measurements); |
| typesList.push_back(types); |
| valuesList.push_back(values); |
| timestamps.push_back(time); |
| } |
| |
| session->insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| sessionDataSet->next(); |
| count++; |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test insertRecordsOfOneDevice", "[testInsertRecordsOfOneDevice]") { |
| CaseReporter cr("testInsertRecordsOfOneDevice"); |
| vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64}; |
| |
| for (size_t i = 0; i < timeseries.size(); i++) { |
| if (session->checkTimeseriesExists(timeseries[i])) { |
| session->deleteTimeseries(timeseries[i]); |
| } |
| session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<vector<string>> measurementsList; |
| vector<vector<TSDataType::TSDataType>> typesList; |
| vector<vector<char *>> valuesList; |
| vector<int64_t> timestamps; |
| vector<int32_t> value1(100, 1); |
| vector<double> value2(100, 2.2); |
| vector<int64_t> value3(100, 3); |
| |
| for (int64_t time = 0; time < 100; time++) { |
| vector<char *> values = {(char *) (&value1[time]), (char *) (&value2[time]), (char *) (&value3[time])}; |
| measurementsList.push_back(measurements); |
| typesList.push_back(types); |
| valuesList.push_back(values); |
| timestamps.push_back(time); |
| } |
| |
| session->insertRecordsOfOneDevice(deviceId, timestamps, measurementsList, typesList, valuesList); |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select * from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| sessionDataSet->next(); |
| count++; |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test insertTablet ", "[testInsertTablet]") { |
| CaseReporter cr("testInsertTablet"); |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<pair<string, TSDataType::TSDataType>> schemaList; |
| schemaList.emplace_back("s1", TSDataType::INT64); |
| schemaList.emplace_back("s2", TSDataType::INT64); |
| schemaList.emplace_back("s3", TSDataType::INT64); |
| |
| Tablet tablet(deviceId, schemaList, 100); |
| for (int64_t time = 0; time < 100; time++) { |
| int row = tablet.rowSize++; |
| tablet.timestamps[row] = time; |
| for (int64_t i = 0; i < 3; i++) { |
| tablet.addValue(i, row, i); |
| } |
| if (tablet.rowSize == tablet.maxRowNumber) { |
| session->insertTablet(tablet); |
| tablet.reset(); |
| } |
| } |
| |
| if (tablet.rowSize != 0) { |
| session->insertTablet(tablet); |
| tablet.reset(); |
| } |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| long index = 0; |
| count++; |
| auto fields = sessionDataSet->next()->fields; |
| for (const Field &f: fields) { |
| REQUIRE(f.longV.value() == index); |
| index++; |
| } |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test insertTablets ", "[testInsertTablets]") { |
| CaseReporter cr("testInsertTablets"); |
| vector<string> testTimeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3", |
| "root.test.d2.s1", "root.test.d2.s2", "root.test.d2.s3"}; |
| for (const string ×eries: testTimeseries) { |
| if (session->checkTimeseriesExists(timeseries)) { |
| session->deleteTimeseries(timeseries); |
| } |
| session->createTimeseries(timeseries, TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| vector<pair<string, TSDataType::TSDataType>> schemaList; |
| schemaList.emplace_back("s1", TSDataType::INT64); |
| schemaList.emplace_back("s2", TSDataType::INT64); |
| schemaList.emplace_back("s3", TSDataType::INT64); |
| |
| int maxRowNumber = 100; |
| vector<string> deviceIds = {"root.test.d1", "root.test.d2"}; |
| vector<Tablet> tablets; |
| for (const auto& deviceId: deviceIds) { |
| tablets.emplace_back(deviceId, schemaList, maxRowNumber); |
| } |
| for (auto& tablet : tablets) { |
| for (int64_t time = 0; time < maxRowNumber; time++) { |
| int row = tablet.rowSize++; |
| tablet.timestamps[row] = time; |
| for (int64_t i = 0; i < 3; i++) { |
| tablet.addValue(i, row, i); |
| } |
| } |
| } |
| unordered_map<string, Tablet*> tabletsMap; |
| for (auto& tablet : tablets) { |
| tabletsMap[tablet.deviceId] = &tablet; |
| } |
| session->insertTablets(tabletsMap); |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d2"); |
| sessionDataSet->setFetchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| long index = 0; |
| count++; |
| auto fields = sessionDataSet->next()->fields; |
| for (const Field &f: fields) { |
| REQUIRE(f.longV.value() == index); |
| index++; |
| } |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test insertTablet multi datatype", "[testInsertTabletMultiDatatype]") { |
| CaseReporter cr("testInsertTabletNewDatatype"); |
| string deviceId = "root.test.d2"; |
| vector<pair<string, TSDataType::TSDataType>> schemaList; |
| std::vector<std::string> measurements = {"s1", "s2", "s3", "s4"}; |
| std::vector<TSDataType::TSDataType> dataTypes = {TSDataType::TIMESTAMP, |
| TSDataType::DATE, TSDataType::BLOB, TSDataType::STRING}; |
| for (int i = 0; i < 4; i++) { |
| schemaList.emplace_back(measurements[i], dataTypes[i]); |
| } |
| |
| for (int i = 0; i < 4; i++) { |
| auto timeseries = deviceId + "." + measurements[i]; |
| if (session->checkTimeseriesExists(timeseries)) { |
| session->deleteTimeseries(timeseries); |
| } |
| session->createTimeseries(timeseries, dataTypes[i], TSEncoding::PLAIN, CompressionType::UNCOMPRESSED); |
| } |
| |
| int64_t s1Value = 20250507; |
| boost::gregorian::date s2Value(2025, 5, 7); |
| std::string s3Value("20250507"); |
| std::string s4Value("20250507"); |
| |
| Tablet tablet(deviceId, schemaList, 100); |
| for (int64_t time = 0; time < 100; time++) { |
| int row = tablet.rowSize++; |
| tablet.timestamps[row] = time; |
| tablet.addValue(0, row, s1Value); |
| tablet.addValue(1, row, s2Value); |
| tablet.addValue(2, row, s3Value); |
| tablet.addValue(3, row, s4Value); |
| if (tablet.rowSize == tablet.maxRowNumber) { |
| session->insertTablet(tablet); |
| tablet.reset(); |
| } |
| } |
| |
| if (tablet.rowSize != 0) { |
| session->insertTablet(tablet); |
| tablet.reset(); |
| } |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3,s4 from root.test.d2"); |
| auto dataIter = sessionDataSet->getIterator(); |
| sessionDataSet->setFetchSize(1024); |
| int count = 0; |
| while (dataIter.next()) { |
| REQUIRE(dataIter.getLongByIndex(2).value() == s1Value); |
| REQUIRE(dataIter.getDateByIndex(3).value() == s2Value); |
| REQUIRE(dataIter.getStringByIndex(4).value() == s3Value); |
| REQUIRE(dataIter.getStringByIndex(5).value() == s4Value); |
| count++; |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test Last query ", "[testLastQuery]") { |
| CaseReporter cr("testLastQuery"); |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| |
| for (long time = 0; time < 100; time++) { |
| vector<string> values = {"1", "2", "3"}; |
| session->insertRecord(deviceId, time, measurements, values); |
| } |
| |
| vector<string> measurementValues = {"1", "2", "3"}; |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement( |
| "select last s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| long index = 0; |
| while (sessionDataSet->hasNext()) { |
| vector<Field> fields = sessionDataSet->next()->fields; |
| REQUIRE("1" <= fields[1].stringV.value()); |
| REQUIRE(fields[1].stringV.value() <= "3"); |
| index++; |
| } |
| } |
| |
| TEST_CASE("Test Huge query ", "[testHugeQuery]") { |
| CaseReporter cr("testHugeQuery"); |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT64, TSDataType::INT64, TSDataType::INT64}; |
| int64_t value1 = 1, value2 = 2, value3 = 3; |
| vector<char*> values = {(char*)&value1, (char*)&value2, (char*)&value3}; |
| |
| long total_count = 500000; |
| int print_count = 0; |
| std::cout.width(7); |
| std::cout << "inserting " << total_count << " rows:" << std::endl; |
| for (long time = 0; time < total_count; time++) { |
| session->insertRecord(deviceId, time, measurements, types, values); |
| if (time != 0 && time % 1000 == 0) { |
| std::cout << time << "\t" << std::flush; |
| if (++print_count % 20 == 0) { |
| std::cout << std::endl; |
| } |
| } |
| } |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setFetchSize(1024); |
| int count = 0; |
| print_count = 0; |
| std::cout << "\n\niterating " << total_count << " rows:" << std::endl; |
| while (sessionDataSet->hasNext()) { |
| auto rowRecord = sessionDataSet->next(); |
| REQUIRE(rowRecord->timestamp == count); |
| REQUIRE(rowRecord->fields[0].longV.value() == 1); |
| REQUIRE(rowRecord->fields[1].longV.value() == 2); |
| REQUIRE(rowRecord->fields[2].longV.value() == 3); |
| count++; |
| if (count % 1000 == 0) { |
| std::cout << count << "\t" << std::flush; |
| if (++print_count % 20 == 0) { |
| std::cout << std::endl; |
| } |
| } |
| } |
| |
| REQUIRE(count == total_count); |
| } |
| |
| |
| TEST_CASE("Test executeRawDataQuery ", "[executeRawDataQuery]") { |
| CaseReporter cr("executeRawDataQuery"); |
| prepareTimeseries(); |
| |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT64, TSDataType::INT64, TSDataType::INT64}; |
| |
| long total_count = 5000; |
| vector<char*> values; |
| int64_t valueArray[3]; |
| for (long time = -total_count; time < total_count; time++) { |
| valueArray[0] = time; |
| valueArray[1] = time * 2; |
| valueArray[2] = time * 3; |
| values.clear(); |
| values.push_back((char*)&valueArray[0]); |
| values.push_back((char*)&valueArray[1]); |
| values.push_back((char*)&valueArray[2]); |
| session->insertRecord(deviceId, time, measurements, types, values); |
| if (time == 100) { //insert 1 big timestamp data for generate un-seq data. |
| valueArray[0] = 9; |
| valueArray[2] = 999; |
| values.clear(); |
| values.push_back((char*)&valueArray[0]); |
| values.push_back((char*)&valueArray[2]); |
| vector<string> measurements2 = {"s1", "s3"}; |
| vector<TSDataType::TSDataType> types2 = {TSDataType::INT64, TSDataType::INT64}; |
| session->insertRecord(deviceId, 99999, measurements2, types2, values); |
| } |
| } |
| |
| vector<string> paths; |
| paths.push_back("root.test.d1.s1"); |
| paths.push_back("root.test.d1.s2"); |
| paths.push_back("root.test.d1.s3"); |
| |
| //== Test executeRawDataQuery() with negative timestamp |
| int startTs = -total_count, endTs = total_count; |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeRawDataQuery(paths, startTs, endTs); |
| sessionDataSet->setFetchSize(10); |
| vector<string> columns = sessionDataSet->getColumnNames(); |
| columns = sessionDataSet->getColumnNames(); |
| REQUIRE(columns[0] == "Time"); |
| REQUIRE(columns[1] == paths[0]); |
| REQUIRE(columns[2] == paths[1]); |
| REQUIRE(columns[3] == paths[2]); |
| |
| int ts = startTs; |
| while (sessionDataSet->hasNext()) { |
| auto rowRecordPtr = sessionDataSet->next(); |
| //cout << rowRecordPtr->toString(); |
| |
| vector<Field> fields = rowRecordPtr->fields; |
| REQUIRE(rowRecordPtr->timestamp == ts); |
| REQUIRE(fields[0].dataType == TSDataType::INT64); |
| REQUIRE(fields[0].longV.value() == ts); |
| REQUIRE(fields[1].dataType == TSDataType::INT64); |
| REQUIRE(fields[1].longV.value() == ts * 2); |
| REQUIRE(fields[2].dataType == TSDataType::INT64); |
| REQUIRE(fields[2].longV.value() == ts *3); |
| ts++; |
| } |
| |
| |
| //== Test executeRawDataQuery() with null field |
| startTs = 99999; |
| endTs = 99999 + 10; |
| sessionDataSet = session->executeRawDataQuery(paths, startTs, endTs); |
| |
| sessionDataSet->setFetchSize(10); |
| columns = sessionDataSet->getColumnNames(); |
| for (const string &column : columns) { |
| cout << column << " " ; |
| } |
| cout << endl; |
| REQUIRE(columns[0] == "Time"); |
| REQUIRE(columns[1] == paths[0]); |
| REQUIRE(columns[2] == paths[1]); |
| REQUIRE(columns[3] == paths[2]); |
| ts = startTs; |
| while (sessionDataSet->hasNext()) { |
| auto rowRecordPtr = sessionDataSet->next(); |
| cout << rowRecordPtr->toString(); |
| |
| vector<Field> fields = rowRecordPtr->fields; |
| REQUIRE(rowRecordPtr->timestamp == ts); |
| REQUIRE(fields[0].dataType == TSDataType::INT64); |
| REQUIRE(fields[0].longV.value() == 9); |
| REQUIRE(fields[1].dataType == TSDataType::UNKNOWN); |
| REQUIRE(fields[2].dataType == TSDataType::INT64); |
| REQUIRE(fields[2].longV.value() == 999); |
| } |
| |
| //== Test executeRawDataQuery() with empty data |
| sessionDataSet = session->executeRawDataQuery(paths, 100000, 110000); |
| sessionDataSet->setFetchSize(1); |
| REQUIRE(sessionDataSet->hasNext() == false); |
| } |
| |
| TEST_CASE("Test executeLastDataQuery ", "[testExecuteLastDataQuery]") { |
| CaseReporter cr("testExecuteLastDataQuery"); |
| prepareTimeseries(); |
| |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT64, TSDataType::INT64, TSDataType::INT64}; |
| |
| long total_count = 5000; |
| vector<char*> values; |
| int64_t valueArray[3]; |
| for (long time = -total_count; time < total_count; time++) { |
| valueArray[0] = time; |
| valueArray[1] = time * 2; |
| valueArray[2] = time * 3; |
| values.clear(); |
| values.push_back((char*)&valueArray[0]); |
| values.push_back((char*)&valueArray[1]); |
| values.push_back((char*)&valueArray[2]); |
| session->insertRecord(deviceId, time, measurements, types, values); |
| if (time == 100) { //insert 1 big timestamp data for gen unseq data. |
| valueArray[0] = 9; |
| valueArray[2] = 999; |
| values.clear(); |
| values.push_back((char*)&valueArray[0]); |
| values.push_back((char*)&valueArray[2]); |
| vector<string> measurements2 = {"s1", "s3"}; |
| vector<TSDataType::TSDataType> types2 = {TSDataType::INT64, TSDataType::INT64}; |
| session->insertRecord(deviceId, 99999, measurements2, types2, values); |
| } |
| } |
| |
| int64_t tsCheck[3] = {99999, 4999, 99999}; |
| std::vector<std::string> valueCheck = {"9", "9998", "999"}; |
| |
| vector<string> paths; |
| paths.push_back("root.test.d1.s1"); |
| paths.push_back("root.test.d1.s2"); |
| paths.push_back("root.test.d1.s3"); |
| |
| //== Test executeLastDataQuery() without lastTime |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeLastDataQuery(paths); |
| sessionDataSet->setFetchSize(1); |
| |
| vector<string> columns = sessionDataSet->getColumnNames(); |
| for (const string &column : columns) { |
| cout << column << " " ; |
| } |
| cout << endl; |
| |
| int index = 0; |
| while (sessionDataSet->hasNext()) { |
| auto rowRecordPtr = sessionDataSet->next(); |
| cout << rowRecordPtr->toString(); |
| |
| vector<Field> fields = rowRecordPtr->fields; |
| REQUIRE(rowRecordPtr->timestamp == tsCheck[index]); |
| REQUIRE(fields[0].stringV.value() == paths[index]); |
| REQUIRE(fields[1].stringV.value() == valueCheck[index]); |
| REQUIRE(fields[2].stringV.value() == "INT64"); |
| index++; |
| } |
| |
| //== Test executeLastDataQuery() with negative lastTime |
| sessionDataSet = session->executeLastDataQuery(paths, -200); |
| sessionDataSet->setFetchSize(1); |
| columns = sessionDataSet->getColumnNames(); |
| for (const string &column : columns) { |
| cout << column << " " ; |
| } |
| cout << endl; |
| |
| index = 0; |
| while (sessionDataSet->hasNext()) { |
| auto rowRecordPtr = sessionDataSet->next(); |
| cout << rowRecordPtr->toString(); |
| |
| vector<Field> fields = rowRecordPtr->fields; |
| REQUIRE(rowRecordPtr->timestamp == tsCheck[index]); |
| REQUIRE(fields[0].stringV.value() == paths[index]); |
| REQUIRE(fields[1].stringV.value() == valueCheck[index]); |
| REQUIRE(fields[2].stringV.value() == "INT64"); |
| index++; |
| } |
| |
| //== Test executeLastDataQuery() with the lastTime that is > largest timestamp. |
| sessionDataSet = session->executeLastDataQuery(paths, 100000); |
| sessionDataSet->setFetchSize(1024); |
| REQUIRE(sessionDataSet->hasNext() == false); |
| } |
| |
| // Helper function for comparing TEndPoint with detailed error message |
| void assertTEndPointEqual(const TEndPoint& actual, |
| const std::string& expectedIp, |
| int expectedPort, |
| const char* file, |
| int line) { |
| if (actual.ip != expectedIp || actual.port != expectedPort) { |
| std::stringstream ss; |
| ss << "\nTEndPoint mismatch:\nExpected: " << expectedIp << ":" << expectedPort |
| << "\nActual: " << actual.ip << ":" << actual.port; |
| Catch::SourceLineInfo location(file, line); |
| Catch::AssertionHandler handler("TEndPoint comparison", location, ss.str(), Catch::ResultDisposition::Normal); |
| handler.handleMessage(Catch::ResultWas::ExplicitFailure, ss.str()); |
| handler.complete(); |
| } |
| } |
| |
| // Macro to simplify test assertions |
| #define REQUIRE_TENDPOINT(actual, expectedIp, expectedPort) \ |
| assertTEndPointEqual(actual, expectedIp, expectedPort, __FILE__, __LINE__) |
| |
| TEST_CASE("UrlUtils - parseTEndPointIpv4AndIpv6Url", "[UrlUtils]") { |
| // Test valid IPv4 addresses |
| SECTION("Valid IPv4") { |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("192.168.1.1:8080"), "192.168.1.1", 8080); |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("10.0.0.1:80"), "10.0.0.1", 80); |
| } |
| |
| // Test valid IPv6 addresses |
| SECTION("Valid IPv6") { |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("[2001:db8::1]:8080"), "2001:db8::1", 8080); |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("[::1]:80"), "::1", 80); |
| } |
| |
| // Test hostnames |
| SECTION("Hostnames") { |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("localhost:8080"), "localhost", 8080); |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("example.com:443"), "example.com", 443); |
| } |
| |
| // Test edge cases |
| SECTION("Edge cases") { |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url(""), "", 0); |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("127.0.0.1"), "127.0.0.1", 0); |
| } |
| |
| // Test invalid inputs |
| SECTION("Invalid inputs") { |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("192.168.1.1:abc"), "192.168.1.1:abc", 0); |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("]invalid[:80"), "]invalid[", 80); |
| } |
| |
| // Test port ranges |
| SECTION("Port ranges") { |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("localhost:0"), "localhost", 0); |
| REQUIRE_TENDPOINT(UrlUtils::parseTEndPointIpv4AndIpv6Url("127.0.0.1:65535"), "127.0.0.1", 65535); |
| } |
| } |