| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #ifndef IOTDB_SESSION_H |
| #define IOTDB_SESSION_H |
| |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| #include <exception> |
| #include <iostream> |
| #include <algorithm> |
| #include <map> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <stack> |
| #include <new> |
| #include <thread> |
| #include <mutex> |
| #include <stdexcept> |
| #include <cstdlib> |
| #include <thrift/protocol/TBinaryProtocol.h> |
| #include <thrift/protocol/TCompactProtocol.h> |
| #include <thrift/transport/TSocket.h> |
| #include <thrift/transport/TTransportException.h> |
| #include <thrift/transport/TBufferTransports.h> |
| #include "IClientRPCService.h" |
| |
| using namespace std; |
| |
| using ::apache::thrift::protocol::TBinaryProtocol; |
| using ::apache::thrift::protocol::TCompactProtocol; |
| using ::apache::thrift::transport::TSocket; |
| using ::apache::thrift::transport::TTransport; |
| using ::apache::thrift::transport::TTransportException; |
| using ::apache::thrift::transport::TBufferedTransport; |
| using ::apache::thrift::transport::TFramedTransport; |
| using ::apache::thrift::TException; |
| |
| |
| enum LogLevelType { |
| LEVEL_DEBUG = 0, |
| LEVEL_INFO, |
| LEVEL_WARN, |
| LEVEL_ERROR |
| }; |
| extern LogLevelType LOG_LEVEL; |
| |
| #define log_debug(fmt,...) do {if(LOG_LEVEL <= LEVEL_DEBUG) {string s=string("[DEBUG] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0) |
| #define log_info(fmt,...) do {if(LOG_LEVEL <= LEVEL_INFO) {string s=string("[INFO] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0) |
| #define log_warn(fmt,...) do {if(LOG_LEVEL <= LEVEL_WARN) {string s=string("[WARN] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0) |
| #define log_error(fmt,...) do {if(LOG_LEVEL <= LEVEL_ERROR) {string s=string("[ERROR] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0) |
| |
| |
| class IoTDBException : public std::exception { |
| public: |
| IoTDBException() {} |
| |
| explicit IoTDBException(const std::string &m) : message(m) {} |
| |
| explicit IoTDBException(const char *m) : message(m) {} |
| |
| virtual const char *what() const noexcept override { |
| return message.c_str(); |
| } |
| private: |
| std::string message; |
| }; |
| |
| class IoTDBConnectionException : public IoTDBException { |
| public: |
| IoTDBConnectionException() {} |
| |
| explicit IoTDBConnectionException(const char *m) : IoTDBException(m) {} |
| |
| explicit IoTDBConnectionException(const std::string &m) : IoTDBException(m) {} |
| }; |
| |
| class ExecutionException : public IoTDBException { |
| public: |
| ExecutionException() {} |
| |
| explicit ExecutionException(const char *m) : IoTDBException(m) {} |
| |
| explicit ExecutionException(const std::string &m) : IoTDBException(m) {} |
| }; |
| |
| class BatchExecutionException : public IoTDBException { |
| public: |
| BatchExecutionException() {} |
| |
| explicit BatchExecutionException(const char *m) : IoTDBException(m) {} |
| |
| explicit BatchExecutionException(const std::string &m) : IoTDBException(m) {} |
| |
| explicit BatchExecutionException(const std::vector <TSStatus> &statusList) : statusList(statusList) {} |
| |
| BatchExecutionException(const std::string &m, const std::vector <TSStatus> &statusList) : IoTDBException(m), statusList(statusList) {} |
| |
| std::vector<TSStatus> statusList; |
| }; |
| |
| class UnSupportedDataTypeException : public IoTDBException { |
| public: |
| UnSupportedDataTypeException() {} |
| |
| explicit UnSupportedDataTypeException(const char *m) : IoTDBException(m) {} |
| |
| explicit UnSupportedDataTypeException(const std::string &m) : IoTDBException("UnSupported dataType: " + m) {} |
| }; |
| |
| namespace Version { |
| enum Version { |
| V_0_12, V_0_13 |
| }; |
| } |
| |
| namespace CompressionType { |
| enum CompressionType { |
| UNCOMPRESSED = (char) 0, |
| SNAPPY = (char) 1, |
| GZIP = (char) 2, |
| LZO = (char) 3, |
| SDT = (char) 4, |
| PAA = (char) 5, |
| PLA = (char) 6, |
| LZ4 = (char) 7 |
| }; |
| } |
| |
| namespace TSDataType { |
| enum TSDataType { |
| BOOLEAN = (char) 0, |
| INT32 = (char) 1, |
| INT64 = (char) 2, |
| FLOAT = (char) 3, |
| DOUBLE = (char) 4, |
| TEXT = (char) 5, |
| VECTOR = (char) 6, |
| NULLTYPE = (char) 7 |
| }; |
| } |
| |
| namespace TSEncoding { |
| enum TSEncoding { |
| PLAIN = (char) 0, |
| DICTIONARY = (char) 1, |
| RLE = (char) 2, |
| DIFF = (char) 3, |
| TS_2DIFF = (char) 4, |
| BITMAP = (char) 5, |
| GORILLA_V1 = (char) 6, |
| REGULAR = (char) 7, |
| GORILLA = (char) 8, |
| ZIGZAG = (char) 9, |
| FREQ = (char) 10 |
| }; |
| } |
| |
| namespace TSStatusCode { |
| enum TSStatusCode { |
| SUCCESS_STATUS = 200, |
| STILL_EXECUTING_STATUS = 201, |
| INVALID_HANDLE_STATUS = 202, |
| |
| NODE_DELETE_FAILED_ERROR = 298, |
| ALIAS_ALREADY_EXIST_ERROR = 299, |
| PATH_ALREADY_EXIST_ERROR = 300, |
| PATH_NOT_EXIST_ERROR = 301, |
| UNSUPPORTED_FETCH_METADATA_OPERATION_ERROR = 302, |
| METADATA_ERROR = 303, |
| OUT_OF_TTL_ERROR = 305, |
| CONFIG_ADJUSTER = 306, |
| MERGE_ERROR = 307, |
| SYSTEM_CHECK_ERROR = 308, |
| SYNC_DEVICE_OWNER_CONFLICT_ERROR = 309, |
| SYNC_CONNECTION_EXCEPTION = 310, |
| STORAGE_GROUP_PROCESSOR_ERROR = 311, |
| STORAGE_GROUP_ERROR = 312, |
| STORAGE_ENGINE_ERROR = 313, |
| TSFILE_PROCESSOR_ERROR = 314, |
| PATH_ILLEGAL = 315, |
| LOAD_FILE_ERROR = 316, |
| STORAGE_GROUP_NOT_READY = 317, |
| |
| EXECUTE_STATEMENT_ERROR = 400, |
| SQL_PARSE_ERROR = 401, |
| GENERATE_TIME_ZONE_ERROR = 402, |
| SET_TIME_ZONE_ERROR = 403, |
| NOT_STORAGE_GROUP_ERROR = 404, |
| QUERY_NOT_ALLOWED = 405, |
| AST_FORMAT_ERROR = 406, |
| LOGICAL_OPERATOR_ERROR = 407, |
| LOGICAL_OPTIMIZE_ERROR = 408, |
| UNSUPPORTED_FILL_TYPE_ERROR = 409, |
| PATH_ERROR = 410, |
| QUERY_PROCESS_ERROR = 411, |
| WRITE_PROCESS_ERROR = 412, |
| |
| INTERNAL_SERVER_ERROR = 500, |
| CLOSE_OPERATION_ERROR = 501, |
| READ_ONLY_SYSTEM_ERROR = 502, |
| DISK_SPACE_INSUFFICIENT_ERROR = 503, |
| START_UP_ERROR = 504, |
| SHUT_DOWN_ERROR = 505, |
| MULTIPLE_ERROR = 506, |
| WRONG_LOGIN_PASSWORD_ERROR = 600, |
| NOT_LOGIN_ERROR = 601, |
| NO_PERMISSION_ERROR = 602, |
| UNINITIALIZED_AUTH_ERROR = 603, |
| PARTITION_NOT_READY = 700, |
| TIME_OUT = 701, |
| NO_LEADER = 702, |
| UNSUPPORTED_OPERATION = 703, |
| NODE_READ_ONLY = 704, |
| INCOMPATIBLE_VERSION = 203, |
| }; |
| } |
| |
| class RpcUtils { |
| public: |
| std::shared_ptr<TSStatus> SUCCESS_STATUS; |
| |
| RpcUtils() { |
| SUCCESS_STATUS = std::make_shared<TSStatus>(); |
| SUCCESS_STATUS->__set_code(TSStatusCode::SUCCESS_STATUS); |
| } |
| |
| static void verifySuccess(const TSStatus &status); |
| |
| static void verifySuccess(const std::vector<TSStatus> &statuses); |
| |
| static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode); |
| |
| static TSStatus getStatus(int code, const std::string &message); |
| |
| static std::shared_ptr<TSExecuteStatementResp> getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode); |
| |
| static std::shared_ptr<TSExecuteStatementResp> |
| getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, const std::string &message); |
| |
| static std::shared_ptr<TSExecuteStatementResp> getTSExecuteStatementResp(const TSStatus &status); |
| |
| static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode); |
| |
| static std::shared_ptr<TSFetchResultsResp> |
| getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, const std::string &appendMessage); |
| |
| static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(const TSStatus &status); |
| }; |
| |
| // Simulate the ByteBuffer class in Java |
| class MyStringBuffer { |
| public: |
| MyStringBuffer() : pos(0) { |
| checkBigEndian(); |
| } |
| |
| explicit MyStringBuffer(const std::string& str) : str(str), pos(0) { |
| checkBigEndian(); |
| } |
| |
| void reserve(size_t n) { |
| str.reserve(n); |
| } |
| |
| void clear() { |
| str.clear(); |
| pos = 0; |
| } |
| |
| bool hasRemaining() { |
| return pos < str.size(); |
| } |
| |
| int getInt() { |
| return *(int *) getOrderedByte(4); |
| } |
| |
| int64_t getInt64() { |
| return *(int64_t *) getOrderedByte(8); |
| } |
| |
| float getFloat() { |
| return *(float *) getOrderedByte(4); |
| } |
| |
| double getDouble() { |
| return *(double *) getOrderedByte(8); |
| } |
| |
| char getChar() { |
| return str[pos++]; |
| } |
| |
| bool getBool() { |
| return getChar() == 1; |
| } |
| |
| std::string getString() { |
| size_t len = getInt(); |
| size_t tmpPos = pos; |
| pos += len; |
| return str.substr(tmpPos, len); |
| } |
| |
| void putInt(int ins) { |
| putOrderedByte((char *) &ins, 4); |
| } |
| |
| void putInt64(int64_t ins) { |
| putOrderedByte((char *) &ins, 8); |
| } |
| |
| void putFloat(float ins) { |
| putOrderedByte((char *) &ins, 4); |
| } |
| |
| void putDouble(double ins) { |
| putOrderedByte((char *) &ins, 8); |
| } |
| |
| void putChar(char ins) { |
| str += ins; |
| } |
| |
| void putBool(bool ins) { |
| char tmp = ins ? 1 : 0; |
| str += tmp; |
| } |
| |
| void putString(const std::string &ins) { |
| putInt((int)(ins.size())); |
| str += ins; |
| } |
| |
| void concat(const std::string &ins) { |
| str.append(ins); |
| } |
| |
| public: |
| std::string str; |
| size_t pos; |
| |
| private: |
| void checkBigEndian() { |
| static int chk = 0x0201; //used to distinguish CPU's type (BigEndian or LittleEndian) |
| isBigEndian = (0x01 != *(char *) (&chk)); |
| } |
| |
| const char *getOrderedByte(size_t len) { |
| const char *p = nullptr; |
| if (isBigEndian) { |
| p = str.c_str() + pos; |
| } else { |
| const char *tmp = str.c_str(); |
| for (size_t i = pos; i < pos + len; i++) { |
| numericBuf[pos + len - 1 - i] = tmp[i]; |
| } |
| p = numericBuf; |
| } |
| pos += len; |
| return p; |
| } |
| |
| void putOrderedByte(char *buf, int len) { |
| if (isBigEndian) { |
| str.assign(buf, len); |
| } else { |
| for (int i = len - 1; i > -1; i--) { |
| str += buf[i]; |
| } |
| } |
| } |
| |
| private: |
| bool isBigEndian{}; |
| char numericBuf[8]{}; //only be used by int, long, float, double etc. |
| }; |
| |
| class BitMap { |
| public: |
| /** Initialize a BitMap with given size. */ |
| explicit BitMap(size_t size = 0) { |
| resize(size); |
| } |
| |
| /** change the size */ |
| void resize(size_t size) { |
| this->size = size; |
| this->bits.resize((size >> 3) + 1); // equal to "size/8 + 1" |
| reset(); |
| } |
| |
| /** mark as 1 at the given bit position. */ |
| bool mark(size_t position) { |
| if (position >= size) |
| return false; |
| |
| bits[position >> 3] |= (char) 1 << (position % 8); |
| return true; |
| } |
| |
| /** mark as 0 at the given bit position. */ |
| bool unmark(size_t position) { |
| if (position >= size) |
| return false; |
| |
| bits[position >> 3] &= ~((char) 1 << (position % 8)); |
| return true; |
| } |
| |
| /** mark as 1 at all positions. */ |
| void markAll() { |
| std::fill(bits.begin(), bits.end(), (char) 0XFF); |
| } |
| |
| /** mark as 0 at all positions. */ |
| void reset() { |
| std::fill(bits.begin(), bits.end(), (char) 0); |
| } |
| |
| /** returns the value of the bit with the specified index. */ |
| bool isMarked(size_t position) const { |
| if (position >= size) |
| return false; |
| |
| return (bits[position >> 3] & ((char) 1 << (position % 8))) != 0; |
| } |
| |
| /** whether all bits are zero, i.e., no Null value */ |
| bool isAllUnmarked() const { |
| size_t j; |
| for (j = 0; j < size >> 3; j++) { |
| if (bits[j] != (char) 0) { |
| return false; |
| } |
| } |
| for (j = 0; j < size % 8; j++) { |
| if ((bits[size >> 3] & ((char) 1 << j)) != 0) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** whether all bits are one, i.e., all are Null */ |
| bool isAllMarked() const { |
| size_t j; |
| for (j = 0; j < size >> 3; j++) { |
| if (bits[j] != (char) 0XFF) { |
| return false; |
| } |
| } |
| for (j = 0; j < size % 8; j++) { |
| if ((bits[size >> 3] & ((char) 1 << j)) == 0) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| const std::vector<char>& getByteArray() const { |
| return this->bits; |
| } |
| |
| size_t getSize() const { |
| return this->size; |
| } |
| |
| private: |
| size_t size; |
| std::vector<char> bits; |
| }; |
| |
| class Field { |
| public: |
| TSDataType::TSDataType dataType; |
| bool boolV; |
| int intV; |
| int64_t longV; |
| float floatV; |
| double doubleV; |
| std::string stringV; |
| |
| explicit Field(TSDataType::TSDataType a) { |
| dataType = a; |
| } |
| |
| Field() = default; |
| }; |
| |
| /* |
| * A tablet data of one device, the tablet contains multiple measurements of this device that share |
| * the same time column. |
| * |
| * for example: device root.sg1.d1 |
| * |
| * time, m1, m2, m3 |
| * 1, 1, 2, 3 |
| * 2, 1, 2, 3 |
| * 3, 1, 2, 3 |
| * |
| * Notice: The tablet should not have empty cell |
| * |
| */ |
| class Tablet { |
| private: |
| static const int DEFAULT_ROW_SIZE = 1024; |
| |
| void createColumns(); |
| void deleteColumns(); |
| |
| public: |
| std::string deviceId; // deviceId of this tablet |
| std::vector<std::pair<std::string, TSDataType::TSDataType>> schemas; // the list of measurement schemas for creating the tablet |
| std::vector<int64_t> timestamps; // timestamps in this tablet |
| std::vector<void*> values; // each object is a primitive type array, which represents values of one measurement |
| std::vector<BitMap> bitMaps; // each bitmap represents the existence of each value in the current column |
| size_t rowSize; //the number of rows to include in this tablet |
| size_t maxRowNumber; // the maximum number of rows for this tablet |
| bool isAligned; // whether this tablet store data of aligned timeseries or not |
| |
| Tablet() = default; |
| |
| /** |
| * Return a tablet with default specified row number. This is the standard |
| * constructor (all Tablet should be the same size). |
| * |
| * @param deviceId the name of the device specified to be written in |
| * @param timeseries the list of measurement schemas for creating the tablet |
| */ |
| Tablet(const std::string &deviceId, |
| const std::vector<std::pair<std::string, TSDataType::TSDataType>> ×eries) { |
| Tablet(deviceId, timeseries, DEFAULT_ROW_SIZE); |
| } |
| |
| /** |
| * Return a tablet with the specified number of rows (maxBatchSize). Only |
| * call this constructor directly for testing purposes. Tablet should normally |
| * always be default size. |
| * |
| * @param deviceId the name of the device specified to be written in |
| * @param schemas the list of measurement schemas for creating the row |
| * batch |
| * @param maxRowNumber the maximum number of rows for this tablet |
| */ |
| Tablet(const std::string &deviceId, const std::vector<std::pair<std::string, TSDataType::TSDataType>> &schemas, |
| size_t maxRowNumber, bool _isAligned = false) : deviceId(deviceId), schemas(schemas), |
| maxRowNumber(maxRowNumber), isAligned(_isAligned) { |
| // create timestamp column |
| timestamps.resize(maxRowNumber); |
| // create value columns |
| values.resize(schemas.size()); |
| createColumns(); |
| // create bitMaps |
| bitMaps.resize(schemas.size()); |
| for (size_t i = 0; i < schemas.size(); i++) { |
| bitMaps[i].resize(maxRowNumber); |
| } |
| this->rowSize = 0; |
| } |
| |
| ~Tablet() { |
| try { |
| deleteColumns(); |
| } catch (exception &e) { |
| log_debug(string("Tablet::~Tablet(), ") + e.what()); |
| } |
| } |
| |
| void addValue(size_t schemaId, size_t rowIndex, void *value); |
| |
| void reset(); // Reset Tablet to the default state - set the rowSize to 0 |
| |
| size_t getTimeBytesSize(); |
| |
| size_t getValueByteSize(); // total byte size that values occupies |
| |
| void setAligned(bool isAligned); |
| }; |
| |
| class SessionUtils { |
| public: |
| static std::string getTime(const Tablet &tablet); |
| |
| static std::string getValue(const Tablet &tablet); |
| }; |
| |
| class RowRecord { |
| public: |
| int64_t timestamp; |
| std::vector<Field> fields; |
| |
| explicit RowRecord(int64_t timestamp) { |
| this->timestamp = timestamp; |
| } |
| |
| RowRecord(int64_t timestamp, const std::vector<Field> &fields) |
| : timestamp(timestamp), fields(fields) { |
| } |
| |
| explicit RowRecord(const std::vector<Field> &fields) |
| : timestamp(-1), fields(fields) { |
| } |
| |
| RowRecord() { |
| this->timestamp = -1; |
| } |
| |
| void addField(const Field &f) { |
| this->fields.push_back(f); |
| } |
| |
| std::string toString() { |
| std::string ret; |
| if (this->timestamp != -1) { |
| ret.append(std::to_string(timestamp)); |
| ret.append("\t"); |
| } |
| for (size_t i = 0; i < fields.size(); i++) { |
| if (i != 0) { |
| ret.append("\t"); |
| } |
| TSDataType::TSDataType dataType = fields[i].dataType; |
| switch (dataType) { |
| case TSDataType::BOOLEAN: |
| ret.append(fields[i].boolV ? "true" : "false"); |
| break; |
| case TSDataType::INT32: |
| ret.append(std::to_string(fields[i].intV)); |
| break; |
| case TSDataType::INT64: |
| ret.append(std::to_string(fields[i].longV)); |
| break; |
| case TSDataType::FLOAT: |
| ret.append(std::to_string(fields[i].floatV)); |
| break; |
| case TSDataType::DOUBLE: |
| ret.append(std::to_string(fields[i].doubleV)); |
| break; |
| case TSDataType::TEXT: |
| ret.append(fields[i].stringV); |
| break; |
| case TSDataType::NULLTYPE: |
| ret.append("NULL"); |
| break; |
| default: |
| break; |
| } |
| } |
| ret.append("\n"); |
| return ret; |
| } |
| }; |
| |
| class SessionDataSet { |
| private: |
| bool hasCachedRecord = false; |
| std::string sql; |
| int64_t queryId; |
| int64_t statementId; |
| int64_t sessionId; |
| std::shared_ptr<IClientRPCServiceIf> client; |
| int batchSize = 1024; |
| std::vector<std::string> columnNameList; |
| std::vector<std::string> columnTypeDeduplicatedList; |
| // duplicated column index -> origin index |
| std::unordered_map<int, int> duplicateLocation; |
| // column name -> column location |
| std::unordered_map<std::string, int> columnMap; |
| // column size |
| int columnSize = 0; |
| bool isIgnoreTimeStamp = false; |
| |
| int rowsIndex = 0; // used to record the row index in current TSQueryDataSet |
| std::shared_ptr<TSQueryDataSet> tsQueryDataSet; |
| MyStringBuffer tsQueryDataSetTimeBuffer; |
| std::vector<std::unique_ptr<MyStringBuffer>> valueBuffers; |
| std::vector<std::unique_ptr<MyStringBuffer>> bitmapBuffers; |
| RowRecord rowRecord; |
| char *currentBitmap = nullptr; // used to cache the current bitmap for every column |
| static const int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null |
| |
| bool operationIsOpen = false; |
| |
| public: |
| SessionDataSet(const std::string &sql, |
| const std::vector<std::string> &columnNameList, |
| const std::vector<std::string> &columnTypeList, |
| std::map<std::string, int> &columnNameIndexMap, |
| bool isIgnoreTimeStamp, |
| int64_t queryId, int64_t statementId, |
| std::shared_ptr<IClientRPCServiceIf> client, int64_t sessionId, |
| const std::shared_ptr<TSQueryDataSet> &queryDataSet) : tsQueryDataSetTimeBuffer(queryDataSet->time) { |
| this->sessionId = sessionId; |
| this->sql = sql; |
| this->queryId = queryId; |
| this->statementId = statementId; |
| this->client = client; |
| this->columnNameList = columnNameList; |
| this->currentBitmap = new char[columnNameList.size()]; |
| this->columnSize = (int)columnNameList.size(); |
| this->isIgnoreTimeStamp = isIgnoreTimeStamp; |
| |
| // column name -> column location |
| for (int i = 0; i < (int) columnNameList.size(); i++) { |
| std::string name = columnNameList[i]; |
| if (this->columnMap.find(name) != this->columnMap.end()) { |
| duplicateLocation[i] = columnMap[name]; |
| } else { |
| this->columnMap[name] = i; |
| this->columnTypeDeduplicatedList.push_back(columnTypeList[i]); |
| } |
| if (!columnNameIndexMap.empty()) { |
| this->valueBuffers.push_back( |
| std::unique_ptr<MyStringBuffer>( |
| new MyStringBuffer(queryDataSet->valueList[columnNameIndexMap[name]]))); |
| this->bitmapBuffers.push_back( |
| std::unique_ptr<MyStringBuffer>( |
| new MyStringBuffer(queryDataSet->bitmapList[columnNameIndexMap[name]]))); |
| } else { |
| this->valueBuffers.push_back( |
| std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->valueList[columnMap[name]]))); |
| this->bitmapBuffers.push_back( |
| std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->bitmapList[columnMap[name]]))); |
| } |
| } |
| this->tsQueryDataSet = queryDataSet; |
| |
| operationIsOpen = true; |
| } |
| |
| ~SessionDataSet() { |
| try { |
| closeOperationHandle(); |
| } catch (exception &e) { |
| log_debug(string("SessionDataSet::~SessionDataSet(), ") + e.what()); |
| } |
| |
| if (currentBitmap != nullptr) { |
| delete[] currentBitmap; |
| currentBitmap = nullptr; |
| } |
| } |
| |
| int getBatchSize(); |
| |
| void setBatchSize(int batchSize); |
| |
| std::vector<std::string> getColumnNames(); |
| |
| bool hasNext(); |
| |
| void constructOneRow(); |
| |
| bool isNull(int index, int rowNum); |
| |
| RowRecord *next(); |
| |
| void closeOperationHandle(bool forceClose = false); |
| }; |
| |
| class TemplateNode { |
| public: |
| explicit TemplateNode(const std::string &name) : name_(name) {} |
| |
| const std::string &getName() const { |
| return name_; |
| } |
| |
| virtual const std::unordered_map<std::string, std::shared_ptr<TemplateNode>> &getChildren() const { |
| throw BatchExecutionException("Should call exact sub class!"); |
| } |
| |
| virtual bool isMeasurement() = 0; |
| |
| virtual bool isAligned() { |
| throw BatchExecutionException("Should call exact sub class!"); |
| } |
| |
| virtual std::string serialize() const { |
| throw BatchExecutionException("Should call exact sub class!"); |
| } |
| |
| private: |
| std::string name_; |
| }; |
| |
| class MeasurementNode : public TemplateNode { |
| public: |
| |
| MeasurementNode(const std::string &name_, TSDataType::TSDataType data_type_, TSEncoding::TSEncoding encoding_, |
| CompressionType::CompressionType compression_type_) : TemplateNode(name_) { |
| this->data_type_ = data_type_; |
| this->encoding_ = encoding_; |
| this->compression_type_ = compression_type_; |
| } |
| |
| TSDataType::TSDataType getDataType() const { |
| return data_type_; |
| } |
| |
| TSEncoding::TSEncoding getEncoding() const { |
| return encoding_; |
| } |
| |
| CompressionType::CompressionType getCompressionType() const { |
| return compression_type_; |
| } |
| |
| bool isMeasurement() override { |
| return true; |
| } |
| |
| std::string serialize() const override; |
| |
| private: |
| TSDataType::TSDataType data_type_; |
| TSEncoding::TSEncoding encoding_; |
| CompressionType::CompressionType compression_type_; |
| }; |
| |
| class InternalNode : public TemplateNode { |
| public: |
| |
| InternalNode(const std::string &name, bool is_aligned) : TemplateNode(name), is_aligned_(is_aligned) {} |
| |
| void addChild(const InternalNode &node) { |
| if (this->children_.count(node.getName())) { |
| throw BatchExecutionException("Duplicated child of node in template."); |
| } |
| this->children_[node.getName()] = std::make_shared<InternalNode>(node); |
| } |
| |
| void addChild(const MeasurementNode &node) { |
| if (this->children_.count(node.getName())) { |
| throw BatchExecutionException("Duplicated child of node in template."); |
| } |
| this->children_[node.getName()] = std::make_shared<MeasurementNode>(node); |
| } |
| |
| void deleteChild(const TemplateNode &node) { |
| this->children_.erase(node.getName()); |
| } |
| |
| const std::unordered_map<std::string, std::shared_ptr<TemplateNode>> &getChildren() const override { |
| return children_; |
| } |
| |
| bool isMeasurement() override { |
| return false; |
| } |
| |
| bool isAligned() override { |
| return is_aligned_; |
| } |
| |
| private: |
| std::unordered_map<std::string, std::shared_ptr<TemplateNode>> children_; |
| bool is_aligned_; |
| }; |
| |
| namespace TemplateQueryType { |
| enum TemplateQueryType { |
| COUNT_MEASUREMENTS, IS_MEASUREMENT, PATH_EXIST, SHOW_MEASUREMENTS |
| }; |
| } |
| |
| class Template { |
| public: |
| |
| Template(const std::string &name, bool is_aligned) : name_(name), is_aligned_(is_aligned) {} |
| |
| const std::string &getName() const { |
| return name_; |
| } |
| |
| bool isAligned() const { |
| return is_aligned_; |
| } |
| |
| void addToTemplate(const InternalNode &child) { |
| if (this->children_.count(child.getName())) { |
| throw BatchExecutionException("Duplicated child of node in template."); |
| } |
| this->children_[child.getName()] = std::make_shared<InternalNode>(child); |
| } |
| |
| void addToTemplate(const MeasurementNode &child) { |
| if (this->children_.count(child.getName())) { |
| throw BatchExecutionException("Duplicated child of node in template."); |
| } |
| this->children_[child.getName()] = std::make_shared<MeasurementNode>(child); |
| } |
| |
| std::string serialize() const; |
| |
| private: |
| std::string name_; |
| std::unordered_map<std::string, std::shared_ptr<TemplateNode>> children_; |
| bool is_aligned_; |
| }; |
| |
| class Session { |
| private: |
| std::string host; |
| int rpcPort; |
| std::string username; |
| std::string password; |
| const TSProtocolVersion::type protocolVersion = TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3; |
| std::shared_ptr<IClientRPCServiceIf> client; |
| std::shared_ptr<TTransport> transport; |
| bool isClosed = true; |
| int64_t sessionId; |
| int64_t statementId; |
| std::string zoneId; |
| int fetchSize; |
| const static int DEFAULT_FETCH_SIZE = 10000; |
| const static int DEFAULT_TIMEOUT_MS = 0; |
| Version::Version version; |
| |
| static bool checkSorted(const Tablet &tablet); |
| |
| static bool checkSorted(const std::vector<int64_t> ×); |
| |
| static void sortTablet(Tablet &tablet); |
| |
| static void sortIndexByTimestamp(int *index, std::vector<int64_t> ×tamps, int length); |
| |
| std::string getTimeZone(); |
| |
| void setTimeZone(const std::string &zoneId); |
| |
| void appendValues(std::string &buffer, const char *value, int size); |
| |
| void |
| putValuesIntoBuffer(const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values, |
| std::string &buf); |
| |
| int8_t getDataTypeNumber(TSDataType::TSDataType type); |
| |
| struct TsCompare { |
| std::vector<int64_t> ×tamps; |
| |
| explicit TsCompare(std::vector<int64_t> &inTimestamps) : timestamps(inTimestamps) {}; |
| |
| bool operator()(int i, int j) { return (timestamps[i] < timestamps[j]); }; |
| }; |
| |
| std::string getVersionString(Version::Version version); |
| |
| public: |
| Session(const std::string &host, int rpcPort) : username("user"), password("password"), version(Version::V_0_13) { |
| this->host = host; |
| this->rpcPort = rpcPort; |
| } |
| |
| Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password) |
| : fetchSize(10000) { |
| this->host = host; |
| this->rpcPort = rpcPort; |
| this->username = username; |
| this->password = password; |
| this->zoneId = "UTC+08:00"; |
| this->version = Version::V_0_13; |
| } |
| |
| Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password, |
| int fetchSize) { |
| this->host = host; |
| this->rpcPort = rpcPort; |
| this->username = username; |
| this->password = password; |
| this->fetchSize = fetchSize; |
| this->zoneId = "UTC+08:00"; |
| this->version = Version::V_0_13; |
| } |
| |
| Session(const std::string &host, const std::string &rpcPort, const std::string &username = "user", |
| const std::string &password = "password", int fetchSize = 10000) { |
| this->host = host; |
| this->rpcPort = stoi(rpcPort); |
| this->username = username; |
| this->password = password; |
| this->fetchSize = fetchSize; |
| this->zoneId = "UTC+08:00"; |
| this->version = Version::V_0_13; |
| } |
| |
| ~Session(); |
| |
| int64_t getSessionId(); |
| |
| void open(); |
| |
| void open(bool enableRPCCompression); |
| |
| void open(bool enableRPCCompression, int connectionTimeoutInMs); |
| |
| void close(); |
| |
| void insertRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements, |
| const std::vector<std::string> &values); |
| |
| void insertRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements, |
| const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values); |
| |
| void insertAlignedRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements, |
| const std::vector<std::string> &values); |
| |
| void insertAlignedRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements, |
| const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values); |
| |
| void insertRecords(const std::vector<std::string> &deviceIds, |
| const std::vector<int64_t> ×, |
| const std::vector<std::vector<std::string>> &measurementsList, |
| const std::vector<std::vector<std::string>> &valuesList); |
| |
| void insertRecords(const std::vector<std::string> &deviceIds, |
| const std::vector<int64_t> ×, |
| const std::vector<std::vector<std::string>> &measurementsList, |
| const std::vector<std::vector<TSDataType::TSDataType>> &typesList, |
| const std::vector<std::vector<char *>> &valuesList); |
| |
| void insertAlignedRecords(const std::vector<std::string> &deviceIds, |
| const std::vector<int64_t> ×, |
| const std::vector<std::vector<std::string>> &measurementsList, |
| const std::vector<std::vector<std::string>> &valuesList); |
| |
| void insertAlignedRecords(const std::vector<std::string> &deviceIds, |
| const std::vector<int64_t> ×, |
| const std::vector<std::vector<std::string>> &measurementsList, |
| const std::vector<std::vector<TSDataType::TSDataType>> &typesList, |
| const std::vector<std::vector<char *>> &valuesList); |
| |
| void insertRecordsOfOneDevice(const std::string &deviceId, |
| std::vector<int64_t> ×, |
| std::vector<std::vector<std::string>> &measurementsList, |
| std::vector<std::vector<TSDataType::TSDataType>> &typesList, |
| std::vector<std::vector<char *>> &valuesList); |
| |
| void insertRecordsOfOneDevice(const std::string &deviceId, |
| std::vector<int64_t> ×, |
| std::vector<std::vector<std::string>> &measurementsList, |
| std::vector<std::vector<TSDataType::TSDataType>> &typesList, |
| std::vector<std::vector<char *>> &valuesList, |
| bool sorted); |
| |
| void insertAlignedRecordsOfOneDevice(const std::string &deviceId, |
| std::vector<int64_t> ×, |
| std::vector<std::vector<std::string>> &measurementsList, |
| std::vector<std::vector<TSDataType::TSDataType>> &typesList, |
| std::vector<std::vector<char *>> &valuesList); |
| |
| void insertAlignedRecordsOfOneDevice(const std::string &deviceId, |
| std::vector<int64_t> ×, |
| std::vector<std::vector<std::string>> &measurementsList, |
| std::vector<std::vector<TSDataType::TSDataType>> &typesList, |
| std::vector<std::vector<char *>> &valuesList, |
| bool sorted); |
| |
| void insertTablet(Tablet &tablet); |
| |
| void insertTablet(Tablet &tablet, bool sorted); |
| |
| static void buildInsertTabletReq(TSInsertTabletReq &request, int64_t sessionId, Tablet &tablet, bool sorted); |
| |
| void insertTablet(const TSInsertTabletReq &request); |
| |
| void insertAlignedTablet(Tablet &tablet); |
| |
| void insertAlignedTablet(Tablet &tablet, bool sorted); |
| |
| void insertTablets(std::unordered_map<std::string, Tablet *> &tablets); |
| |
| void insertTablets(std::unordered_map<std::string, Tablet *> &tablets, bool sorted); |
| |
| void insertAlignedTablets(std::unordered_map<std::string, Tablet *> &tablets, bool sorted = false); |
| |
| void testInsertRecord(const std::string &deviceId, int64_t time, |
| const std::vector<std::string> &measurements, |
| const std::vector<std::string> &values); |
| |
| void testInsertTablet(const Tablet &tablet); |
| |
| void testInsertRecords(const std::vector<std::string> &deviceIds, |
| const std::vector<int64_t> ×, |
| const std::vector<std::vector<std::string>> &measurementsList, |
| const std::vector<std::vector<std::string>> &valuesList); |
| |
| void deleteTimeseries(const std::string &path); |
| |
| void deleteTimeseries(const std::vector<std::string> &paths); |
| |
| void deleteData(const std::string &path, int64_t time); |
| |
| void deleteData(const std::vector<std::string> &deviceId, int64_t time); |
| |
| void setStorageGroup(const std::string &storageGroupId); |
| |
| void deleteStorageGroup(const std::string &storageGroup); |
| |
| void deleteStorageGroups(const std::vector<std::string> &storageGroups); |
| |
| void createTimeseries(const std::string &path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding, |
| CompressionType::CompressionType compressor); |
| |
| void createTimeseries(const std::string &path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding, |
| CompressionType::CompressionType compressor, |
| std::map<std::string, std::string> *props, std::map<std::string, std::string> *tags, |
| std::map<std::string, std::string> *attributes, |
| const std::string &measurementAlias); |
| |
| void createMultiTimeseries(const std::vector<std::string> &paths, |
| const std::vector<TSDataType::TSDataType> &dataTypes, |
| const std::vector<TSEncoding::TSEncoding> &encodings, |
| const std::vector<CompressionType::CompressionType> &compressors, |
| std::vector<std::map<std::string, std::string>> *propsList, |
| std::vector<std::map<std::string, std::string>> *tagsList, |
| std::vector<std::map<std::string, std::string>> *attributesList, |
| std::vector<std::string> *measurementAliasList); |
| |
| void createAlignedTimeseries(const std::string &deviceId, |
| const std::vector<std::string> &measurements, |
| const std::vector<TSDataType::TSDataType> &dataTypes, |
| const std::vector<TSEncoding::TSEncoding> &encodings, |
| const std::vector<CompressionType::CompressionType> &compressors); |
| |
| bool checkTimeseriesExists(const std::string &path); |
| |
| std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string &sql); |
| |
| void executeNonQueryStatement(const std::string &sql); |
| |
| void createSchemaTemplate(const Template &templ); |
| |
| void setSchemaTemplate(const std::string &template_name, const std::string &prefix_path); |
| |
| void unsetSchemaTemplate(const std::string &prefix_path, const std::string &template_name); |
| |
| void addAlignedMeasurementsInTemplate(const std::string &template_name, |
| const std::vector<std::string> &measurements, |
| const std::vector<TSDataType::TSDataType> &dataTypes, |
| const std::vector<TSEncoding::TSEncoding> &encodings, |
| const std::vector<CompressionType::CompressionType> &compressors); |
| |
| void addAlignedMeasurementsInTemplate(const std::string &template_name, |
| const std::string &measurement, |
| TSDataType::TSDataType dataType, |
| TSEncoding::TSEncoding encoding, |
| CompressionType::CompressionType compressor); |
| |
| void addUnalignedMeasurementsInTemplate(const std::string &template_name, |
| const std::vector<std::string> &measurements, |
| const std::vector<TSDataType::TSDataType> &dataTypes, |
| const std::vector<TSEncoding::TSEncoding> &encodings, |
| const std::vector<CompressionType::CompressionType> &compressors); |
| |
| void addUnalignedMeasurementsInTemplate(const std::string &template_name, |
| const std::string &measurement, |
| TSDataType::TSDataType dataType, |
| TSEncoding::TSEncoding encoding, |
| CompressionType::CompressionType compressor); |
| |
| void deleteNodeInTemplate(const std::string &template_name, const std::string &path); |
| |
| int countMeasurementsInTemplate(const std::string &template_name); |
| |
| bool isMeasurementInTemplate(const std::string &template_name, const std::string &path); |
| |
| bool isPathExistInTemplate(const std::string &template_name, const std::string &path); |
| |
| std::vector<std::string> showMeasurementsInTemplate(const std::string &template_name); |
| |
| std::vector<std::string> showMeasurementsInTemplate(const std::string &template_name, const std::string &pattern); |
| |
| bool checkTemplateExists(const std::string &template_name); |
| }; |
| |
| #endif // IOTDB_SESSION_H |