blob: 54dd353750149419c176406bd4b6849bd9f49c07 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <string>
#include <vector>
#include <exception>
#include <iostream>
#include <algorithm>
#include <map>
#include <utility>
#include <memory>
#include <new>
#include <thread>
#include <mutex>
#include <stdexcept>
#include <cstdlib>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TBufferTransports.h>
#include "TSIService.h"
using ::apache::thrift::protocol::TBinaryProtocol;
using ::apache::thrift::protocol::TCompactProtocol;
using ::apache::thrift::transport::TSocket;
using ::apache::thrift::transport::TTransport;
using ::apache::thrift::transport::TTransportException;
using ::apache::thrift::transport::TBufferedTransport;
using ::apache::thrift::transport::TFramedTransport;
using ::apache::thrift::TException;
class IoTDBConnectionException : public std::exception {
public:
IoTDBConnectionException() : message() {}
IoTDBConnectionException(const char *m) : message(m) {}
IoTDBConnectionException(std::string m) : message(m) {}
virtual const char *what() const throw() {
return message.c_str();
}
private:
std::string message;
};
class BatchExecutionException : public std::exception {
public:
BatchExecutionException() : message() {}
BatchExecutionException(const char *m) : message(m) {}
BatchExecutionException(std::string m) : message(m) {}
BatchExecutionException(std::vector <TSStatus> statusList) : message(), statusList(statusList) {}
BatchExecutionException(std::vector <TSStatus> statusList, std::string m) : message(m), statusList(statusList) {}
virtual const char *what() const throw() {
return message.c_str();
}
std::vector <TSStatus> statusList;
private:
std::string message;
};
class UnSupportedDataTypeException : public std::exception {
private:
std::string message;
public:
UnSupportedDataTypeException() : message() {}
UnSupportedDataTypeException(const char *m) : message(m) {}
UnSupportedDataTypeException(std::string m) : message("UnSupported dataType: " + m) {}
};
namespace CompressionType {
enum CompressionType {
UNCOMPRESSED, SNAPPY, GZIP, LZO, SDT, PAA, PLA, LZ4
};
}
namespace TSDataType {
enum TSDataType {
BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, NULLTYPE
};
}
namespace TSEncoding {
enum TSEncoding {
PLAIN = 0,
PLAIN_DICTIONARY = 1,
RLE = 2,
DIFF = 3,
TS_2DIFF = 4,
BITMAP = 5,
GORILLA_V1 = 6,
REGULAR = 7,
GORILLA = 8
};
}
namespace TSStatusCode {
enum TSStatusCode {
SUCCESS_STATUS = 200,
STILL_EXECUTING_STATUS = 201,
INVALID_HANDLE_STATUS = 202,
NODE_DELETE_FAILED_ERROR = 298,
ALIAS_ALREADY_EXIST_ERROR = 299,
PATH_ALREADY_EXIST_ERROR = 300,
PATH_NOT_EXIST_ERROR = 301,
UNSUPPORTED_FETCH_METADATA_OPERATION_ERROR = 302,
METADATA_ERROR = 303,
OUT_OF_TTL_ERROR = 305,
CONFIG_ADJUSTER = 306,
MERGE_ERROR = 307,
SYSTEM_CHECK_ERROR = 308,
SYNC_DEVICE_OWNER_CONFLICT_ERROR = 309,
SYNC_CONNECTION_EXCEPTION = 310,
STORAGE_GROUP_PROCESSOR_ERROR = 311,
STORAGE_GROUP_ERROR = 312,
STORAGE_ENGINE_ERROR = 313,
TSFILE_PROCESSOR_ERROR = 314,
PATH_ILLEGAL = 315,
LOAD_FILE_ERROR = 316,
STORAGE_GROUP_NOT_READY = 317,
EXECUTE_STATEMENT_ERROR = 400,
SQL_PARSE_ERROR = 401,
GENERATE_TIME_ZONE_ERROR = 402,
SET_TIME_ZONE_ERROR = 403,
NOT_STORAGE_GROUP_ERROR = 404,
QUERY_NOT_ALLOWED = 405,
AST_FORMAT_ERROR = 406,
LOGICAL_OPERATOR_ERROR = 407,
LOGICAL_OPTIMIZE_ERROR = 408,
UNSUPPORTED_FILL_TYPE_ERROR = 409,
PATH_ERROR = 410,
QUERY_PROCESS_ERROR = 411,
WRITE_PROCESS_ERROR = 412,
INTERNAL_SERVER_ERROR = 500,
CLOSE_OPERATION_ERROR = 501,
READ_ONLY_SYSTEM_ERROR = 502,
DISK_SPACE_INSUFFICIENT_ERROR = 503,
START_UP_ERROR = 504,
SHUT_DOWN_ERROR = 505,
MULTIPLE_ERROR = 506,
WRONG_LOGIN_PASSWORD_ERROR = 600,
NOT_LOGIN_ERROR = 601,
NO_PERMISSION_ERROR = 602,
UNINITIALIZED_AUTH_ERROR = 603,
PARTITION_NOT_READY = 700,
TIME_OUT = 701,
NO_LEADER = 702,
UNSUPPORTED_OPERATION = 703,
NODE_READ_ONLY = 704,
INCOMPATIBLE_VERSION = 203,
};
}
class RpcUtils {
public:
std::shared_ptr <TSStatus> SUCCESS_STATUS;
RpcUtils() {
SUCCESS_STATUS = std::make_shared<TSStatus>();
SUCCESS_STATUS->__set_code(TSStatusCode::SUCCESS_STATUS);
}
static void verifySuccess(TSStatus &status);
static void verifySuccess(std::vector <TSStatus> &statuses);
static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
static TSStatus getStatus(int code, std::string message);
static std::shared_ptr <TSExecuteStatementResp> getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
static std::shared_ptr <TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, std::string message);
static std::shared_ptr <TSExecuteStatementResp> getTSExecuteStatementResp(TSStatus &status);
static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
static std::shared_ptr <TSFetchResultsResp>
getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, std::string appendMessage);
static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(TSStatus &status);
};
// Simulate the ByteBuffer class in Java
class MyStringBuffer {
private:
char *getchar(int len) {
char *ret = new char[len];
for (int i = pos; i < pos + len; i++)
ret[pos + len - 1 - i] = str[i];
pos += len;
return ret;
}
void putchar(int len, char *ins) {
for (int i = len - 1; i > -1; i--)
str += ins[i];
}
public:
std::string str;
int pos;
bool hasRemaining() {
return pos < str.size();
}
MyStringBuffer() {}
MyStringBuffer(std::string str) {
this->str = str;
this->pos = 0;
}
int getInt() {
char *data = getchar(4);
int ret = *(int *) data;
delete[]data;
return ret;
}
int64_t getLong() {
char *data = getchar(8);
int64_t ret = *(int64_t *) data;
delete[]data;
return ret;
}
float getFloat() {
char *data = getchar(4);
float ret = *(float *) data;
delete[]data;
return ret;
}
double getDouble() {
char *data = getchar(8);
double ret = *(double *) data;
delete[]data;
return ret;
}
char getChar() {
char *data = getchar(1);
char ret = *(char *) data;
delete[]data;
return ret;
}
bool getBool() {
return getChar() == 1;
}
std::string getString() {
int len = getInt();
std::string ret;
for (int i = 0; i < len; i++) ret.append(1, getChar());
return ret;
}
void putInt(int ins) {
char *data = (char *) &ins;
putchar(4, data);
}
void putLong(int64_t ins) {
char *data = (char *) &ins;
putchar(8, data);
}
void putFloat(float ins) {
char *data = (char *) &ins;
putchar(4, data);
}
void putDouble(double ins) {
char *data = (char *) &ins;
putchar(8, data);
}
void putChar(char ins) {
char *data = (char *) &ins;
putchar(1, data);
}
void putBool(bool ins) {
char tmp = 0;
if (ins) tmp = 1;
putChar(tmp);
}
void putString(std::string ins) {
int len = ins.size();
putInt(len);
for (int i = 0; i < len; i++) putChar(ins[i]);
}
};
class Field {
public:
TSDataType::TSDataType dataType;
bool boolV;
int intV;
int64_t longV;
float floatV;
double doubleV;
std::string stringV;
Field(TSDataType::TSDataType a) {
dataType = a;
}
Field() {}
};
/*
* A tablet data of one device, the tablet contains multiple measurements of this device that share
* the same time column.
*
* for example: device root.sg1.d1
*
* time, m1, m2, m3
* 1, 1, 2, 3
* 2, 1, 2, 3
* 3, 1, 2, 3
*
* Notice: The tablet should not have empty cell
*
*/
class Tablet {
private:
static const int DEFAULT_SIZE = 1024;
public:
std::string deviceId; // deviceId of this tablet
std::vector <std::pair<std::string, TSDataType::TSDataType>> schemas; // the list of measurement schemas for creating the tablet
std::vector <int64_t> timestamps; //timestamps in this tablet
std::vector <std::vector<std::string>> values;
int rowSize; //the number of rows to include in this tablet
int maxRowNumber; // the maximum number of rows for this tablet
Tablet() {}
/**
* Return a tablet with default specified row number. This is the standard
* constructor (all Tablet should be the same size).
*
* @param deviceId the name of the device specified to be written in
* @param timeseries the list of measurement schemas for creating the tablet
*/
Tablet(std::string deviceId, std::vector <std::pair<std::string, TSDataType::TSDataType>> &timeseries) {
Tablet(deviceId, timeseries, DEFAULT_SIZE);
}
/**
* Return a tablet with the specified number of rows (maxBatchSize). Only
* call this constructor directly for testing purposes. Tablet should normally
* always be default size.
*
* @param deviceId the name of the device specified to be written in
* @param schemas the list of measurement schemas for creating the row
* batch
* @param maxRowNumber the maximum number of rows for this tablet
*/
Tablet(std::string deviceId, std::vector <std::pair<std::string, TSDataType::TSDataType>> &schemas,
int maxRowNumber) {
this->deviceId = deviceId;
this->schemas = schemas;
this->maxRowNumber = maxRowNumber;
// create timestamp column
timestamps.resize(maxRowNumber);
// create value columns
values.resize(schemas.size());
for (int i = 0; i < schemas.size(); i++) {
values[i].resize(maxRowNumber);
}
this->rowSize = 0;
}
void reset(); // Reset Tablet to the default state - set the rowSize to 0
void createColumns();
int getTimeBytesSize();
int getValueByteSize(); // total byte size that values occupies
};
class SessionUtils {
public:
static std::string getTime(Tablet &tablet);
static std::string getValue(Tablet &tablet);
};
class RowRecord {
public:
int64_t timestamp;
std::vector <Field> fields;
RowRecord(int64_t timestamp) {
this->timestamp = timestamp;
}
RowRecord(int64_t timestamp, std::vector <Field> &fields) {
this->timestamp = timestamp;
this->fields = fields;
}
RowRecord() {
this->timestamp = -1;
}
void addField(Field &f) {
this->fields.push_back(f);
}
std::string toString() {
std::string ret = std::to_string(timestamp);
for (int i = 0; i < fields.size(); i++) {
ret.append("\t");
TSDataType::TSDataType dataType = fields[i].dataType;
switch (dataType) {
case TSDataType::BOOLEAN: {
std::string field = fields[i].boolV ? "true" : "false";
ret.append(field);
break;
}
case TSDataType::INT32: {
ret.append(std::to_string(fields[i].intV));
break;
}
case TSDataType::INT64: {
ret.append(std::to_string(fields[i].longV));
break;
}
case TSDataType::FLOAT: {
ret.append(std::to_string(fields[i].floatV));
break;
}
case TSDataType::DOUBLE: {
ret.append(std::to_string(fields[i].doubleV));
break;
}
case TSDataType::TEXT: {
ret.append(fields[i].stringV);
break;
}
case TSDataType::NULLTYPE: {
ret.append("NULL");
}
}
}
ret.append("\n");
return ret;
}
};
class SessionDataSet {
private:
bool hasCachedRecord = false;
std::string sql;
int64_t queryId;
int64_t statementId;
int64_t sessionId;
std::shared_ptr <TSIServiceIf> client;
int batchSize = 1024;
std::vector <std::string> columnNameList;
std::vector <std::string> columnTypeDeduplicatedList;
// duplicated column index -> origin index
std::map<int, int> duplicateLocation;
// column name -> column location
std::map<std::string, int> columnMap;
// column size
int columnSize = 0;
int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
std::shared_ptr <TSQueryDataSet> tsQueryDataSet;
MyStringBuffer tsQueryDataSetTimeBuffer;
std::vector <std::unique_ptr<MyStringBuffer>> valueBuffers;
std::vector <std::unique_ptr<MyStringBuffer>> bitmapBuffers;
RowRecord rowRecord;
char *currentBitmap = NULL; // used to cache the current bitmap for every column
static const int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null
public:
SessionDataSet() {}
SessionDataSet(std::string sql, std::vector <std::string> &columnNameList,
std::vector <std::string> &columnTypeList, int64_t queryId, int64_t statementId,
std::shared_ptr <TSIServiceIf> client, int64_t sessionId,
std::shared_ptr <TSQueryDataSet> queryDataSet) : tsQueryDataSetTimeBuffer(queryDataSet->time) {
this->sessionId = sessionId;
this->sql = sql;
this->queryId = queryId;
this->statementId = statementId;
this->client = client;
this->columnNameList = columnNameList;
this->currentBitmap = new char[columnNameList.size()];
this->columnSize = columnNameList.size();
// column name -> column location
for (int i = 0; i < columnNameList.size(); i++) {
std::string name = columnNameList[i];
if (this->columnMap.find(name) != this->columnMap.end()) {
duplicateLocation[i] = columnMap[name];
} else {
this->columnMap[name] = i;
this->columnTypeDeduplicatedList.push_back(columnTypeList[i]);
}
this->valueBuffers.push_back(
std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->valueList[i])));
this->bitmapBuffers.push_back(
std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->bitmapList[i])));
}
this->tsQueryDataSet = queryDataSet;
}
~SessionDataSet() {
if (currentBitmap != NULL) {
delete[] currentBitmap;
currentBitmap = NULL;
}
}
int getBatchSize();
void setBatchSize(int batchSize);
std::vector <std::string> getColumnNames();
bool hasNext();
void constructOneRow();
bool isNull(int index, int rowNum);
RowRecord *next();
void closeOperationHandle();
};
template<typename T>
std::vector <T> sortList(std::vector <T> &valueList, int *index, int indexLength) {
std::vector <T> sortedValues(valueList.size());
for (int i = 0; i < indexLength; i++) {
sortedValues[i] = valueList[index[i]];
}
return sortedValues;
}
class Session {
private:
std::string host;
int rpcPort;
std::string username;
std::string password;
TSProtocolVersion::type protocolVersion = TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3;
std::shared_ptr <TSIServiceIf> client;
std::shared_ptr <apache::thrift::transport::TSocket> transport;
bool isClosed = true;
int64_t sessionId;
int64_t statementId;
std::string zoneId;
int fetchSize;
const static int DEFAULT_FETCH_SIZE = 10000;
const static int DEFAULT_TIMEOUT_MS = 0;
bool checkSorted(Tablet &tablet);
bool checkSorted(std::vector <int64_t> &times);
void sortTablet(Tablet &tablet);
void sortIndexByTimestamp(int *index, std::vector <int64_t> &timestamps, int length);
std::string getTimeZone();
void setTimeZone(std::string zoneId);
void appendValues(std::string &buffer, char *value, int size);
void
putValuesIntoBuffer(std::vector <TSDataType::TSDataType> &types, std::vector<char *> &values, std::string &buf);
int8_t getDataTypeNumber(TSDataType::TSDataType type);
public:
Session(std::string host, int rpcPort) : username("user"), password("password") {
this->host = host;
this->rpcPort = rpcPort;
}
Session(std::string host, int rpcPort, std::string username, std::string password)
: fetchSize(10000) {
this->host = host;
this->rpcPort = rpcPort;
this->username = username;
this->password = password;
this->zoneId = "UTC+08:00";
}
Session(std::string host, int rpcPort, std::string username, std::string password, int fetchSize) {
this->host = host;
this->rpcPort = rpcPort;
this->username = username;
this->password = password;
this->fetchSize = fetchSize;
this->zoneId = "UTC+08:00";
}
Session(std::string host, std::string rpcPort, std::string username = "user",
std::string password = "password", int fetchSize = 10000) {
this->host = host;
this->rpcPort = stoi(rpcPort);
this->username = username;
this->password = password;
this->fetchSize = fetchSize;
this->zoneId = "UTC+08:00";
}
void open();
void open(bool enableRPCCompression);
void open(bool enableRPCCompression, int connectionTimeoutInMs);
void close();
void insertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
std::vector <std::string> &values);
void insertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
std::vector <TSDataType::TSDataType> &types, std::vector<char *> &values);
void insertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> &times,
std::vector <std::vector<std::string>> &measurementsList,
std::vector <std::vector<std::string>> &valuesList);
void insertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> &times,
std::vector <std::vector<std::string>> &measurementsList,
std::vector <std::vector<TSDataType::TSDataType>> typesList,
std::vector <std::vector<char *>> &valuesList);
void insertRecordsOfOneDevice(std::string deviceId, std::vector <int64_t> &times,
std::vector <std::vector<std::string>> measurementsList,
std::vector <std::vector<TSDataType::TSDataType>> typesList,
std::vector <std::vector<char *>> &valuesList);
void insertRecordsOfOneDevice(std::string deviceId, std::vector <int64_t> &times,
std::vector <std::vector<std::string>> measurementsList,
std::vector <std::vector<TSDataType::TSDataType>> typesList,
std::vector <std::vector<char *>> &valuesList, bool sorted);
void insertTablet(Tablet &tablet);
void insertTablet(Tablet &tablet, bool sorted);
void insertTablets(std::map<std::string, Tablet *> &tablets);
void insertTablets(std::map<std::string, Tablet *> &tablets, bool sorted);
void testInsertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
std::vector <std::string> &values);
void testInsertTablet(Tablet &tablet);
void testInsertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> &times,
std::vector <std::vector<std::string>> &measurementsList,
std::vector <std::vector<std::string>> &valuesList);
void deleteTimeseries(std::string path);
void deleteTimeseries(std::vector <std::string> &paths);
void deleteData(std::string path, int64_t time);
void deleteData(std::vector <std::string> &deviceId, int64_t time);
void setStorageGroup(std::string storageGroupId);
void deleteStorageGroup(std::string storageGroup);
void deleteStorageGroups(std::vector <std::string> &storageGroups);
void createTimeseries(std::string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor);
void createTimeseries(std::string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor,
std::map <std::string, std::string> *props, std::map <std::string, std::string> *tags,
std::map <std::string, std::string> *attributes, std::string measurementAlias);
void createMultiTimeseries(std::vector <std::string> paths, std::vector <TSDataType::TSDataType> dataTypes,
std::vector <TSEncoding::TSEncoding> encodings,
std::vector <CompressionType::CompressionType> compressors,
std::vector <std::map<std::string, std::string>> *propsList,
std::vector <std::map<std::string, std::string>> *tagsList,
std::vector <std::map<std::string, std::string>> *attributesList,
std::vector <std::string> *measurementAliasList);
bool checkTimeseriesExists(std::string path);
std::unique_ptr <SessionDataSet> executeQueryStatement(std::string sql);
void executeNonQueryStatement(std::string sql);
};