blob: 9d9dc665f52fb3178906fd1b5c32819fb82697ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef IOTDB_SESSION_H
#define IOTDB_SESSION_H
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <exception>
#include <iostream>
#include <algorithm>
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <stack>
#include <new>
#include <thread>
#include <mutex>
#include <stdexcept>
#include <cstdlib>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TBufferTransports.h>
#include "IClientRPCService.h"
using namespace std;
using ::apache::thrift::protocol::TBinaryProtocol;
using ::apache::thrift::protocol::TCompactProtocol;
using ::apache::thrift::transport::TSocket;
using ::apache::thrift::transport::TTransport;
using ::apache::thrift::transport::TTransportException;
using ::apache::thrift::transport::TBufferedTransport;
using ::apache::thrift::transport::TFramedTransport;
using ::apache::thrift::TException;
/**
 * Log severity levels. The numeric order matters: the log_* macros in this
 * header compare LOG_LEVEL against these values with `<=`.
 */
enum LogLevelType {
    LEVEL_DEBUG = 0,
    LEVEL_INFO = 1,
    LEVEL_WARN = 2,
    LEVEL_ERROR = 3
};
// Global log threshold, declared here and defined outside this header.
// A log_* macro prints only when LOG_LEVEL is <= that macro's own level.
extern LogLevelType LOG_LEVEL;
// printf-style logging helpers. Each one prepends "[LEVEL] file:line (function) - "
// and appends "\n", then passes the result to printf together with the extra arguments.
//  - `fmt` is concatenated onto a std::string, so it may be a string literal or a std::string.
//  - The concatenated text is used as the printf FORMAT string, so a '%' inside `fmt` is
//    interpreted as a conversion specifier.
//    NOTE(review): destructors in this header pass e.what() text as `fmt`; a '%' in such a
//    message would be read as a specifier with no matching argument.
//  - The expansion uses the unqualified name `string`, so it relies on the file-scope
//    `using namespace std;`.
//  - `##__VA_ARGS__` is a GNU extension that drops the preceding comma when no
//    variadic arguments are given.
#define log_debug(fmt,...) do {if(LOG_LEVEL <= LEVEL_DEBUG) {string s=string("[DEBUG] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
#define log_info(fmt,...) do {if(LOG_LEVEL <= LEVEL_INFO) {string s=string("[INFO] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
#define log_warn(fmt,...) do {if(LOG_LEVEL <= LEVEL_WARN) {string s=string("[WARN] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
#define log_error(fmt,...) do {if(LOG_LEVEL <= LEVEL_ERROR) {string s=string("[ERROR] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
/**
 * Base exception of this header: stores a message string and exposes it
 * through std::exception::what().
 */
class IoTDBException : public std::exception {
public:
    /** Creates an exception with an empty message. */
    IoTDBException() : message() {}

    /** Creates an exception carrying a copy of @p msg. */
    explicit IoTDBException(const char *msg) : message(msg) {}

    /** Creates an exception carrying a copy of @p msg. */
    explicit IoTDBException(const std::string &msg) : message(msg) {}

    /** @return the stored message; the pointer stays valid while this object lives. */
    const char *what() const noexcept override {
        return message.c_str();
    }

private:
    std::string message;
};
/**
 * IoTDBException subtype distinguished only by its type; it forwards its
 * message unchanged to the base class.
 */
class IoTDBConnectionException : public IoTDBException {
public:
    /** Creates an exception with an empty message. */
    IoTDBConnectionException() : IoTDBException() {}

    /** Creates an exception carrying @p msg. */
    explicit IoTDBConnectionException(const std::string &msg) : IoTDBException(msg) {}

    /** Creates an exception carrying @p msg. */
    explicit IoTDBConnectionException(const char *msg) : IoTDBException(msg) {}
};
/**
 * IoTDBException subtype distinguished only by its type; it forwards its
 * message unchanged to the base class.
 */
class ExecutionException : public IoTDBException {
public:
    /** Creates an exception with an empty message. */
    ExecutionException() : IoTDBException() {}

    /** Creates an exception carrying @p msg. */
    explicit ExecutionException(const std::string &msg) : IoTDBException(msg) {}

    /** Creates an exception carrying @p msg. */
    explicit ExecutionException(const char *msg) : IoTDBException(msg) {}
};
/**
 * IoTDBException subtype that can additionally carry a list of TSStatus
 * values in the public member `statusList`.
 */
class BatchExecutionException : public IoTDBException {
public:
    /** Status values attached to this exception; empty unless a list was supplied. */
    std::vector<TSStatus> statusList;

    /** Empty message, empty status list. */
    BatchExecutionException() : IoTDBException() {}

    /** Message only; the status list stays empty. */
    explicit BatchExecutionException(const std::string &msg) : IoTDBException(msg) {}

    /** Message only; the status list stays empty. */
    explicit BatchExecutionException(const char *msg) : IoTDBException(msg) {}

    /** Status list only; the message stays empty. */
    explicit BatchExecutionException(const std::vector<TSStatus> &statuses) : statusList(statuses) {}

    /** Both a message and a copy of @p statuses. */
    BatchExecutionException(const std::string &msg, const std::vector<TSStatus> &statuses)
        : IoTDBException(msg), statusList(statuses) {}
};
/**
 * IoTDBException subtype for unsupported data types.
 *
 * The two message constructors differ: the std::string overload prefixes the
 * text with "UnSupported dataType: ", while the const char* overload stores
 * the text as given.
 * NOTE(review): confirm that this asymmetry between the overloads is intended.
 */
class UnSupportedDataTypeException : public IoTDBException {
public:
    /** Creates an exception with an empty message. */
    UnSupportedDataTypeException() : IoTDBException() {}

    /** Stores "UnSupported dataType: " followed by @p typeName. */
    explicit UnSupportedDataTypeException(const std::string &typeName)
        : IoTDBException("UnSupported dataType: " + typeName) {}

    /** Stores @p msg verbatim (no prefix is added). */
    explicit UnSupportedDataTypeException(const char *msg) : IoTDBException(msg) {}
};
/** Version identifiers; the enum is wrapped in a namespace so values are written Version::V_x_y. */
namespace Version {
    enum Version {
        V_0_12 = 0,
        V_0_13 = 1
    };
}
/** Compression type codes; values are written CompressionType::NAME. */
namespace CompressionType {
    enum CompressionType {
        UNCOMPRESSED = 0,
        SNAPPY = 1,
        GZIP = 2,
        LZO = 3,
        SDT = 4,
        PAA = 5,
        PLA = 6,
        LZ4 = 7
    };
}
/** Data type codes; values are written TSDataType::NAME. */
namespace TSDataType {
    enum TSDataType {
        BOOLEAN = 0,
        INT32 = 1,
        INT64 = 2,
        FLOAT = 3,
        DOUBLE = 4,
        TEXT = 5,
        VECTOR = 6,
        NULLTYPE = 7
    };
}
/** Encoding codes; values are written TSEncoding::NAME. */
namespace TSEncoding {
    enum TSEncoding {
        PLAIN = 0,
        DICTIONARY = 1,
        RLE = 2,
        DIFF = 3,
        TS_2DIFF = 4,
        BITMAP = 5,
        GORILLA_V1 = 6,
        REGULAR = 7,
        GORILLA = 8,
        ZIGZAG = 9,
        FREQ = 10
    };
}
/**
 * Status codes; values are written TSStatusCode::NAME.
 * Enumerators are listed in ascending numeric order and grouped by hundreds.
 */
namespace TSStatusCode {
    enum TSStatusCode {
        // 2xx
        SUCCESS_STATUS = 200,
        STILL_EXECUTING_STATUS = 201,
        INVALID_HANDLE_STATUS = 202,
        INCOMPATIBLE_VERSION = 203,
        NODE_DELETE_FAILED_ERROR = 298,
        ALIAS_ALREADY_EXIST_ERROR = 299,

        // 3xx
        PATH_ALREADY_EXIST_ERROR = 300,
        PATH_NOT_EXIST_ERROR = 301,
        UNSUPPORTED_FETCH_METADATA_OPERATION_ERROR = 302,
        METADATA_ERROR = 303,
        OUT_OF_TTL_ERROR = 305,
        CONFIG_ADJUSTER = 306,
        MERGE_ERROR = 307,
        SYSTEM_CHECK_ERROR = 308,
        SYNC_DEVICE_OWNER_CONFLICT_ERROR = 309,
        SYNC_CONNECTION_EXCEPTION = 310,
        STORAGE_GROUP_PROCESSOR_ERROR = 311,
        STORAGE_GROUP_ERROR = 312,
        STORAGE_ENGINE_ERROR = 313,
        TSFILE_PROCESSOR_ERROR = 314,
        PATH_ILLEGAL = 315,
        LOAD_FILE_ERROR = 316,
        STORAGE_GROUP_NOT_READY = 317,

        // 4xx
        EXECUTE_STATEMENT_ERROR = 400,
        SQL_PARSE_ERROR = 401,
        GENERATE_TIME_ZONE_ERROR = 402,
        SET_TIME_ZONE_ERROR = 403,
        NOT_STORAGE_GROUP_ERROR = 404,
        QUERY_NOT_ALLOWED = 405,
        AST_FORMAT_ERROR = 406,
        LOGICAL_OPERATOR_ERROR = 407,
        LOGICAL_OPTIMIZE_ERROR = 408,
        UNSUPPORTED_FILL_TYPE_ERROR = 409,
        PATH_ERROR = 410,
        QUERY_PROCESS_ERROR = 411,
        WRITE_PROCESS_ERROR = 412,

        // 5xx
        INTERNAL_SERVER_ERROR = 500,
        CLOSE_OPERATION_ERROR = 501,
        READ_ONLY_SYSTEM_ERROR = 502,
        DISK_SPACE_INSUFFICIENT_ERROR = 503,
        START_UP_ERROR = 504,
        SHUT_DOWN_ERROR = 505,
        MULTIPLE_ERROR = 506,

        // 6xx
        WRONG_LOGIN_PASSWORD_ERROR = 600,
        NOT_LOGIN_ERROR = 601,
        NO_PERMISSION_ERROR = 602,
        UNINITIALIZED_AUTH_ERROR = 603,

        // 7xx
        PARTITION_NOT_READY = 700,
        TIME_OUT = 701,
        NO_LEADER = 702,
        UNSUPPORTED_OPERATION = 703,
        NODE_READ_ONLY = 704
    };
}
class RpcUtils {
public:
std::shared_ptr<TSStatus> SUCCESS_STATUS;
RpcUtils() {
SUCCESS_STATUS = std::make_shared<TSStatus>();
SUCCESS_STATUS->__set_code(TSStatusCode::SUCCESS_STATUS);
}
static void verifySuccess(const TSStatus &status);
static void verifySuccess(const std::vector<TSStatus> &statuses);
static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
static TSStatus getStatus(int code, const std::string &message);
static std::shared_ptr<TSExecuteStatementResp> getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
static std::shared_ptr<TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, const std::string &message);
static std::shared_ptr<TSExecuteStatementResp> getTSExecuteStatementResp(const TSStatus &status);
static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
static std::shared_ptr<TSFetchResultsResp>
getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, const std::string &appendMessage);
static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(const TSStatus &status);
};
/**
 * Simulates Java's ByteBuffer on top of a std::string.
 *
 * Multi-byte numbers are stored in big-endian byte order regardless of the
 * host CPU. Writes always append to `str`; reads consume bytes starting at
 * `pos` and advance it.
 *
 * Multi-byte reads (getInt/getInt64/getFloat/getDouble/getString) throw
 * std::out_of_range when fewer bytes remain than requested.
 */
class MyStringBuffer {
public:
    MyStringBuffer() : pos(0) {}

    explicit MyStringBuffer(const std::string &str) : str(str), pos(0) {}

    /** Pre-allocates capacity for @p n bytes. */
    void reserve(size_t n) {
        str.reserve(n);
    }

    /** Drops all content and rewinds the read position. */
    void clear() {
        str.clear();
        pos = 0;
    }

    /** @return true while the read position is before the end of the buffer. */
    bool hasRemaining() const {
        return pos < str.size();
    }

    /** Reads a 4-byte big-endian integer. */
    int getInt() {
        int value = 0;
        readOrdered(&value, 4);
        return value;
    }

    /** Reads an 8-byte big-endian integer. */
    int64_t getInt64() {
        int64_t value = 0;
        readOrdered(&value, 8);
        return value;
    }

    /** Reads a 4-byte big-endian float. */
    float getFloat() {
        float value = 0;
        readOrdered(&value, 4);
        return value;
    }

    /** Reads an 8-byte big-endian double. */
    double getDouble() {
        double value = 0;
        readOrdered(&value, 8);
        return value;
    }

    /** Reads one byte and advances the read position. */
    char getChar() {
        return str[pos++];
    }

    /** Reads one byte; only the value 1 is treated as true. */
    bool getBool() {
        return getChar() == 1;
    }

    /**
     * Reads a 4-byte length followed by that many bytes.
     * @throws std::out_of_range if the length is negative or exceeds the remaining bytes.
     */
    std::string getString() {
        const int len = getInt();
        if (len < 0 || pos > str.size() || static_cast<size_t>(len) > str.size() - pos) {
            throw std::out_of_range("MyStringBuffer::getString: invalid string length");
        }
        const size_t start = pos;
        pos += static_cast<size_t>(len);
        return str.substr(start, static_cast<size_t>(len));
    }

    void putInt(int ins) {
        appendOrdered(&ins, 4);
    }

    void putInt64(int64_t ins) {
        appendOrdered(&ins, 8);
    }

    void putFloat(float ins) {
        appendOrdered(&ins, 4);
    }

    void putDouble(double ins) {
        appendOrdered(&ins, 8);
    }

    void putChar(char ins) {
        str += ins;
    }

    /** Writes a single byte: 1 for true, 0 for false. */
    void putBool(bool ins) {
        str += static_cast<char>(ins ? 1 : 0);
    }

    /** Writes a 4-byte length prefix followed by the raw bytes of @p ins. */
    void putString(const std::string &ins) {
        putInt(static_cast<int>(ins.size()));
        str += ins;
    }

    /** Appends raw bytes without a length prefix. */
    void concat(const std::string &ins) {
        str.append(ins);
    }

public:
    std::string str;  // underlying bytes
    size_t pos;       // read position into str

private:
    // The fixed widths passed to readOrdered/appendOrdered assume these sizes.
    static_assert(sizeof(int) == 4 && sizeof(float) == 4 && sizeof(double) == 8 && sizeof(int64_t) == 8,
                  "MyStringBuffer assumes 4-byte int/float and 8-byte int64_t/double");

    /** Detects the host byte order by inspecting the first byte of a known integer. */
    static bool hostIsBigEndian() {
        static const int probe = 0x0201;
        return *reinterpret_cast<const char *>(&probe) != 0x01;
    }

    /**
     * Copies @p len big-endian bytes at `pos` into @p dst in host byte order.
     * Copying byte-wise (instead of dereferencing a casted pointer) avoids
     * misaligned and type-punned reads.
     */
    void readOrdered(void *dst, size_t len) {
        if (pos > str.size() || len > str.size() - pos) {
            throw std::out_of_range("MyStringBuffer: read past end of buffer");
        }
        const char *src = str.data() + pos;
        char *out = static_cast<char *>(dst);
        if (hostIsBigEndian()) {
            std::copy(src, src + len, out);
        } else {
            std::reverse_copy(src, src + len, out);
        }
        pos += len;
    }

    /** Appends the @p len bytes at @p src to `str` in big-endian order. */
    void appendOrdered(const void *src, size_t len) {
        const char *bytes = static_cast<const char *>(src);
        if (hostIsBigEndian()) {
            // append, never assign: earlier content must be preserved
            str.append(bytes, len);
        } else {
            for (size_t i = len; i > 0; --i) {
                str += bytes[i - 1];
            }
        }
    }
};
/**
 * Fixed-size bit set stored in a byte vector. Bit `p` lives in byte `p / 8`
 * at bit offset `p % 8`. The byte vector always holds `size / 8 + 1` bytes.
 */
class BitMap {
public:
    /** Creates a bitmap of @p size bits, all cleared. */
    explicit BitMap(size_t size = 0) {
        resize(size);
    }

    /** Changes the number of bits and clears every bit. */
    void resize(size_t size) {
        this->size = size;
        bits.resize(size / 8 + 1);
        reset();
    }

    /** Sets the bit at @p position; returns false if the position is out of range. */
    bool mark(size_t position) {
        if (position < size) {
            bits[position / 8] |= maskOf(position);
            return true;
        }
        return false;
    }

    /** Clears the bit at @p position; returns false if the position is out of range. */
    bool unmark(size_t position) {
        if (position < size) {
            bits[position / 8] &= ~maskOf(position);
            return true;
        }
        return false;
    }

    /** Sets every byte of the backing store to all-ones. */
    void markAll() {
        std::fill(bits.begin(), bits.end(), (char) 0XFF);
    }

    /** Sets every byte of the backing store to zero. */
    void reset() {
        std::fill(bits.begin(), bits.end(), (char) 0);
    }

    /** @return true if the bit is set; out-of-range positions yield false. */
    bool isMarked(size_t position) const {
        return position < size && (bits[position / 8] & maskOf(position)) != 0;
    }

    /** Whether all bits are zero, i.e., no Null value. */
    bool isAllUnmarked() const {
        const size_t fullBytes = size / 8;
        const bool fullBytesClear = std::all_of(bits.begin(), bits.begin() + fullBytes,
                                                [](char byte) { return byte == (char) 0; });
        if (!fullBytesClear) {
            return false;
        }
        // Only the low `size % 8` bits of the trailing byte belong to the bitmap.
        for (size_t bit = 0; bit < size % 8; ++bit) {
            if ((bits[fullBytes] & (1 << bit)) != 0) {
                return false;
            }
        }
        return true;
    }

    /** Whether all bits are one, i.e., all are Null. */
    bool isAllMarked() const {
        const size_t fullBytes = size / 8;
        const bool fullBytesSet = std::all_of(bits.begin(), bits.begin() + fullBytes,
                                              [](char byte) { return byte == (char) 0XFF; });
        if (!fullBytesSet) {
            return false;
        }
        // Only the low `size % 8` bits of the trailing byte belong to the bitmap.
        for (size_t bit = 0; bit < size % 8; ++bit) {
            if ((bits[fullBytes] & (1 << bit)) == 0) {
                return false;
            }
        }
        return true;
    }

    /** @return the backing byte vector. */
    const std::vector<char> &getByteArray() const {
        return bits;
    }

    /** @return the number of bits. */
    size_t getSize() const {
        return size;
    }

private:
    /** Single-bit mask for @p position within its byte. */
    static int maskOf(size_t position) {
        return 1 << (position % 8);
    }

    size_t size;
    std::vector<char> bits;
};
/**
 * One typed cell of a row. `dataType` selects which of the value members is
 * meaningful (RowRecord::toString in this header switches on it).
 *
 * Every member has a default initializer, so a Field is never in an
 * indeterminate state: a default-constructed Field is a NULLTYPE cell, and a
 * Field constructed from a type has zero/empty values until they are assigned.
 */
class Field {
public:
    TSDataType::TSDataType dataType = TSDataType::NULLTYPE;
    bool boolV = false;
    int intV = 0;
    int64_t longV = 0;
    float floatV = 0.0f;
    double doubleV = 0.0;
    std::string stringV;

    /** Creates a field of type @p a with zero/empty values. */
    explicit Field(TSDataType::TSDataType a) : dataType(a) {}

    /** Creates a NULLTYPE field with zero/empty values. */
    Field() = default;
};
/*
 * A tablet holds the data of one device: multiple measurements of this device
 * that share the same time column.
 *
 * for example: device root.sg1.d1
 *
 * time, m1, m2, m3
 * 1, 1, 2, 3
 * 2, 1, 2, 3
 * 3, 1, 2, 3
 *
 * Notice: The tablet should not have empty cells.
 *
 * NOTE(review): `values` stores raw column pointers that the destructor hands
 * to deleteColumns() (defined outside this header). The implicitly generated
 * copy operations copy those pointers as-is; if deleteColumns() frees them,
 * copying a Tablet leads to a double free. Confirm and avoid copying.
 */
class Tablet {
private:
    static const int DEFAULT_ROW_SIZE = 1024;

    void createColumns();

    void deleteColumns();

public:
    std::string deviceId; // deviceId of this tablet
    std::vector<std::pair<std::string, TSDataType::TSDataType>> schemas; // the list of measurement schemas for creating the tablet
    std::vector<int64_t> timestamps; // timestamps in this tablet
    std::vector<void *> values; // each object is a primitive type array, which represents values of one measurement
    std::vector<BitMap> bitMaps; // each bitmap represents the existence of each value in the current column
    size_t rowSize = 0; // the number of rows to include in this tablet
    size_t maxRowNumber = 0; // the maximum number of rows for this tablet
    bool isAligned = false; // whether this tablet stores data of aligned timeseries or not

    /** Creates an empty tablet: no schemas, no columns, zero rows. */
    Tablet() = default;

    /**
     * Return a tablet with default specified row number. This is the standard
     * constructor (all Tablet should be the same size).
     *
     * @param deviceId the name of the device specified to be written in
     * @param timeseries the list of measurement schemas for creating the tablet
     */
    Tablet(const std::string &deviceId,
           const std::vector<std::pair<std::string, TSDataType::TSDataType>> &timeseries)
        // Delegation must happen in the initializer list: calling the other
        // constructor inside the body would only build and discard a temporary,
        // leaving this object uninitialized.
        : Tablet(deviceId, timeseries, DEFAULT_ROW_SIZE) {
    }

    /**
     * Return a tablet with the specified number of rows (maxBatchSize). Only
     * call this constructor directly for testing purposes. Tablet should normally
     * always be default size.
     *
     * @param deviceId the name of the device specified to be written in
     * @param schemas the list of measurement schemas for creating the row
     * batch
     * @param maxRowNumber the maximum number of rows for this tablet
     * @param _isAligned whether the tablet stores aligned timeseries
     */
    Tablet(const std::string &deviceId, const std::vector<std::pair<std::string, TSDataType::TSDataType>> &schemas,
           size_t maxRowNumber, bool _isAligned = false) : deviceId(deviceId), schemas(schemas),
                                                           maxRowNumber(maxRowNumber), isAligned(_isAligned) {
        // create timestamp column
        timestamps.resize(maxRowNumber);
        // create value columns
        values.resize(schemas.size());
        createColumns();
        // create bitMaps
        bitMaps.resize(schemas.size());
        for (size_t i = 0; i < schemas.size(); i++) {
            bitMaps[i].resize(maxRowNumber);
        }
        this->rowSize = 0;
    }

    /** Releases the value columns; a failure is logged at debug level, never propagated. */
    ~Tablet() {
        try {
            deleteColumns();
        } catch (const std::exception &e) {
            log_debug(std::string("Tablet::~Tablet(), ") + e.what());
        }
    }

    void addValue(size_t schemaId, size_t rowIndex, void *value);

    void reset(); // Reset Tablet to the default state - set the rowSize to 0

    size_t getTimeBytesSize();

    size_t getValueByteSize(); // total byte size that values occupies

    void setAligned(bool isAligned);
};
// Static helpers that each build a std::string from a Tablet. Only declared
// here; the definitions are outside this header.
// NOTE(review): presumably getTime serializes the tablet's timestamp column and
// getValue its value columns — confirm against the implementation.
class SessionUtils {
public:
static std::string getTime(const Tablet &tablet);
static std::string getValue(const Tablet &tablet);
};
class RowRecord {
public:
int64_t timestamp;
std::vector<Field> fields;
explicit RowRecord(int64_t timestamp) {
this->timestamp = timestamp;
}
RowRecord(int64_t timestamp, const std::vector<Field> &fields)
: timestamp(timestamp), fields(fields) {
}
explicit RowRecord(const std::vector<Field> &fields)
: timestamp(-1), fields(fields) {
}
RowRecord() {
this->timestamp = -1;
}
void addField(const Field &f) {
this->fields.push_back(f);
}
std::string toString() {
std::string ret;
if (this->timestamp != -1) {
ret.append(std::to_string(timestamp));
ret.append("\t");
}
for (size_t i = 0; i < fields.size(); i++) {
if (i != 0) {
ret.append("\t");
}
TSDataType::TSDataType dataType = fields[i].dataType;
switch (dataType) {
case TSDataType::BOOLEAN:
ret.append(fields[i].boolV ? "true" : "false");
break;
case TSDataType::INT32:
ret.append(std::to_string(fields[i].intV));
break;
case TSDataType::INT64:
ret.append(std::to_string(fields[i].longV));
break;
case TSDataType::FLOAT:
ret.append(std::to_string(fields[i].floatV));
break;
case TSDataType::DOUBLE:
ret.append(std::to_string(fields[i].doubleV));
break;
case TSDataType::TEXT:
ret.append(fields[i].stringV);
break;
case TSDataType::NULLTYPE:
ret.append("NULL");
break;
default:
break;
}
}
ret.append("\n");
return ret;
}
};
/**
 * Result set of a query. The constructor copies the per-column value and
 * bitmap buffers out of the given TSQueryDataSet; the row iteration methods
 * are only declared here.
 */
class SessionDataSet {
private:
    bool hasCachedRecord = false;
    std::string sql;
    int64_t queryId;
    int64_t statementId;
    int64_t sessionId;
    std::shared_ptr<IClientRPCServiceIf> client;
    int batchSize = 1024;
    std::vector<std::string> columnNameList;
    std::vector<std::string> columnTypeDeduplicatedList;
    // duplicated column index -> origin index
    std::unordered_map<int, int> duplicateLocation;
    // column name -> column location
    std::unordered_map<std::string, int> columnMap;
    // column size
    int columnSize = 0;
    bool isIgnoreTimeStamp = false;
    int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
    std::shared_ptr<TSQueryDataSet> tsQueryDataSet;
    MyStringBuffer tsQueryDataSetTimeBuffer;
    std::vector<std::unique_ptr<MyStringBuffer>> valueBuffers;
    std::vector<std::unique_ptr<MyStringBuffer>> bitmapBuffers;
    RowRecord rowRecord;
    char *currentBitmap = nullptr; // used to cache the current bitmap for every column
    static const int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null
    bool operationIsOpen = false;

public:
    /**
     * Builds the data set from a query response.
     *
     * For every entry of @p columnNameList one value buffer and one bitmap
     * buffer are created. The source index into queryDataSet's lists is taken
     * from @p columnNameIndexMap when that map is non-empty, otherwise from the
     * position at which the column name first appeared. Repeated column names
     * are recorded in duplicateLocation and contribute no extra entry to the
     * deduplicated type list.
     *
     * Note: @p columnNameIndexMap is looked up with operator[], so a name
     * missing from a non-empty map is inserted into the caller's map.
     */
    SessionDataSet(const std::string &sql,
                   const std::vector<std::string> &columnNameList,
                   const std::vector<std::string> &columnTypeList,
                   std::map<std::string, int> &columnNameIndexMap,
                   bool isIgnoreTimeStamp,
                   int64_t queryId, int64_t statementId,
                   std::shared_ptr<IClientRPCServiceIf> client, int64_t sessionId,
                   const std::shared_ptr<TSQueryDataSet> &queryDataSet)
        : sql(sql),
          queryId(queryId),
          statementId(statementId),
          sessionId(sessionId),
          client(client),
          columnNameList(columnNameList),
          columnSize((int) columnNameList.size()),
          isIgnoreTimeStamp(isIgnoreTimeStamp),
          tsQueryDataSet(queryDataSet),
          tsQueryDataSetTimeBuffer(queryDataSet->time),
          currentBitmap(new char[columnNameList.size()]) {
        const int total = (int) columnNameList.size();
        for (int col = 0; col < total; ++col) {
            const std::string &name = columnNameList[col];

            // column name -> column location (first occurrence wins)
            const auto known = columnMap.find(name);
            if (known == columnMap.end()) {
                columnMap[name] = col;
                columnTypeDeduplicatedList.push_back(columnTypeList[col]);
            } else {
                duplicateLocation[col] = known->second;
            }

            const int source = columnNameIndexMap.empty() ? columnMap[name] : columnNameIndexMap[name];
            valueBuffers.push_back(
                    std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->valueList[source])));
            bitmapBuffers.push_back(
                    std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->bitmapList[source])));
        }
        operationIsOpen = true;
    }

    /** Closes the operation handle (failures are only logged) and frees the bitmap cache. */
    ~SessionDataSet() {
        try {
            closeOperationHandle();
        } catch (std::exception &e) {
            log_debug(string("SessionDataSet::~SessionDataSet(), ") + e.what());
        }
        delete[] currentBitmap; // deleting a null pointer is a no-op
        currentBitmap = nullptr;
    }

    int getBatchSize();

    void setBatchSize(int batchSize);

    std::vector<std::string> getColumnNames();

    bool hasNext();

    void constructOneRow();

    bool isNull(int index, int rowNum);

    RowRecord *next();

    void closeOperationHandle(bool forceClose = false);
};
/**
 * Abstract base of the nodes that make up a schema template.
 *
 * Besides the pure virtual isMeasurement(), the remaining virtual functions
 * have default implementations that throw BatchExecutionException; subclasses
 * override the ones that apply to them.
 */
class TemplateNode {
public:
    explicit TemplateNode(const std::string &name) : name_(name) {}

    // Polymorphic base: a virtual destructor makes destruction through a
    // TemplateNode pointer well-defined. Copy operations are defaulted
    // explicitly so subclasses stay copyable as before.
    virtual ~TemplateNode() = default;

    TemplateNode(const TemplateNode &) = default;

    TemplateNode &operator=(const TemplateNode &) = default;

    /** @return the node name given at construction. */
    const std::string &getName() const {
        return name_;
    }

    /** @throws BatchExecutionException unless overridden. */
    virtual const std::unordered_map<std::string, std::shared_ptr<TemplateNode>> &getChildren() const {
        throw BatchExecutionException("Should call exact sub class!");
    }

    virtual bool isMeasurement() = 0;

    /** @throws BatchExecutionException unless overridden. */
    virtual bool isAligned() {
        throw BatchExecutionException("Should call exact sub class!");
    }

    /** @throws BatchExecutionException unless overridden. */
    virtual std::string serialize() const {
        throw BatchExecutionException("Should call exact sub class!");
    }

private:
    std::string name_;
};
class MeasurementNode : public TemplateNode {
public:
MeasurementNode(const std::string &name_, TSDataType::TSDataType data_type_, TSEncoding::TSEncoding encoding_,
CompressionType::CompressionType compression_type_) : TemplateNode(name_) {
this->data_type_ = data_type_;
this->encoding_ = encoding_;
this->compression_type_ = compression_type_;
}
TSDataType::TSDataType getDataType() const {
return data_type_;
}
TSEncoding::TSEncoding getEncoding() const {
return encoding_;
}
CompressionType::CompressionType getCompressionType() const {
return compression_type_;
}
bool isMeasurement() override {
return true;
}
std::string serialize() const override;
private:
TSDataType::TSDataType data_type_;
TSEncoding::TSEncoding encoding_;
CompressionType::CompressionType compression_type_;
};
class InternalNode : public TemplateNode {
public:
InternalNode(const std::string &name, bool is_aligned) : TemplateNode(name), is_aligned_(is_aligned) {}
void addChild(const InternalNode &node) {
if (this->children_.count(node.getName())) {
throw BatchExecutionException("Duplicated child of node in template.");
}
this->children_[node.getName()] = std::make_shared<InternalNode>(node);
}
void addChild(const MeasurementNode &node) {
if (this->children_.count(node.getName())) {
throw BatchExecutionException("Duplicated child of node in template.");
}
this->children_[node.getName()] = std::make_shared<MeasurementNode>(node);
}
void deleteChild(const TemplateNode &node) {
this->children_.erase(node.getName());
}
const std::unordered_map<std::string, std::shared_ptr<TemplateNode>> &getChildren() const override {
return children_;
}
bool isMeasurement() override {
return false;
}
bool isAligned() override {
return is_aligned_;
}
private:
std::unordered_map<std::string, std::shared_ptr<TemplateNode>> children_;
bool is_aligned_;
};
/** Kinds of template queries; values are written TemplateQueryType::NAME. */
namespace TemplateQueryType {
    enum TemplateQueryType {
        COUNT_MEASUREMENTS = 0,
        IS_MEASUREMENT = 1,
        PATH_EXIST = 2,
        SHOW_MEASUREMENTS = 3
    };
}
class Template {
public:
Template(const std::string &name, bool is_aligned) : name_(name), is_aligned_(is_aligned) {}
const std::string &getName() const {
return name_;
}
bool isAligned() const {
return is_aligned_;
}
void addToTemplate(const InternalNode &child) {
if (this->children_.count(child.getName())) {
throw BatchExecutionException("Duplicated child of node in template.");
}
this->children_[child.getName()] = std::make_shared<InternalNode>(child);
}
void addToTemplate(const MeasurementNode &child) {
if (this->children_.count(child.getName())) {
throw BatchExecutionException("Duplicated child of node in template.");
}
this->children_[child.getName()] = std::make_shared<MeasurementNode>(child);
}
std::string serialize() const;
private:
std::string name_;
std::unordered_map<std::string, std::shared_ptr<TemplateNode>> children_;
bool is_aligned_;
};
/**
 * Client session. This header only declares the operations; the constructors
 * below merely record connection parameters.
 *
 * Every scalar member has a default initializer so that no constructor leaves
 * a member indeterminate (in particular `fetchSize`, which the two-argument
 * constructor does not set explicitly).
 */
class Session {
private:
    std::string host;
    int rpcPort = 0;
    std::string username;
    std::string password;
    const TSProtocolVersion::type protocolVersion = TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3;
    std::shared_ptr<IClientRPCServiceIf> client;
    std::shared_ptr<TTransport> transport;
    bool isClosed = true;
    int64_t sessionId = 0;
    int64_t statementId = 0;
    std::string zoneId;
    int fetchSize = DEFAULT_FETCH_SIZE;
    const static int DEFAULT_FETCH_SIZE = 10000;
    const static int DEFAULT_TIMEOUT_MS = 0;
    Version::Version version = Version::V_0_13;

    static bool checkSorted(const Tablet &tablet);

    static bool checkSorted(const std::vector<int64_t> &times);

    static void sortTablet(Tablet &tablet);

    static void sortIndexByTimestamp(int *index, std::vector<int64_t> &timestamps, int length);

    std::string getTimeZone();

    void setTimeZone(const std::string &zoneId);

    void appendValues(std::string &buffer, const char *value, int size);

    void
    putValuesIntoBuffer(const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values,
                        std::string &buf);

    int8_t getDataTypeNumber(TSDataType::TSDataType type);

    /** Orders indices by the timestamp they refer to. */
    struct TsCompare {
        std::vector<int64_t> &timestamps;

        explicit TsCompare(std::vector<int64_t> &inTimestamps) : timestamps(inTimestamps) {}

        bool operator()(int i, int j) { return (timestamps[i] < timestamps[j]); }
    };

    std::string getVersionString(Version::Version version);

public:
    /**
     * Uses the credentials "user"/"password" and the default fetch size.
     * Unlike the other constructors, zoneId is left empty here.
     * NOTE(review): confirm that the empty zoneId is intended.
     */
    Session(const std::string &host, int rpcPort)
        : host(host), rpcPort(rpcPort), username("user"), password("password"),
          fetchSize(DEFAULT_FETCH_SIZE), version(Version::V_0_13) {
    }

    /** Uses the default fetch size (10000) and zone "UTC+08:00". */
    Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password)
        : host(host), rpcPort(rpcPort), username(username), password(password),
          zoneId("UTC+08:00"), fetchSize(10000), version(Version::V_0_13) {
    }

    /** Uses the given fetch size and zone "UTC+08:00". */
    Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password,
            int fetchSize)
        : host(host), rpcPort(rpcPort), username(username), password(password),
          zoneId("UTC+08:00"), fetchSize(fetchSize), version(Version::V_0_13) {
    }

    /**
     * Same as above, but the port is given as text and parsed with std::stoi.
     * @throws std::invalid_argument / std::out_of_range if @p rpcPort is not a valid int.
     */
    Session(const std::string &host, const std::string &rpcPort, const std::string &username = "user",
            const std::string &password = "password", int fetchSize = 10000)
        : host(host), rpcPort(std::stoi(rpcPort)), username(username), password(password),
          zoneId("UTC+08:00"), fetchSize(fetchSize), version(Version::V_0_13) {
    }

    ~Session();

    int64_t getSessionId();

    // ---- connection ----

    void open();

    void open(bool enableRPCCompression);

    void open(bool enableRPCCompression, int connectionTimeoutInMs);

    void close();

    // ---- single record insertion ----

    void insertRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
                      const std::vector<std::string> &values);

    void insertRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
                      const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values);

    void insertAlignedRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
                             const std::vector<std::string> &values);

    void insertAlignedRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
                             const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values);

    // ---- multi record insertion ----

    void insertRecords(const std::vector<std::string> &deviceIds,
                       const std::vector<int64_t> &times,
                       const std::vector<std::vector<std::string>> &measurementsList,
                       const std::vector<std::vector<std::string>> &valuesList);

    void insertRecords(const std::vector<std::string> &deviceIds,
                       const std::vector<int64_t> &times,
                       const std::vector<std::vector<std::string>> &measurementsList,
                       const std::vector<std::vector<TSDataType::TSDataType>> &typesList,
                       const std::vector<std::vector<char *>> &valuesList);

    void insertAlignedRecords(const std::vector<std::string> &deviceIds,
                              const std::vector<int64_t> &times,
                              const std::vector<std::vector<std::string>> &measurementsList,
                              const std::vector<std::vector<std::string>> &valuesList);

    void insertAlignedRecords(const std::vector<std::string> &deviceIds,
                              const std::vector<int64_t> &times,
                              const std::vector<std::vector<std::string>> &measurementsList,
                              const std::vector<std::vector<TSDataType::TSDataType>> &typesList,
                              const std::vector<std::vector<char *>> &valuesList);

    void insertRecordsOfOneDevice(const std::string &deviceId,
                                  std::vector<int64_t> &times,
                                  std::vector<std::vector<std::string>> &measurementsList,
                                  std::vector<std::vector<TSDataType::TSDataType>> &typesList,
                                  std::vector<std::vector<char *>> &valuesList);

    void insertRecordsOfOneDevice(const std::string &deviceId,
                                  std::vector<int64_t> &times,
                                  std::vector<std::vector<std::string>> &measurementsList,
                                  std::vector<std::vector<TSDataType::TSDataType>> &typesList,
                                  std::vector<std::vector<char *>> &valuesList,
                                  bool sorted);

    void insertAlignedRecordsOfOneDevice(const std::string &deviceId,
                                         std::vector<int64_t> &times,
                                         std::vector<std::vector<std::string>> &measurementsList,
                                         std::vector<std::vector<TSDataType::TSDataType>> &typesList,
                                         std::vector<std::vector<char *>> &valuesList);

    void insertAlignedRecordsOfOneDevice(const std::string &deviceId,
                                         std::vector<int64_t> &times,
                                         std::vector<std::vector<std::string>> &measurementsList,
                                         std::vector<std::vector<TSDataType::TSDataType>> &typesList,
                                         std::vector<std::vector<char *>> &valuesList,
                                         bool sorted);

    // ---- tablet insertion ----

    void insertTablet(Tablet &tablet);

    void insertTablet(Tablet &tablet, bool sorted);

    static void buildInsertTabletReq(TSInsertTabletReq &request, int64_t sessionId, Tablet &tablet, bool sorted);

    void insertTablet(const TSInsertTabletReq &request);

    void insertAlignedTablet(Tablet &tablet);

    void insertAlignedTablet(Tablet &tablet, bool sorted);

    void insertTablets(std::unordered_map<std::string, Tablet *> &tablets);

    void insertTablets(std::unordered_map<std::string, Tablet *> &tablets, bool sorted);

    void insertAlignedTablets(std::unordered_map<std::string, Tablet *> &tablets, bool sorted = false);

    // ---- test insertion entry points ----

    void testInsertRecord(const std::string &deviceId, int64_t time,
                          const std::vector<std::string> &measurements,
                          const std::vector<std::string> &values);

    void testInsertTablet(const Tablet &tablet);

    void testInsertRecords(const std::vector<std::string> &deviceIds,
                           const std::vector<int64_t> &times,
                           const std::vector<std::vector<std::string>> &measurementsList,
                           const std::vector<std::vector<std::string>> &valuesList);

    // ---- deletion ----

    void deleteTimeseries(const std::string &path);

    void deleteTimeseries(const std::vector<std::string> &paths);

    void deleteData(const std::string &path, int64_t time);

    void deleteData(const std::vector<std::string> &deviceId, int64_t time);

    // ---- storage groups ----

    void setStorageGroup(const std::string &storageGroupId);

    void deleteStorageGroup(const std::string &storageGroup);

    void deleteStorageGroups(const std::vector<std::string> &storageGroups);

    // ---- timeseries creation ----

    void createTimeseries(const std::string &path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
                          CompressionType::CompressionType compressor);

    void createTimeseries(const std::string &path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
                          CompressionType::CompressionType compressor,
                          std::map<std::string, std::string> *props, std::map<std::string, std::string> *tags,
                          std::map<std::string, std::string> *attributes,
                          const std::string &measurementAlias);

    void createMultiTimeseries(const std::vector<std::string> &paths,
                               const std::vector<TSDataType::TSDataType> &dataTypes,
                               const std::vector<TSEncoding::TSEncoding> &encodings,
                               const std::vector<CompressionType::CompressionType> &compressors,
                               std::vector<std::map<std::string, std::string>> *propsList,
                               std::vector<std::map<std::string, std::string>> *tagsList,
                               std::vector<std::map<std::string, std::string>> *attributesList,
                               std::vector<std::string> *measurementAliasList);

    void createAlignedTimeseries(const std::string &deviceId,
                                 const std::vector<std::string> &measurements,
                                 const std::vector<TSDataType::TSDataType> &dataTypes,
                                 const std::vector<TSEncoding::TSEncoding> &encodings,
                                 const std::vector<CompressionType::CompressionType> &compressors);

    bool checkTimeseriesExists(const std::string &path);

    // ---- statements ----

    std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string &sql);

    void executeNonQueryStatement(const std::string &sql);

    // ---- schema templates ----

    void createSchemaTemplate(const Template &templ);

    void setSchemaTemplate(const std::string &template_name, const std::string &prefix_path);

    void unsetSchemaTemplate(const std::string &prefix_path, const std::string &template_name);

    void addAlignedMeasurementsInTemplate(const std::string &template_name,
                                          const std::vector<std::string> &measurements,
                                          const std::vector<TSDataType::TSDataType> &dataTypes,
                                          const std::vector<TSEncoding::TSEncoding> &encodings,
                                          const std::vector<CompressionType::CompressionType> &compressors);

    void addAlignedMeasurementsInTemplate(const std::string &template_name,
                                          const std::string &measurement,
                                          TSDataType::TSDataType dataType,
                                          TSEncoding::TSEncoding encoding,
                                          CompressionType::CompressionType compressor);

    void addUnalignedMeasurementsInTemplate(const std::string &template_name,
                                            const std::vector<std::string> &measurements,
                                            const std::vector<TSDataType::TSDataType> &dataTypes,
                                            const std::vector<TSEncoding::TSEncoding> &encodings,
                                            const std::vector<CompressionType::CompressionType> &compressors);

    void addUnalignedMeasurementsInTemplate(const std::string &template_name,
                                            const std::string &measurement,
                                            TSDataType::TSDataType dataType,
                                            TSEncoding::TSEncoding encoding,
                                            CompressionType::CompressionType compressor);

    void deleteNodeInTemplate(const std::string &template_name, const std::string &path);

    int countMeasurementsInTemplate(const std::string &template_name);

    bool isMeasurementInTemplate(const std::string &template_name, const std::string &path);

    bool isPathExistInTemplate(const std::string &template_name, const std::string &path);

    std::vector<std::string> showMeasurementsInTemplate(const std::string &template_name);

    std::vector<std::string> showMeasurementsInTemplate(const std::string &template_name, const std::string &pattern);

    bool checkTemplateExists(const std::string &template_name);
};
#endif // IOTDB_SESSION_H