/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
#include <cstddef>
#include <cstdint>

#include "catch.hpp"
#include "Session.h"
| |
| using namespace std; |
| |
| extern Session *session; |
| |
| static vector<string> testTimeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"}; |
| |
| void prepareTimeseries() { |
| for (const string ×eries: testTimeseries) { |
| if (session->checkTimeseriesExists(timeseries)) { |
| session->deleteTimeseries(timeseries); |
| } |
| session->createTimeseries(timeseries, TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| } |
| |
| TEST_CASE("Create timeseries success", "[createTimeseries]") { |
| if (!session->checkTimeseriesExists("root.test.d1.s1")) { |
| session->createTimeseries("root.test.d1.s1", TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| REQUIRE(session->checkTimeseriesExists("root.test.d1.s1") == true); |
| session->deleteTimeseries("root.test.d1.s1"); |
| } |
| |
| TEST_CASE("Delete timeseries success", "[deleteTimeseries]") { |
| if (!session->checkTimeseriesExists("root.test.d1.s1")) { |
| session->createTimeseries("root.test.d1.s1", TSDataType::INT64, TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| REQUIRE(session->checkTimeseriesExists("root.test.d1.s1") == true); |
| session->deleteTimeseries("root.test.d1.s1"); |
| REQUIRE(session->checkTimeseriesExists("root.test.d1.s1") == false); |
| } |
| |
| TEST_CASE("Test insertRecord by string", "[testInsertRecord]") { |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| |
| for (long time = 0; time < 100; time++) { |
| vector<string> values = {"1", "2", "3"}; |
| session->insertRecord(deviceId, time, measurements, values); |
| } |
| |
| session->executeNonQueryStatement("insert into root.test.d1(timestamp,s1, s2, s3) values(100, 1,2,3)"); |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select * from root.test.d1"); |
| sessionDataSet->setBatchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| long index = 1; |
| count++; |
| for (const Field &f: sessionDataSet->next()->fields) { |
| REQUIRE(f.longV == index); |
| index++; |
| } |
| } |
| REQUIRE(count == 101); |
| } |
| |
| TEST_CASE("Test insertRecords ", "[testInsertRecords]") { |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<string> deviceIds; |
| vector<vector<string>> measurementsList; |
| vector<vector<string>> valuesList; |
| vector<int64_t> timestamps; |
| |
| for (int64_t time = 0; time < 500; time++) { |
| vector<string> values = {"1", "2", "3"}; |
| |
| deviceIds.push_back(deviceId); |
| measurementsList.push_back(measurements); |
| valuesList.push_back(values); |
| timestamps.push_back(time); |
| if (time != 0 && time % 100 == 0) { |
| session->insertRecords(deviceIds, timestamps, measurementsList, valuesList); |
| deviceIds.clear(); |
| measurementsList.clear(); |
| valuesList.clear(); |
| timestamps.clear(); |
| } |
| } |
| |
| session->insertRecords(deviceIds, timestamps, measurementsList, valuesList); |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select * from root.test.d1"); |
| sessionDataSet->setBatchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| long index = 1; |
| count++; |
| for (const Field &f: sessionDataSet->next()->fields) { |
| REQUIRE(f.longV == index); |
| index++; |
| } |
| } |
| REQUIRE(count == 500); |
| } |
| |
| TEST_CASE("Test insertRecord with types ", "[testTypedInsertRecord]") { |
| vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64}; |
| |
| for (int i = 0; i < timeseries.size(); i++) { |
| if (session->checkTimeseriesExists(timeseries[i])) { |
| session->deleteTimeseries(timeseries[i]); |
| } |
| session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<int32_t> value1(100, 1); |
| vector<double> value2(100, 2.2); |
| vector<int64_t> value3(100, 3); |
| |
| for (long time = 0; time < 100; time++) { |
| vector<char *> values = {(char *) (&value1[time]), (char *) (&value2[time]), (char *) (&value3[time])}; |
| session->insertRecord(deviceId, time, measurements, types, values); |
| } |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setBatchSize(1024); |
| long count = 0; |
| while (sessionDataSet->hasNext()) { |
| sessionDataSet->next(); |
| count++; |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test insertRecords with types ", "[testTypedInsertRecords]") { |
| vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64}; |
| |
| for (int i = 0; i < timeseries.size(); i++) { |
| if (session->checkTimeseriesExists(timeseries[i])) { |
| session->deleteTimeseries(timeseries[i]); |
| } |
| session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<string> deviceIds; |
| vector<vector<string>> measurementsList; |
| vector<vector<TSDataType::TSDataType>> typesList; |
| vector<vector<char *>> valuesList; |
| vector<int64_t> timestamps; |
| vector<int32_t> value1(100, 1); |
| vector<double> value2(100, 2.2); |
| vector<int64_t> value3(100, 3); |
| |
| for (int64_t time = 0; time < 100; time++) { |
| vector<char *> values = {(char *) (&value1[time]), (char *) (&value2[time]), (char *) (&value3[time])}; |
| deviceIds.push_back(deviceId); |
| measurementsList.push_back(measurements); |
| typesList.push_back(types); |
| valuesList.push_back(values); |
| timestamps.push_back(time); |
| } |
| |
| session->insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select * from root.test.d1"); |
| sessionDataSet->setBatchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| sessionDataSet->next(); |
| count++; |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test insertRecordsOfOneDevice", "[testInsertRecordsOfOneDevice]") { |
| vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64}; |
| |
| for (int i = 0; i < timeseries.size(); i++) { |
| if (session->checkTimeseriesExists(timeseries[i])) { |
| session->deleteTimeseries(timeseries[i]); |
| } |
| session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, CompressionType::SNAPPY); |
| } |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<vector<string>> measurementsList; |
| vector<vector<TSDataType::TSDataType>> typesList; |
| vector<vector<char *>> valuesList; |
| vector<int64_t> timestamps; |
| vector<int32_t> value1(100, 1); |
| vector<double> value2(100, 2.2); |
| vector<int64_t> value3(100, 3); |
| |
| for (int64_t time = 0; time < 100; time++) { |
| vector<char *> values = {(char *) (&value1[time]), (char *) (&value2[time]), (char *) (&value3[time])}; |
| measurementsList.push_back(measurements); |
| typesList.push_back(types); |
| valuesList.push_back(values); |
| timestamps.push_back(time); |
| } |
| |
| session->insertRecordsOfOneDevice(deviceId, timestamps, measurementsList, typesList, valuesList); |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select * from root.test.d1"); |
| sessionDataSet->setBatchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| sessionDataSet->next(); |
| count++; |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test insertTablet ", "[testInsertTablet]") { |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<pair<string, TSDataType::TSDataType>> schemaList; |
| schemaList.emplace_back("s1", TSDataType::INT64); |
| schemaList.emplace_back("s2", TSDataType::INT64); |
| schemaList.emplace_back("s3", TSDataType::INT64); |
| |
| Tablet tablet(deviceId, schemaList, 100); |
| for (int64_t time = 0; time < 100; time++) { |
| int row = tablet.rowSize++; |
| tablet.timestamps[row] = time; |
| for (int i = 0; i < 3; i++) { |
| int64_t randVal = rand(); |
| tablet.addValue(i, row, &randVal); |
| } |
| if (tablet.rowSize == tablet.maxRowNumber) { |
| session->insertTablet(tablet); |
| tablet.reset(); |
| } |
| } |
| |
| if (tablet.rowSize != 0) { |
| session->insertTablet(tablet); |
| tablet.reset(); |
| } |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select * from root.test.d1"); |
| sessionDataSet->setBatchSize(1024); |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| long index = 0; |
| count++; |
| for (const Field& f: sessionDataSet->next()->fields) { |
| REQUIRE(f.longV == index); |
| index++; |
| } |
| } |
| REQUIRE(count == 100); |
| } |
| |
| TEST_CASE("Test Last query ", "[testLastQuery]") { |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| |
| for (long time = 0; time < 100; time++) { |
| vector<string> values = {"1", "2", "3"}; |
| session->insertRecord(deviceId, time, measurements, values); |
| } |
| |
| vector<string> measurementValues = {"1", "2", "3"}; |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement( |
| "select last s1,s2,s3 from root.test.d1"); |
| sessionDataSet->setBatchSize(1024); |
| long index = 0; |
| while (sessionDataSet->hasNext()) { |
| vector<Field> fields = sessionDataSet->next()->fields; |
| REQUIRE(fields[0].stringV == deviceId + "." + measurements[index]); |
| REQUIRE(fields[1].stringV == measurementValues[index]); |
| index++; |
| } |
| } |
| |
| TEST_CASE("Test Huge query ", "[testHugeQuery]") { |
| prepareTimeseries(); |
| string deviceId = "root.test.d1"; |
| vector<string> measurements = {"s1", "s2", "s3"}; |
| vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::INT32, TSDataType::INT32}; |
| int value1 = 1, value2 = 2, value3 = 3; |
| vector<char*> values = {(char*)&value1, (char*)&value2, (char*)&value3}; |
| |
| for (long time = 0; time < 1000000; time++) { |
| session->insertRecord(deviceId, time, measurements, types, values); |
| } |
| |
| unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select * from root.test.d1"); |
| sessionDataSet->setBatchSize(1024); |
| RowRecord* rowRecord; |
| int count = 0; |
| while (sessionDataSet->hasNext()) { |
| rowRecord = sessionDataSet->next(); |
| REQUIRE(rowRecord->timestamp == count); |
| REQUIRE(rowRecord->fields[0].intV == 1); |
| REQUIRE(rowRecord->fields[1].intV == 2); |
| REQUIRE(rowRecord->fields[2].intV == 3); |
| count++; |
| } |
| |
| REQUIRE(count == 1000000); |
| } |