blob: 32da78397094afebfa075f1ab16127cebbb21e73 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef IOTDB_SESSION_H
#define IOTDB_SESSION_H
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <exception>
#include <iostream>
#include <algorithm>
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <stack>
#include <new>
#include <thread>
#include <mutex>
#include <stdexcept>
#include <cstdlib>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TBufferTransports.h>
#include "IClientRPCService.h"
#include "AbstractSessionBuilder.h"
//== For compatible with Windows OS ==
#ifndef LONG_LONG_MIN
#define LONG_LONG_MIN 0x8000000000000000
#endif
using namespace std;
using ::apache::thrift::protocol::TBinaryProtocol;
using ::apache::thrift::protocol::TCompactProtocol;
using ::apache::thrift::transport::TSocket;
using ::apache::thrift::transport::TTransport;
using ::apache::thrift::transport::TTransportException;
using ::apache::thrift::transport::TBufferedTransport;
using ::apache::thrift::transport::TFramedTransport;
using ::apache::thrift::TException;
enum LogLevelType {
LEVEL_DEBUG = 0,
LEVEL_INFO,
LEVEL_WARN,
LEVEL_ERROR
};
extern LogLevelType LOG_LEVEL;
#define log_debug(fmt,...) do {if(LOG_LEVEL <= LEVEL_DEBUG) {string s=string("[DEBUG] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
#define log_info(fmt,...) do {if(LOG_LEVEL <= LEVEL_INFO) {string s=string("[INFO] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
#define log_warn(fmt,...) do {if(LOG_LEVEL <= LEVEL_WARN) {string s=string("[WARN] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
#define log_error(fmt,...) do {if(LOG_LEVEL <= LEVEL_ERROR) {string s=string("[ERROR] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
class IoTDBException : public std::exception {
public:
IoTDBException() {}
explicit IoTDBException(const std::string &m) : message(m) {}
explicit IoTDBException(const char *m) : message(m) {}
virtual const char *what() const noexcept override {
return message.c_str();
}
private:
std::string message;
};
class IoTDBConnectionException : public IoTDBException {
public:
IoTDBConnectionException() {}
explicit IoTDBConnectionException(const char *m) : IoTDBException(m) {}
explicit IoTDBConnectionException(const std::string &m) : IoTDBException(m) {}
};
class ExecutionException : public IoTDBException {
public:
ExecutionException() {}
explicit ExecutionException(const char *m) : IoTDBException(m) {}
explicit ExecutionException(const std::string &m) : IoTDBException(m) {}
explicit ExecutionException(const std::string &m, const TSStatus &tsStatus) : IoTDBException(m), status(tsStatus) {}
TSStatus status;
};
class BatchExecutionException : public IoTDBException {
public:
BatchExecutionException() {}
explicit BatchExecutionException(const char *m) : IoTDBException(m) {}
explicit BatchExecutionException(const std::string &m) : IoTDBException(m) {}
explicit BatchExecutionException(const std::vector <TSStatus> &statusList) : statusList(statusList) {}
BatchExecutionException(const std::string &m, const std::vector <TSStatus> &statusList) : IoTDBException(m), statusList(statusList) {}
std::vector<TSStatus> statusList;
};
class UnSupportedDataTypeException : public IoTDBException {
public:
UnSupportedDataTypeException() {}
explicit UnSupportedDataTypeException(const char *m) : IoTDBException(m) {}
explicit UnSupportedDataTypeException(const std::string &m) : IoTDBException("UnSupported dataType: " + m) {}
};
class SchemaNotFoundException : public IoTDBException {
public:
SchemaNotFoundException() {}
explicit SchemaNotFoundException(const char *m) : IoTDBException(m) {}
explicit SchemaNotFoundException(const std::string &m) : IoTDBException(m) {}
};
namespace Version {
enum Version {
V_0_12, V_0_13, V_1_0
};
}
namespace CompressionType {
enum CompressionType {
UNCOMPRESSED = (char) 0,
SNAPPY = (char) 1,
GZIP = (char) 2,
LZO = (char) 3,
SDT = (char) 4,
PAA = (char) 5,
PLA = (char) 6,
LZ4 = (char) 7,
ZSTD = (char) 8,
LZMA2 = (char) 9,
};
}
namespace TSDataType {
enum TSDataType {
BOOLEAN = (char) 0,
INT32 = (char) 1,
INT64 = (char) 2,
FLOAT = (char) 3,
DOUBLE = (char) 4,
TEXT = (char) 5,
VECTOR = (char) 6,
NULLTYPE = (char) 254,
INVALID_DATATYPE = (char) 255
};
}
namespace TSEncoding {
enum TSEncoding {
PLAIN = (char) 0,
DICTIONARY = (char) 1,
RLE = (char) 2,
DIFF = (char) 3,
TS_2DIFF = (char) 4,
BITMAP = (char) 5,
GORILLA_V1 = (char) 6,
REGULAR = (char) 7,
GORILLA = (char) 8,
ZIGZAG = (char) 9,
FREQ = (char) 10,
INVALID_ENCODING = (char) 255
};
}
namespace TSStatusCode {
enum TSStatusCode {
SUCCESS_STATUS = 200,
// System level
INCOMPATIBLE_VERSION = 201,
CONFIGURATION_ERROR = 202,
START_UP_ERROR = 203,
SHUT_DOWN_ERROR = 204,
// General Error
UNSUPPORTED_OPERATION = 300,
EXECUTE_STATEMENT_ERROR = 301,
MULTIPLE_ERROR = 302,
ILLEGAL_PARAMETER = 303,
OVERLAP_WITH_EXISTING_TASK = 304,
INTERNAL_SERVER_ERROR = 305,
// Client,
REDIRECTION_RECOMMEND = 400,
// Schema Engine
DATABASE_NOT_EXIST = 500,
DATABASE_ALREADY_EXISTS = 501,
SERIES_OVERFLOW = 502,
TIMESERIES_ALREADY_EXIST = 503,
TIMESERIES_IN_BLACK_LIST = 504,
ALIAS_ALREADY_EXIST = 505,
PATH_ALREADY_EXIST = 506,
METADATA_ERROR = 507,
PATH_NOT_EXIST = 508,
ILLEGAL_PATH = 509,
CREATE_TEMPLATE_ERROR = 510,
DUPLICATED_TEMPLATE = 511,
UNDEFINED_TEMPLATE = 512,
TEMPLATE_NOT_SET = 513,
DIFFERENT_TEMPLATE = 514,
TEMPLATE_IS_IN_USE = 515,
TEMPLATE_INCOMPATIBLE = 516,
SEGMENT_NOT_FOUND = 517,
PAGE_OUT_OF_SPACE = 518,
RECORD_DUPLICATED=519,
SEGMENT_OUT_OF_SPACE = 520,
PBTREE_FILE_NOT_EXISTS = 521,
OVERSIZE_RECORD = 522,
PBTREE_FILE_REDO_LOG_BROKEN = 523,
TEMPLATE_NOT_ACTIVATED = 524,
// Storage Engine
SYSTEM_READ_ONLY = 600,
STORAGE_ENGINE_ERROR = 601,
STORAGE_ENGINE_NOT_READY = 602,
};
}
class RpcUtils {
public:
std::shared_ptr<TSStatus> SUCCESS_STATUS;
RpcUtils() {
SUCCESS_STATUS = std::make_shared<TSStatus>();
SUCCESS_STATUS->__set_code(TSStatusCode::SUCCESS_STATUS);
}
static void verifySuccess(const TSStatus &status);
static void verifySuccess(const std::vector<TSStatus> &statuses);
static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
static TSStatus getStatus(int code, const std::string &message);
static std::shared_ptr<TSExecuteStatementResp> getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
static std::shared_ptr<TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, const std::string &message);
static std::shared_ptr<TSExecuteStatementResp> getTSExecuteStatementResp(const TSStatus &status);
static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
static std::shared_ptr<TSFetchResultsResp>
getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, const std::string &appendMessage);
static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(const TSStatus &status);
};
// Simulate the ByteBuffer class in Java
class MyStringBuffer {
public:
MyStringBuffer() : pos(0) {
checkBigEndian();
}
explicit MyStringBuffer(const std::string& str) : str(str), pos(0) {
checkBigEndian();
}
void reserve(size_t n) {
str.reserve(n);
}
void clear() {
str.clear();
pos = 0;
}
bool hasRemaining() {
return pos < str.size();
}
int getInt() {
return *(int *) getOrderedByte(4);
}
int64_t getInt64() {
#ifdef ARCH32
const char *buf_addr = getOrderedByte(8);
if (reinterpret_cast<uint32_t>(buf_addr) % 4 == 0) {
return *(int64_t *)buf_addr;
} else {
char tmp_buf[8];
memcpy(tmp_buf, buf_addr, 8);
return *(int64_t*)tmp_buf;
}
#else
return *(int64_t *) getOrderedByte(8);
#endif
}
float getFloat() {
return *(float *) getOrderedByte(4);
}
double getDouble() {
#ifdef ARCH32
const char *buf_addr = getOrderedByte(8);
if (reinterpret_cast<uint32_t>(buf_addr) % 4 == 0) {
return *(double*)buf_addr;
} else {
char tmp_buf[8];
memcpy(tmp_buf, buf_addr, 8);
return *(double*)tmp_buf;
}
#else
return *(double *) getOrderedByte(8);
#endif
}
char getChar() {
return str[pos++];
}
bool getBool() {
return getChar() == 1;
}
std::string getString() {
size_t len = getInt();
size_t tmpPos = pos;
pos += len;
return str.substr(tmpPos, len);
}
void putInt(int ins) {
putOrderedByte((char *) &ins, 4);
}
void putInt64(int64_t ins) {
putOrderedByte((char *) &ins, 8);
}
void putFloat(float ins) {
putOrderedByte((char *) &ins, 4);
}
void putDouble(double ins) {
putOrderedByte((char *) &ins, 8);
}
void putChar(char ins) {
str += ins;
}
void putBool(bool ins) {
char tmp = ins ? 1 : 0;
str += tmp;
}
void putString(const std::string &ins) {
putInt((int)(ins.size()));
str += ins;
}
void concat(const std::string &ins) {
str.append(ins);
}
public:
std::string str;
size_t pos;
private:
void checkBigEndian() {
static int chk = 0x0201; //used to distinguish CPU's type (BigEndian or LittleEndian)
isBigEndian = (0x01 != *(char *) (&chk));
}
const char *getOrderedByte(size_t len) {
const char *p = nullptr;
if (isBigEndian) {
p = str.c_str() + pos;
} else {
const char *tmp = str.c_str();
for (size_t i = pos; i < pos + len; i++) {
numericBuf[pos + len - 1 - i] = tmp[i];
}
p = numericBuf;
}
pos += len;
return p;
}
void putOrderedByte(char *buf, int len) {
if (isBigEndian) {
str.assign(buf, len);
} else {
for (int i = len - 1; i > -1; i--) {
str += buf[i];
}
}
}
private:
bool isBigEndian{};
char numericBuf[8]{}; //only be used by int, long, float, double etc.
};
class BitMap {
public:
/** Initialize a BitMap with given size. */
explicit BitMap(size_t size = 0) {
resize(size);
}
/** change the size */
void resize(size_t size) {
this->size = size;
this->bits.resize((size >> 3) + 1); // equal to "size/8 + 1"
reset();
}
/** mark as 1 at the given bit position. */
bool mark(size_t position) {
if (position >= size)
return false;
bits[position >> 3] |= (char) 1 << (position % 8);
return true;
}
/** mark as 0 at the given bit position. */
bool unmark(size_t position) {
if (position >= size)
return false;
bits[position >> 3] &= ~((char) 1 << (position % 8));
return true;
}
/** mark as 1 at all positions. */
void markAll() {
std::fill(bits.begin(), bits.end(), (char) 0XFF);
}
/** mark as 0 at all positions. */
void reset() {
std::fill(bits.begin(), bits.end(), (char) 0);
}
/** returns the value of the bit with the specified index. */
bool isMarked(size_t position) const {
if (position >= size)
return false;
return (bits[position >> 3] & ((char) 1 << (position % 8))) != 0;
}
/** whether all bits are zero, i.e., no Null value */
bool isAllUnmarked() const {
size_t j;
for (j = 0; j < size >> 3; j++) {
if (bits[j] != (char) 0) {
return false;
}
}
for (j = 0; j < size % 8; j++) {
if ((bits[size >> 3] & ((char) 1 << j)) != 0) {
return false;
}
}
return true;
}
/** whether all bits are one, i.e., all are Null */
bool isAllMarked() const {
size_t j;
for (j = 0; j < size >> 3; j++) {
if (bits[j] != (char) 0XFF) {
return false;
}
}
for (j = 0; j < size % 8; j++) {
if ((bits[size >> 3] & ((char) 1 << j)) == 0) {
return false;
}
}
return true;
}
const std::vector<char>& getByteArray() const {
return this->bits;
}
size_t getSize() const {
return this->size;
}
private:
size_t size;
std::vector<char> bits;
};
class Field {
public:
TSDataType::TSDataType dataType;
bool boolV;
int intV;
int64_t longV;
float floatV;
double doubleV;
std::string stringV;
explicit Field(TSDataType::TSDataType a) {
dataType = a;
}
Field() = default;
};
enum class ColumnCategory {
TAG,
FIELD,
ATTRIBUTE
};
template<typename T, typename Target>
void safe_cast(const T& value, Target &target) {
/*
Target Allowed Source Types
BOOLEAN BOOLEAN
INT32 INT32
INT64 INT32 INT64
FLOAT INT32 FLOAT
DOUBLE INT32 INT64 FLOAT DOUBLE
TEXT TEXT
*/
if (std::is_same<Target, T>::value) {
target = *(Target*)&value;
} else if (std::is_same<Target, string>::value && std::is_array<T>::value && std::is_same<char, typename std::remove_extent<T>::type>::value) {
string tmp((const char*)&value);
target = *(Target*)&tmp;
} else if (std::is_same<Target, int64_t>::value && std::is_same<T, int32_t>::value) {
int64_t tmp = *(int32_t*)&value;
target = *(Target*)&tmp;
} else if (std::is_same<Target, float>::value && std::is_same<T, int32_t>::value) {
float tmp = *(int32_t*)&value;
target = *(Target*)&tmp;
} else if (std::is_same<Target, double>::value && std::is_same<T, int32_t>::value) {
double tmp = *(int32_t*)&value;
target = *(Target*)&tmp;
} else if (std::is_same<Target, double>::value && std::is_same<T, int64_t>::value) {
double tmp = *(int64_t*)&value;
target = *(Target*)&tmp;
} else if (std::is_same<Target, double>::value && std::is_same<T, float>::value) {
double tmp = *(float*)&value;
target = *(Target*)&tmp;
} else {
throw UnSupportedDataTypeException("Error: Parameter type " +
std::string(typeid(T).name()) + " cannot be converted to DataType" +
std::string(typeid(Target).name()));
}
}
/*
* A tablet data of one device, the tablet contains multiple measurements of this device that share
* the same time column.
*
* for example: device root.sg1.d1
*
* time, m1, m2, m3
* 1, 1, 2, 3
* 2, 1, 2, 3
* 3, 1, 2, 3
*
* Notice: The tablet should not have empty cell
*
*/
class Tablet {
private:
static const int DEFAULT_ROW_SIZE = 1024;
void createColumns();
void deleteColumns();
public:
std::string deviceId; // deviceId of this tablet
std::vector<std::pair<std::string, TSDataType::TSDataType>> schemas; // the list of measurement schemas for creating the tablet
std::map<std::string, size_t> schemaNameIndex; // the map of schema name to index
std::vector<ColumnCategory> columnTypes; // the list of column types (used in table model)
std::vector<int64_t> timestamps; // timestamps in this tablet
std::vector<void*> values; // each object is a primitive type array, which represents values of one measurement
std::vector<BitMap> bitMaps; // each bitmap represents the existence of each value in the current column
size_t rowSize; //the number of rows to include in this tablet
size_t maxRowNumber; // the maximum number of rows for this tablet
bool isAligned; // whether this tablet store data of aligned timeseries or not
Tablet() = default;
/**
* Return a tablet with default specified row number. This is the standard
* constructor (all Tablet should be the same size).
*
* @param deviceId the name of the device specified to be written in
* @param timeseries the list of measurement schemas for creating the tablet
*/
Tablet(const std::string &deviceId,
const std::vector<std::pair<std::string, TSDataType::TSDataType>> &timeseries)
: Tablet(deviceId, timeseries, DEFAULT_ROW_SIZE) {}
Tablet(const std::string &deviceId,
const std::vector<std::pair<std::string, TSDataType::TSDataType>> &timeseries,
const std::vector<ColumnCategory> &columnTypes)
: Tablet(deviceId, timeseries, columnTypes, DEFAULT_ROW_SIZE) {}
/**
* Return a tablet with the specified number of rows (maxBatchSize). Only
* call this constructor directly for testing purposes. Tablet should normally
* always be default size.
*
* @param deviceId the name of the device specified to be written in
* @param schemas the list of measurement schemas for creating the row
* batch
* @param columnTypes the list of column types (used in table model)
* @param maxRowNumber the maximum number of rows for this tablet
*/
Tablet(const std::string &deviceId,
const std::vector<std::pair<std::string, TSDataType::TSDataType>> &schemas,
int maxRowNumber)
: Tablet(deviceId, schemas, std::vector<ColumnCategory>(schemas.size(), ColumnCategory::FIELD), maxRowNumber) {}
Tablet(const std::string &deviceId, const std::vector<std::pair<std::string, TSDataType::TSDataType>> &schemas,
const std::vector<ColumnCategory> columnTypes,
size_t maxRowNumber, bool _isAligned = false) : deviceId(deviceId), schemas(schemas), columnTypes(columnTypes),
maxRowNumber(maxRowNumber), isAligned(_isAligned) {
// create timestamp column
timestamps.resize(maxRowNumber);
// create value columns
values.resize(schemas.size());
createColumns();
// create bitMaps
bitMaps.resize(schemas.size());
for (size_t i = 0; i < schemas.size(); i++) {
bitMaps[i].resize(maxRowNumber);
}
// create schemaNameIndex
for (size_t i = 0; i < schemas.size(); i++) {
schemaNameIndex[schemas[i].first] = i;
}
this->rowSize = 0;
}
~Tablet() {
try {
deleteColumns();
} catch (exception &e) {
log_debug(string("Tablet::~Tablet(), ") + e.what());
}
}
template<typename T>
void addValue(size_t schemaId, size_t rowIndex, const T& value) {
if (schemaId >= schemas.size()) {
char tmpStr[100];
sprintf(tmpStr, "Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.", schemaId, schemas.size());
throw std::out_of_range(tmpStr);
}
if (rowIndex >= rowSize) {
char tmpStr[100];
sprintf(tmpStr, "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.", rowIndex, rowSize);
throw std::out_of_range(tmpStr);
}
TSDataType::TSDataType dataType = schemas[schemaId].second;
switch (dataType) {
case TSDataType::BOOLEAN: {
safe_cast<T, bool>(value, ((bool*)values[schemaId])[rowIndex]);
break;
}
case TSDataType::INT32: {
safe_cast<T, int>(value, ((int*)values[schemaId])[rowIndex]);
break;
}
case TSDataType::INT64: {
safe_cast<T, int64_t>(value, ((int64_t*)values[schemaId])[rowIndex]);
break;
}
case TSDataType::FLOAT: {
safe_cast<T, float>(value, ((float*)values[schemaId])[rowIndex]);
break;
}
case TSDataType::DOUBLE: {
safe_cast<T, double>(value, ((double*)values[schemaId])[rowIndex]);
break;
}
case TSDataType::TEXT: {
safe_cast<T, string>(value, ((string*)values[schemaId])[rowIndex]);
break;
}
default:
throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
}
}
template<typename T>
void addValue(const string &schemaName, size_t rowIndex, const T& value) {
if (schemaNameIndex.find(schemaName) == schemaNameIndex.end()) {
throw SchemaNotFoundException(string("Schema ") + schemaName + " not found.");
}
size_t schemaId = schemaNameIndex[schemaName];
addValue(schemaId, rowIndex, value);
}
void reset(); // Reset Tablet to the default state - set the rowSize to 0
size_t getTimeBytesSize();
size_t getValueByteSize(); // total byte size that values occupies
void setAligned(bool isAligned);
};
class SessionUtils {
public:
static std::string getTime(const Tablet &tablet);
static std::string getValue(const Tablet &tablet);
};
class RowRecord {
public:
int64_t timestamp;
std::vector<Field> fields;
explicit RowRecord(int64_t timestamp) {
this->timestamp = timestamp;
}
RowRecord(int64_t timestamp, const std::vector<Field> &fields)
: timestamp(timestamp), fields(fields) {
}
explicit RowRecord(const std::vector<Field> &fields)
: timestamp(-1), fields(fields) {
}
RowRecord() {
this->timestamp = -1;
}
void addField(const Field &f) {
this->fields.push_back(f);
}
std::string toString() {
std::string ret;
if (this->timestamp != -1) {
ret.append(std::to_string(timestamp));
ret.append("\t");
}
for (size_t i = 0; i < fields.size(); i++) {
if (i != 0) {
ret.append("\t");
}
TSDataType::TSDataType dataType = fields[i].dataType;
switch (dataType) {
case TSDataType::BOOLEAN:
ret.append(fields[i].boolV ? "true" : "false");
break;
case TSDataType::INT32:
ret.append(std::to_string(fields[i].intV));
break;
case TSDataType::INT64:
ret.append(std::to_string(fields[i].longV));
break;
case TSDataType::FLOAT:
ret.append(std::to_string(fields[i].floatV));
break;
case TSDataType::DOUBLE:
ret.append(std::to_string(fields[i].doubleV));
break;
case TSDataType::TEXT:
ret.append(fields[i].stringV);
break;
case TSDataType::NULLTYPE:
ret.append("NULL");
break;
default:
break;
}
}
ret.append("\n");
return ret;
}
};
class SessionDataSet {
private:
const string TIMESTAMP_STR = "Time";
bool hasCachedRecord = false;
std::string sql;
int64_t queryId;
int64_t statementId;
int64_t sessionId;
std::shared_ptr<IClientRPCServiceIf> client;
int fetchSize = 1024;
std::vector<std::string> columnNameList;
std::vector<std::string> columnTypeList;
// duplicated column index -> origin index
std::unordered_map<int, int> duplicateLocation;
// column name -> column location
std::unordered_map<std::string, int> columnMap;
// column size
int columnSize = 0;
int columnFieldStartIndex = 0; //Except Timestamp column, 1st field's pos in columnNameList
bool isIgnoreTimeStamp = false;
int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
std::shared_ptr<TSQueryDataSet> tsQueryDataSet;
MyStringBuffer tsQueryDataSetTimeBuffer;
std::vector<std::unique_ptr<MyStringBuffer>> valueBuffers;
std::vector<std::unique_ptr<MyStringBuffer>> bitmapBuffers;
RowRecord rowRecord;
char *currentBitmap = nullptr; // used to cache the current bitmap for every column
static const int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null
bool operationIsOpen = false;
public:
SessionDataSet(const std::string &sql,
const std::vector<std::string> &columnNameList,
const std::vector<std::string> &columnTypeList,
std::map<std::string, int> &columnNameIndexMap,
bool isIgnoreTimeStamp,
int64_t queryId, int64_t statementId,
std::shared_ptr<IClientRPCServiceIf> client, int64_t sessionId,
const std::shared_ptr<TSQueryDataSet> &queryDataSet) : tsQueryDataSetTimeBuffer(queryDataSet->time) {
this->sessionId = sessionId;
this->sql = sql;
this->queryId = queryId;
this->statementId = statementId;
this->client = client;
this->currentBitmap = new char[columnNameList.size()];
this->isIgnoreTimeStamp = isIgnoreTimeStamp;
if (!isIgnoreTimeStamp) {
columnFieldStartIndex = 1;
this->columnNameList.push_back(TIMESTAMP_STR);
this->columnTypeList.push_back("INT64");
}
this->columnNameList.insert(this->columnNameList.end(), columnNameList.begin(), columnNameList.end());
this->columnTypeList.insert(this->columnTypeList.end(), columnTypeList.begin(), columnTypeList.end());
valueBuffers.reserve(queryDataSet->valueList.size());
bitmapBuffers.reserve(queryDataSet->bitmapList.size());
int deduplicateIdx = 0;
std::unordered_map<std::string, int> columnToFirstIndexMap;
for (size_t i = columnFieldStartIndex; i < this->columnNameList.size(); i++) {
std::string name = this->columnNameList[i];
if (this->columnMap.find(name) != this->columnMap.end()) {
duplicateLocation[i] = columnToFirstIndexMap[name];
} else {
columnToFirstIndexMap[name] = i;
if (!columnNameIndexMap.empty()) {
int valueIndex = columnNameIndexMap[name];
this->columnMap[name] = valueIndex;
this->valueBuffers.emplace_back(new MyStringBuffer(queryDataSet->valueList[valueIndex]));
this->bitmapBuffers.emplace_back(new MyStringBuffer(queryDataSet->bitmapList[valueIndex]));
} else {
this->columnMap[name] = deduplicateIdx;
this->valueBuffers.emplace_back(new MyStringBuffer(queryDataSet->valueList[deduplicateIdx]));
this->bitmapBuffers.emplace_back(new MyStringBuffer(queryDataSet->bitmapList[deduplicateIdx]));
}
deduplicateIdx++;
}
}
this->tsQueryDataSet = queryDataSet;
operationIsOpen = true;
}
~SessionDataSet() {
try {
closeOperationHandle();
} catch (exception &e) {
log_debug(string("SessionDataSet::~SessionDataSet(), ") + e.what());
}
if (currentBitmap != nullptr) {
delete[] currentBitmap;
currentBitmap = nullptr;
}
}
int getFetchSize();
void setFetchSize(int fetchSize);
std::vector<std::string> getColumnNames();
std::vector<std::string> getColumnTypeList();
bool hasNext();
void constructOneRow();
bool isNull(int index, int rowNum);
RowRecord *next();
void closeOperationHandle(bool forceClose = false);
};
class TemplateNode {
public:
explicit TemplateNode(const std::string &name) : name_(name) {}
const std::string &getName() const {
return name_;
}
virtual const std::unordered_map<std::string, std::shared_ptr<TemplateNode>> &getChildren() const {
throw BatchExecutionException("Should call exact sub class!");
}
virtual bool isMeasurement() = 0;
virtual bool isAligned() {
throw BatchExecutionException("Should call exact sub class!");
}
virtual std::string serialize() const {
throw BatchExecutionException("Should call exact sub class!");
}
private:
std::string name_;
};
class MeasurementNode : public TemplateNode {
public:
MeasurementNode(const std::string &name_, TSDataType::TSDataType data_type_, TSEncoding::TSEncoding encoding_,
CompressionType::CompressionType compression_type_) : TemplateNode(name_) {
this->data_type_ = data_type_;
this->encoding_ = encoding_;
this->compression_type_ = compression_type_;
}
TSDataType::TSDataType getDataType() const {
return data_type_;
}
TSEncoding::TSEncoding getEncoding() const {
return encoding_;
}
CompressionType::CompressionType getCompressionType() const {
return compression_type_;
}
bool isMeasurement() override {
return true;
}
std::string serialize() const override;
private:
TSDataType::TSDataType data_type_;
TSEncoding::TSEncoding encoding_;
CompressionType::CompressionType compression_type_;
};
class InternalNode : public TemplateNode {
public:
InternalNode(const std::string &name, bool is_aligned) : TemplateNode(name), is_aligned_(is_aligned) {}
void addChild(const InternalNode &node) {
if (this->children_.count(node.getName())) {
throw BatchExecutionException("Duplicated child of node in template.");
}
this->children_[node.getName()] = std::make_shared<InternalNode>(node);
}
void addChild(const MeasurementNode &node) {
if (this->children_.count(node.getName())) {
throw BatchExecutionException("Duplicated child of node in template.");
}
this->children_[node.getName()] = std::make_shared<MeasurementNode>(node);
}
void deleteChild(const TemplateNode &node) {
this->children_.erase(node.getName());
}
const std::unordered_map<std::string, std::shared_ptr<TemplateNode>> &getChildren() const override {
return children_;
}
bool isMeasurement() override {
return false;
}
bool isAligned() override {
return is_aligned_;
}
private:
std::unordered_map<std::string, std::shared_ptr<TemplateNode>> children_;
bool is_aligned_;
};
namespace TemplateQueryType {
enum TemplateQueryType {
COUNT_MEASUREMENTS, IS_MEASUREMENT, PATH_EXIST, SHOW_MEASUREMENTS
};
}
class Template {
public:
Template(const std::string &name, bool is_aligned) : name_(name), is_aligned_(is_aligned) {}
const std::string &getName() const {
return name_;
}
bool isAligned() const {
return is_aligned_;
}
void addToTemplate(const InternalNode &child) {
if (this->children_.count(child.getName())) {
throw BatchExecutionException("Duplicated child of node in template.");
}
this->children_[child.getName()] = std::make_shared<InternalNode>(child);
}
void addToTemplate(const MeasurementNode &child) {
if (this->children_.count(child.getName())) {
throw BatchExecutionException("Duplicated child of node in template.");
}
this->children_[child.getName()] = std::make_shared<MeasurementNode>(child);
}
std::string serialize() const;
private:
std::string name_;
std::unordered_map<std::string, std::shared_ptr<TemplateNode>> children_;
bool is_aligned_;
};
class Session {
private:
std::string host;
int rpcPort;
std::string username;
std::string password;
const TSProtocolVersion::type protocolVersion = TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3;
std::shared_ptr<IClientRPCServiceIf> client;
std::shared_ptr<TTransport> transport;
bool isClosed = true;
int64_t sessionId;
int64_t statementId;
std::string zoneId;
int fetchSize;
const static int DEFAULT_FETCH_SIZE = 10000;
const static int DEFAULT_TIMEOUT_MS = 0;
Version::Version version;
std::string sqlDialect = "tree"; // default sql dialect
std::string database;
private:
static bool checkSorted(const Tablet &tablet);
static bool checkSorted(const std::vector<int64_t> &times);
static void sortTablet(Tablet &tablet);
static void sortIndexByTimestamp(int *index, std::vector<int64_t> &timestamps, int length);
void appendValues(std::string &buffer, const char *value, int size);
void
putValuesIntoBuffer(const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values,
std::string &buf);
int8_t getDataTypeNumber(TSDataType::TSDataType type);
struct TsCompare {
std::vector<int64_t> &timestamps;
explicit TsCompare(std::vector<int64_t> &inTimestamps) : timestamps(inTimestamps) {};
bool operator()(int i, int j) { return (timestamps[i] < timestamps[j]); };
};
std::string getVersionString(Version::Version version);
void initZoneId();
public:
Session(const std::string &host, int rpcPort) : username("root"), password("root"), version(Version::V_1_0) {
this->host = host;
this->rpcPort = rpcPort;
initZoneId();
}
Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password)
: fetchSize(DEFAULT_FETCH_SIZE) {
this->host = host;
this->rpcPort = rpcPort;
this->username = username;
this->password = password;
this->version = Version::V_1_0;
initZoneId();
}
Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password,
const std::string &zoneId, int fetchSize = DEFAULT_FETCH_SIZE) {
this->host = host;
this->rpcPort = rpcPort;
this->username = username;
this->password = password;
this->zoneId = zoneId;
this->fetchSize = fetchSize;
this->version = Version::V_1_0;
initZoneId();
}
Session(const std::string &host, const std::string &rpcPort, const std::string &username = "user",
const std::string &password = "password", const std::string &zoneId="", int fetchSize = DEFAULT_FETCH_SIZE) {
this->host = host;
this->rpcPort = stoi(rpcPort);
this->username = username;
this->password = password;
this->zoneId = zoneId;
this->fetchSize = fetchSize;
this->version = Version::V_1_0;
initZoneId();
}
Session(AbstractSessionBuilder* builder) {
this->host = builder->host;
this->rpcPort = builder->rpcPort;
this->username = builder->username;
this->password = builder->password;
this->zoneId = builder->zoneId;
this->fetchSize = builder->fetchSize;
this->version = Version::V_1_0;
this->sqlDialect = builder->sqlDialect;
this->database = builder->database;
initZoneId();
}
~Session();
void setSqlDialect(const std::string &dialect){
this->sqlDialect = dialect;
}
void setDatabase(const std::string &database) {
this->database = database;
}
string getDatabase() {
return database;
}
void changeDatabase(string database) {
this->database = database;
}
int64_t getSessionId();
void open();
void open(bool enableRPCCompression);
void open(bool enableRPCCompression, int connectionTimeoutInMs);
void close();
void setTimeZone(const std::string &zoneId);
std::string getTimeZone();
void insertRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
const std::vector<std::string> &values);
void insertRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values);
void insertAlignedRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
const std::vector<std::string> &values);
void insertAlignedRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
const std::vector<TSDataType::TSDataType> &types, const std::vector<char *> &values);
void insertRecords(const std::vector<std::string> &deviceIds,
const std::vector<int64_t> &times,
const std::vector<std::vector<std::string>> &measurementsList,
const std::vector<std::vector<std::string>> &valuesList);
void insertRecords(const std::vector<std::string> &deviceIds,
const std::vector<int64_t> &times,
const std::vector<std::vector<std::string>> &measurementsList,
const std::vector<std::vector<TSDataType::TSDataType>> &typesList,
const std::vector<std::vector<char *>> &valuesList);
void insertAlignedRecords(const std::vector<std::string> &deviceIds,
const std::vector<int64_t> &times,
const std::vector<std::vector<std::string>> &measurementsList,
const std::vector<std::vector<std::string>> &valuesList);
void insertAlignedRecords(const std::vector<std::string> &deviceIds,
const std::vector<int64_t> &times,
const std::vector<std::vector<std::string>> &measurementsList,
const std::vector<std::vector<TSDataType::TSDataType>> &typesList,
const std::vector<std::vector<char *>> &valuesList);
void insertRecordsOfOneDevice(const std::string &deviceId,
std::vector<int64_t> &times,
std::vector<std::vector<std::string>> &measurementsList,
std::vector<std::vector<TSDataType::TSDataType>> &typesList,
std::vector<std::vector<char *>> &valuesList);
void insertRecordsOfOneDevice(const std::string &deviceId,
std::vector<int64_t> &times,
std::vector<std::vector<std::string>> &measurementsList,
std::vector<std::vector<TSDataType::TSDataType>> &typesList,
std::vector<std::vector<char *>> &valuesList,
bool sorted);
void insertAlignedRecordsOfOneDevice(const std::string &deviceId,
std::vector<int64_t> &times,
std::vector<std::vector<std::string>> &measurementsList,
std::vector<std::vector<TSDataType::TSDataType>> &typesList,
std::vector<std::vector<char *>> &valuesList);
void insertAlignedRecordsOfOneDevice(const std::string &deviceId,
std::vector<int64_t> &times,
std::vector<std::vector<std::string>> &measurementsList,
std::vector<std::vector<TSDataType::TSDataType>> &typesList,
std::vector<std::vector<char *>> &valuesList,
bool sorted);
void insertTablet(Tablet &tablet);
void insertTablet(Tablet &tablet, bool sorted);
void insertRelationalTablet(Tablet &tablet);
void insertRelationalTablet(Tablet &tablet, bool sorted);
static void buildInsertTabletReq(TSInsertTabletReq &request, int64_t sessionId, Tablet &tablet, bool sorted);
void insertTablet(const TSInsertTabletReq &request);
void insertAlignedTablet(Tablet &tablet);
void insertAlignedTablet(Tablet &tablet, bool sorted);
void insertTablets(std::unordered_map<std::string, Tablet *> &tablets);
void insertTablets(std::unordered_map<std::string, Tablet *> &tablets, bool sorted);
void insertAlignedTablets(std::unordered_map<std::string, Tablet *> &tablets, bool sorted = false);
void testInsertRecord(const std::string &deviceId, int64_t time,
const std::vector<std::string> &measurements,
const std::vector<std::string> &values);
void testInsertTablet(const Tablet &tablet);
void testInsertRecords(const std::vector<std::string> &deviceIds,
const std::vector<int64_t> &times,
const std::vector<std::vector<std::string>> &measurementsList,
const std::vector<std::vector<std::string>> &valuesList);
void deleteTimeseries(const std::string &path);
void deleteTimeseries(const std::vector<std::string> &paths);
void deleteData(const std::string &path, int64_t endTime);
void deleteData(const std::vector<std::string> &paths, int64_t endTime);
void deleteData(const std::vector<std::string> &paths, int64_t startTime, int64_t endTime);
void setStorageGroup(const std::string &storageGroupId);
void deleteStorageGroup(const std::string &storageGroup);
void deleteStorageGroups(const std::vector<std::string> &storageGroups);
void createDatabase(const std::string &database);
void deleteDatabase(const std::string &database);
void deleteDatabases(const std::vector<std::string> &databases);
void createTimeseries(const std::string &path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor);
void createTimeseries(const std::string &path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor,
std::map<std::string, std::string> *props, std::map<std::string, std::string> *tags,
std::map<std::string, std::string> *attributes,
const std::string &measurementAlias);
void createMultiTimeseries(const std::vector<std::string> &paths,
const std::vector<TSDataType::TSDataType> &dataTypes,
const std::vector<TSEncoding::TSEncoding> &encodings,
const std::vector<CompressionType::CompressionType> &compressors,
std::vector<std::map<std::string, std::string>> *propsList,
std::vector<std::map<std::string, std::string>> *tagsList,
std::vector<std::map<std::string, std::string>> *attributesList,
std::vector<std::string> *measurementAliasList);
void createAlignedTimeseries(const std::string &deviceId,
const std::vector<std::string> &measurements,
const std::vector<TSDataType::TSDataType> &dataTypes,
const std::vector<TSEncoding::TSEncoding> &encodings,
const std::vector<CompressionType::CompressionType> &compressors);
bool checkTimeseriesExists(const std::string &path);
std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string &sql) ;
std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string &sql, int64_t timeoutInMs) ;
void executeNonQueryStatement(const std::string &sql);
std::unique_ptr<SessionDataSet> executeRawDataQuery(const std::vector<std::string> &paths, int64_t startTime, int64_t endTime);
std::unique_ptr<SessionDataSet> executeLastDataQuery(const std::vector<std::string> &paths);
std::unique_ptr<SessionDataSet> executeLastDataQuery(const std::vector<std::string> &paths, int64_t lastTime);
void createSchemaTemplate(const Template &templ);
void setSchemaTemplate(const std::string &template_name, const std::string &prefix_path);
void unsetSchemaTemplate(const std::string &prefix_path, const std::string &template_name);
void addAlignedMeasurementsInTemplate(const std::string &template_name,
const std::vector<std::string> &measurements,
const std::vector<TSDataType::TSDataType> &dataTypes,
const std::vector<TSEncoding::TSEncoding> &encodings,
const std::vector<CompressionType::CompressionType> &compressors);
void addAlignedMeasurementsInTemplate(const std::string &template_name,
const std::string &measurement,
TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor);
void addUnalignedMeasurementsInTemplate(const std::string &template_name,
const std::vector<std::string> &measurements,
const std::vector<TSDataType::TSDataType> &dataTypes,
const std::vector<TSEncoding::TSEncoding> &encodings,
const std::vector<CompressionType::CompressionType> &compressors);
void addUnalignedMeasurementsInTemplate(const std::string &template_name,
const std::string &measurement,
TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor);
void deleteNodeInTemplate(const std::string &template_name, const std::string &path);
int countMeasurementsInTemplate(const std::string &template_name);
bool isMeasurementInTemplate(const std::string &template_name, const std::string &path);
bool isPathExistInTemplate(const std::string &template_name, const std::string &path);
std::vector<std::string> showMeasurementsInTemplate(const std::string &template_name);
std::vector<std::string> showMeasurementsInTemplate(const std::string &template_name, const std::string &pattern);
bool checkTemplateExists(const std::string &template_name);
};
#endif // IOTDB_SESSION_H