blob: d9b9edb1ebf18e4d1e4c702d630a4578b80995c9 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Session.h"
#include "SessionDataSet.h"
#include "SessionImpl.h"
#include "ThriftConvert.h"
#include <algorithm>
#include <cstring>
#include <future>
#include <memory>
#include <stack>
#include <thrift/transport/TTransportException.h>
#include <time.h>
#include <unordered_set>
using apache::thrift::transport::TTransportException;
using namespace std;
/**
* Timeout of query can be set by users.
* A negative number means using the default configuration of server.
* And value 0 will disable the function of query timeout.
*/
static const int64_t QUERY_TIMEOUT_MS = -1;
TSDataType::TSDataType getTSDataTypeFromString(const string &str) {
// BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, STRING, BLOB, TIMESTAMP, DATE,
// NULLTYPE
if (str == "BOOLEAN") {
return TSDataType::BOOLEAN;
} else if (str == "INT32") {
return TSDataType::INT32;
} else if (str == "INT64") {
return TSDataType::INT64;
} else if (str == "FLOAT") {
return TSDataType::FLOAT;
} else if (str == "DOUBLE") {
return TSDataType::DOUBLE;
} else if (str == "TEXT") {
return TSDataType::TEXT;
} else if (str == "TIMESTAMP") {
return TSDataType::TIMESTAMP;
} else if (str == "DATE") {
return TSDataType::DATE;
} else if (str == "BLOB") {
return TSDataType::BLOB;
} else if (str == "STRING") {
return TSDataType::STRING;
} else if (str == "OBJECT") {
return TSDataType::OBJECT;
}
return TSDataType::UNKNOWN;
}
void Tablet::createColumns() {
for (size_t i = 0; i < schemas.size(); i++) {
TSDataType::TSDataType dataType = schemas[i].second;
switch (dataType) {
case TSDataType::BOOLEAN:
values[i] = new bool[maxRowNumber];
break;
case TSDataType::DATE:
values[i] = new IoTDBDate[maxRowNumber];
break;
case TSDataType::INT32:
values[i] = new int[maxRowNumber];
break;
case TSDataType::TIMESTAMP:
case TSDataType::INT64:
values[i] = new int64_t[maxRowNumber];
break;
case TSDataType::FLOAT:
values[i] = new float[maxRowNumber];
break;
case TSDataType::DOUBLE:
values[i] = new double[maxRowNumber];
break;
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::OBJECT:
case TSDataType::TEXT:
values[i] = new string[maxRowNumber];
break;
default:
throw UnSupportedDataTypeException(
string("Data type ") + to_string(dataType) + " is not supported.");
}
}
}
void Tablet::deleteColumns() {
for (size_t i = 0; i < schemas.size(); i++) {
if (!values[i])
continue;
TSDataType::TSDataType dataType = schemas[i].second;
switch (dataType) {
case TSDataType::BOOLEAN: {
bool *valueBuf = (bool *)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::INT32: {
int *valueBuf = (int *)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::DATE: {
IoTDBDate *valueBuf = (IoTDBDate *)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::TIMESTAMP:
case TSDataType::INT64: {
int64_t *valueBuf = (int64_t *)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::FLOAT: {
float *valueBuf = (float *)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::DOUBLE: {
double *valueBuf = (double *)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::OBJECT:
case TSDataType::TEXT: {
string *valueBuf = (string *)(values[i]);
delete[] valueBuf;
break;
}
default:
throw UnSupportedDataTypeException(
string("Data type ") + to_string(dataType) + " is not supported.");
}
values[i] = nullptr;
}
}
void Tablet::deepCopyTabletColValue(void *const *srcPtr, void **destPtr,
TSDataType::TSDataType type,
int maxRowNumber) {
void *src = *srcPtr;
switch (type) {
case TSDataType::BOOLEAN:
*destPtr = new bool[maxRowNumber];
memcpy(*destPtr, src, maxRowNumber * sizeof(bool));
break;
case TSDataType::INT32:
*destPtr = new int32_t[maxRowNumber];
memcpy(*destPtr, src, maxRowNumber * sizeof(int32_t));
break;
case TSDataType::INT64:
case TSDataType::TIMESTAMP:
*destPtr = new int64_t[maxRowNumber];
memcpy(*destPtr, src, maxRowNumber * sizeof(int64_t));
break;
case TSDataType::FLOAT:
*destPtr = new float[maxRowNumber];
memcpy(*destPtr, src, maxRowNumber * sizeof(float));
break;
case TSDataType::DOUBLE:
*destPtr = new double[maxRowNumber];
memcpy(*destPtr, src, maxRowNumber * sizeof(double));
break;
case TSDataType::DATE: {
*destPtr = new IoTDBDate[maxRowNumber];
IoTDBDate *srcDate = static_cast<IoTDBDate *>(src);
IoTDBDate *destDate = static_cast<IoTDBDate *>(*destPtr);
for (size_t j = 0; j < maxRowNumber; ++j) {
destDate[j] = srcDate[j];
}
break;
}
case TSDataType::STRING:
case TSDataType::TEXT:
case TSDataType::OBJECT:
case TSDataType::BLOB: {
*destPtr = new std::string[maxRowNumber];
std::string *srcStr = static_cast<std::string *>(src);
std::string *destStr = static_cast<std::string *>(*destPtr);
for (size_t j = 0; j < maxRowNumber; ++j) {
destStr[j] = srcStr[j];
}
break;
}
default:
break;
}
}
void Tablet::reset() {
rowSize = 0;
for (size_t i = 0; i < schemas.size(); i++) {
bitMaps[i].reset();
}
}
size_t Tablet::getTimeBytesSize() { return rowSize * 8; }
size_t Tablet::getValueByteSize() {
size_t valueOccupation = 0;
for (size_t i = 0; i < schemas.size(); i++) {
switch (schemas[i].second) {
case TSDataType::BOOLEAN:
valueOccupation += rowSize;
break;
case TSDataType::INT32:
valueOccupation += rowSize * 4;
break;
case TSDataType::DATE:
valueOccupation += rowSize * 4;
break;
case TSDataType::TIMESTAMP:
case TSDataType::INT64:
valueOccupation += rowSize * 8;
break;
case TSDataType::FLOAT:
valueOccupation += rowSize * 4;
break;
case TSDataType::DOUBLE:
valueOccupation += rowSize * 8;
break;
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::OBJECT:
case TSDataType::TEXT: {
valueOccupation += rowSize * 4;
string *valueBuf = (string *)(values[i]);
for (size_t j = 0; j < rowSize; j++) {
valueOccupation += valueBuf[j].size();
}
break;
}
default:
throw UnSupportedDataTypeException(string("Data type ") +
to_string(schemas[i].second) +
" is not supported.");
}
}
return valueOccupation;
}
void Tablet::setAligned(bool isAligned) { this->isAligned = isAligned; }
string SessionUtils::getTime(const Tablet &tablet) {
MyStringBuffer timeBuffer;
unsigned int n = 8u * tablet.rowSize;
if (n > timeBuffer.str.capacity()) {
timeBuffer.reserve(n);
}
for (size_t i = 0; i < tablet.rowSize; i++) {
timeBuffer.putInt64(tablet.timestamps[i]);
}
return timeBuffer.str;
}
string SessionUtils::getValue(const Tablet &tablet) {
MyStringBuffer valueBuffer;
unsigned int n = 8u * tablet.schemas.size() * tablet.rowSize;
if (n > valueBuffer.str.capacity()) {
valueBuffer.reserve(n);
}
for (size_t i = 0; i < tablet.schemas.size(); i++) {
TSDataType::TSDataType dataType = tablet.schemas[i].second;
const BitMap &bitMap = tablet.bitMaps[i];
switch (dataType) {
case TSDataType::BOOLEAN: {
bool *valueBuf = (bool *)(tablet.values[i]);
for (size_t index = 0; index < tablet.rowSize; index++) {
if (!bitMap.isMarked(index)) {
valueBuffer.putBool(valueBuf[index]);
} else {
valueBuffer.putBool(false);
}
}
break;
}
case TSDataType::INT32: {
int *valueBuf = (int *)(tablet.values[i]);
for (size_t index = 0; index < tablet.rowSize; index++) {
if (!bitMap.isMarked(index)) {
valueBuffer.putInt(valueBuf[index]);
} else {
valueBuffer.putInt((numeric_limits<int32_t>::min)());
}
}
break;
}
case TSDataType::DATE: {
IoTDBDate *valueBuf = (IoTDBDate *)(tablet.values[i]);
for (size_t index = 0; index < tablet.rowSize; index++) {
if (!bitMap.isMarked(index)) {
valueBuffer.putDate(valueBuf[index]);
} else {
valueBuffer.putInt(EMPTY_DATE_INT);
}
}
break;
}
case TSDataType::TIMESTAMP:
case TSDataType::INT64: {
int64_t *valueBuf = (int64_t *)(tablet.values[i]);
for (size_t index = 0; index < tablet.rowSize; index++) {
if (!bitMap.isMarked(index)) {
valueBuffer.putInt64(valueBuf[index]);
} else {
valueBuffer.putInt64((numeric_limits<int64_t>::min)());
}
}
break;
}
case TSDataType::FLOAT: {
float *valueBuf = (float *)(tablet.values[i]);
for (size_t index = 0; index < tablet.rowSize; index++) {
if (!bitMap.isMarked(index)) {
valueBuffer.putFloat(valueBuf[index]);
} else {
valueBuffer.putFloat((numeric_limits<float>::min)());
}
}
break;
}
case TSDataType::DOUBLE: {
double *valueBuf = (double *)(tablet.values[i]);
for (size_t index = 0; index < tablet.rowSize; index++) {
if (!bitMap.isMarked(index)) {
valueBuffer.putDouble(valueBuf[index]);
} else {
valueBuffer.putDouble((numeric_limits<double>::min)());
}
}
break;
}
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::OBJECT:
case TSDataType::TEXT: {
string *valueBuf = (string *)(tablet.values[i]);
for (size_t index = 0; index < tablet.rowSize; index++) {
if (!bitMap.isMarked(index)) {
valueBuffer.putString(valueBuf[index]);
} else {
valueBuffer.putString("");
}
}
break;
}
default:
throw UnSupportedDataTypeException(
string("Data type ") + to_string(dataType) + " is not supported.");
}
}
for (size_t i = 0; i < tablet.schemas.size(); i++) {
const BitMap &bitMap = tablet.bitMaps[i];
bool columnHasNull = !bitMap.isAllUnmarked();
valueBuffer.putChar(columnHasNull ? (char)1 : (char)0);
if (columnHasNull) {
const vector<char> &bytes = bitMap.getByteArray();
for (size_t index = 0; index < tablet.rowSize / 8 + 1; index++) {
valueBuffer.putChar(bytes[index]);
}
}
}
return valueBuffer.str;
}
string MeasurementNode::serialize() const {
MyStringBuffer buffer;
buffer.putString(getName());
buffer.putChar(getDataType());
buffer.putChar(getEncoding());
buffer.putChar(getCompressionType());
return buffer.str;
}
string Template::serialize() const {
MyStringBuffer buffer;
std::stack<std::pair<std::string, std::shared_ptr<TemplateNode>>> nodeStack;
unordered_set<string> alignedPrefix;
buffer.putString(getName());
buffer.putBool(isAligned());
if (isAligned()) {
alignedPrefix.emplace("");
}
for (const auto &child : children_) {
nodeStack.push(make_pair("", child.second));
}
while (!nodeStack.empty()) {
auto cur = nodeStack.top();
nodeStack.pop();
string prefix = cur.first;
shared_ptr<TemplateNode> cur_node_ptr = cur.second;
string fullPath(prefix);
if (!cur_node_ptr->isMeasurement()) {
if (!prefix.empty()) {
fullPath.append(".");
}
fullPath.append(cur_node_ptr->getName());
if (cur_node_ptr->isAligned()) {
alignedPrefix.emplace(fullPath);
}
for (const auto &child : cur_node_ptr->getChildren()) {
nodeStack.push(make_pair(fullPath, child.second));
}
} else {
buffer.putString(prefix);
buffer.putBool(alignedPrefix.find(prefix) != alignedPrefix.end());
buffer.concat(cur_node_ptr->serialize());
}
}
return buffer.str;
}
Session::Session(const std::string &host, int rpcPort) : impl_(new Impl()) {
impl_->username_ = "root";
impl_->password_ = "root";
impl_->version = Version::V_1_0;
impl_->host_ = host;
impl_->rpcPort_ = rpcPort;
impl_->initZoneId();
impl_->initNodesSupplier();
}
Session::Session(const std::vector<std::string> &nodeUrls,
const std::string &username, const std::string &password)
: impl_(new Impl()) {
impl_->nodeUrls_ = nodeUrls;
impl_->username_ = username;
impl_->password_ = password;
impl_->version = Version::V_1_0;
impl_->initZoneId();
impl_->initNodesSupplier(impl_->nodeUrls_);
}
Session::Session(const std::string &host, int rpcPort,
const std::string &username, const std::string &password)
: impl_(new Impl()) {
impl_->host_ = host;
impl_->rpcPort_ = rpcPort;
impl_->username_ = username;
impl_->password_ = password;
impl_->fetchSize_ = iotdb::session::DEFAULT_FETCH_SIZE;
impl_->version = Version::V_1_0;
impl_->initZoneId();
impl_->initNodesSupplier();
}
Session::Session(const std::string &host, int rpcPort,
const std::string &username, const std::string &password,
const std::string &zoneId, int fetchSize)
: impl_(new Impl()) {
impl_->host_ = host;
impl_->rpcPort_ = rpcPort;
impl_->username_ = username;
impl_->password_ = password;
impl_->zoneId_ = zoneId;
impl_->fetchSize_ = fetchSize;
impl_->version = Version::V_1_0;
impl_->initZoneId();
impl_->initNodesSupplier();
}
Session::Session(const std::string &host, const std::string &rpcPort,
const std::string &username, const std::string &password,
const std::string &zoneId, int fetchSize)
: impl_(new Impl()) {
impl_->host_ = host;
impl_->rpcPort_ = stoi(rpcPort);
impl_->username_ = username;
impl_->password_ = password;
impl_->zoneId_ = zoneId;
impl_->fetchSize_ = fetchSize;
impl_->version = Version::V_1_0;
impl_->initZoneId();
impl_->initNodesSupplier();
}
Session::Session(AbstractSessionBuilder *builder) : impl_(new Impl()) {
impl_->host_ = builder->host;
impl_->rpcPort_ = builder->rpcPort;
impl_->username_ = builder->username;
impl_->password_ = builder->password;
impl_->zoneId_ = builder->zoneId;
impl_->fetchSize_ = builder->fetchSize;
impl_->version = Version::V_1_0;
impl_->sqlDialect_ = builder->sqlDialect;
impl_->database_ = builder->database;
impl_->enableAutoFetch_ = builder->enableAutoFetch;
impl_->enableRedirection_ = builder->enableRedirections;
impl_->connectTimeoutMs_ = builder->connectTimeoutMs;
impl_->nodeUrls_ = builder->nodeUrls;
impl_->useSSL_ = builder->useSSL;
impl_->trustCertFilePath_ = builder->trustCertFilePath;
impl_->initZoneId();
impl_->initNodesSupplier(impl_->nodeUrls_);
}
/**
* When delete variable, make sure release all resource.
*/
Session::~Session() {
try {
close();
} catch (const exception &e) {
log_debug(e.what());
}
}
void Session::Impl::removeBrokenSessionConnection(
shared_ptr<SessionConnection> sessionConnection) {
if (enableRedirection_) {
this->endPointToSessionConnection.erase(sessionConnection->getEndPoint());
}
auto it1 = deviceIdToEndpoint.begin();
while (it1 != deviceIdToEndpoint.end()) {
if (it1->second == sessionConnection->getEndPoint()) {
it1 = deviceIdToEndpoint.erase(it1);
} else {
++it1;
}
}
}
/**
* check whether the batch has been sorted
*
* @return whether the batch has been sorted
*/
bool Session::Impl::checkSorted(const Tablet &tablet) {
for (size_t i = 1; i < tablet.rowSize; i++) {
if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
return false;
}
}
return true;
}
bool Session::Impl::checkSorted(const vector<int64_t> &times) {
for (size_t i = 1; i < times.size(); i++) {
if (times[i] < times[i - 1]) {
return false;
}
}
return true;
}
template <typename T>
std::vector<T> sortList(const std::vector<T> &valueList, const int *index,
int indexLength) {
std::vector<T> sortedValues(valueList.size());
for (int i = 0; i < indexLength; i++) {
sortedValues[i] = valueList[index[i]];
}
return sortedValues;
}
template <typename T>
void sortValuesList(T *valueList, const int *index, size_t indexLength) {
T *sortedValues = new T[indexLength];
for (int i = 0; i < indexLength; i++) {
sortedValues[i] = valueList[index[i]];
}
for (int i = 0; i < indexLength; i++) {
valueList[i] = sortedValues[i];
}
delete[] sortedValues;
}
void Session::Impl::sortTablet(Tablet &tablet) {
/*
* following part of code sort the batch data by time,
* so we can insert continuous data in value list to get a better performance
*/
// sort to get index, and use index to sort value list
int *index = new int[tablet.rowSize];
for (size_t i = 0; i < tablet.rowSize; i++) {
index[i] = i;
}
sortIndexByTimestamp(index, tablet.timestamps, tablet.rowSize);
tablet.timestamps = sortList(tablet.timestamps, index, tablet.rowSize);
for (size_t i = 0; i < tablet.schemas.size(); i++) {
TSDataType::TSDataType dataType = tablet.schemas[i].second;
switch (dataType) {
case TSDataType::BOOLEAN: {
sortValuesList((bool *)(tablet.values[i]), index, tablet.rowSize);
break;
}
case TSDataType::INT32: {
sortValuesList((int *)(tablet.values[i]), index, tablet.rowSize);
break;
}
case TSDataType::DATE: {
sortValuesList((IoTDBDate *)(tablet.values[i]), index, tablet.rowSize);
break;
}
case TSDataType::TIMESTAMP:
case TSDataType::INT64: {
sortValuesList((int64_t *)(tablet.values[i]), index, tablet.rowSize);
break;
}
case TSDataType::FLOAT: {
sortValuesList((float *)(tablet.values[i]), index, tablet.rowSize);
break;
}
case TSDataType::DOUBLE: {
sortValuesList((double *)(tablet.values[i]), index, tablet.rowSize);
break;
}
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::OBJECT:
case TSDataType::TEXT: {
sortValuesList((string *)(tablet.values[i]), index, tablet.rowSize);
break;
}
default:
throw UnSupportedDataTypeException(
string("Data type ") + to_string(dataType) + " is not supported.");
}
}
delete[] index;
}
void Session::Impl::sortIndexByTimestamp(int *index,
std::vector<int64_t> &timestamps,
int length) {
if (length <= 1) {
return;
}
TsCompare tsCompareObj(timestamps);
std::sort(&index[0], &index[length], tsCompareObj);
}
/**
* Append value into buffer in Big Endian order to comply with IoTDB server
*/
void Session::Impl::appendValues(string &buffer, const char *value, int size) {
static bool hasCheckedEndianFlag = false;
static bool localCpuIsBigEndian = false;
if (!hasCheckedEndianFlag) {
hasCheckedEndianFlag = true;
int chk =
0x0201; // used to distinguish CPU's type (BigEndian or LittleEndian)
localCpuIsBigEndian = (0x01 != *(char *)(&chk));
}
if (localCpuIsBigEndian) {
buffer.append(value, size);
} else {
for (int i = size - 1; i >= 0; i--) {
buffer.append(value + i, 1);
}
}
}
void Session::Impl::putValuesIntoBuffer(
const vector<TSDataType::TSDataType> &types, const vector<char *> &values,
string &buf) {
int32_t date;
for (size_t i = 0; i < values.size(); i++) {
int8_t typeNum = getDataTypeNumber(types[i]);
buf.append((char *)(&typeNum), sizeof(int8_t));
switch (types[i]) {
case TSDataType::BOOLEAN:
buf.append(values[i], 1);
break;
case TSDataType::INT32:
appendValues(buf, values[i], sizeof(int32_t));
break;
case TSDataType::DATE:
date = parseDateExpressionToInt(*(IoTDBDate *)values[i]);
appendValues(buf, (char *)&date, sizeof(int32_t));
break;
case TSDataType::TIMESTAMP:
case TSDataType::INT64:
appendValues(buf, values[i], sizeof(int64_t));
break;
case TSDataType::FLOAT:
appendValues(buf, values[i], sizeof(float));
break;
case TSDataType::DOUBLE:
appendValues(buf, values[i], sizeof(double));
break;
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::OBJECT:
case TSDataType::TEXT: {
int32_t len = (uint32_t)strlen(values[i]);
appendValues(buf, (char *)(&len), sizeof(uint32_t));
// no need to change the byte order of string value
buf.append(values[i], len);
break;
}
default:
break;
}
}
}
int8_t Session::Impl::getDataTypeNumber(TSDataType::TSDataType type) {
switch (type) {
case TSDataType::BOOLEAN:
return 0;
case TSDataType::INT32:
return 1;
case TSDataType::INT64:
return 2;
case TSDataType::FLOAT:
return 3;
case TSDataType::DOUBLE:
return 4;
case TSDataType::TEXT:
return 5;
case TSDataType::TIMESTAMP:
return 8;
case TSDataType::DATE:
return 9;
case TSDataType::BLOB:
return 10;
case TSDataType::STRING:
return 11;
case TSDataType::OBJECT:
return 12;
default:
return -1;
}
}
string Session::Impl::getVersionString(Version::Version version) {
switch (version) {
case Version::V_0_12:
return "V_0_12";
case Version::V_0_13:
return "V_0_13";
case Version::V_1_0:
return "V_1_0";
default:
return "V_0_12";
}
}
void Session::Impl::initZoneId() {
if (!zoneId_.empty()) {
return;
}
time_t ts = 0;
struct tm tmv;
#if defined(_WIN64) || defined(WIN32) || defined(_WIN32)
localtime_s(&tmv, &ts);
#else
localtime_r(&ts, &tmv);
#endif
char zoneStr[32];
strftime(zoneStr, sizeof(zoneStr), "%z", &tmv);
zoneId_ = zoneStr;
}
void Session::Impl::initNodesSupplier(
const std::vector<std::string> &nodeUrls) {
std::vector<TEndPoint> endPoints;
std::unordered_set<std::string> uniqueEndpoints;
if (nodeUrls.empty() && host_.empty()) {
throw IoTDBException("No available nodes");
}
// Process provided node URLs
if (!nodeUrls.empty()) {
for (auto &url : nodeUrls) {
try {
TEndPoint endPoint = UrlUtils::parseTEndPointIpv4AndIpv6Url(url);
if (endPoint.port == 0)
continue; // Skip invalid endpoints
std::string endpointKey =
endPoint.ip + ":" + std::to_string(endPoint.port);
if (uniqueEndpoints.find(endpointKey) == uniqueEndpoints.end()) {
endPoints.emplace_back(std::move(endPoint));
uniqueEndpoints.insert(std::move(endpointKey));
}
} catch (...) {
continue; // Skip malformed URLs
}
}
}
// Fallback to local endpoint if no valid endpoints found
if (endPoints.empty()) {
if (host_.empty() || rpcPort_ == 0) {
throw IoTDBException("No valid endpoints available");
}
TEndPoint endPoint;
endPoint.__set_ip(host_);
endPoint.__set_port(rpcPort_);
endPoints.emplace_back(std::move(endPoint));
}
if (enableAutoFetch_) {
nodesSupplier_ = NodesSupplier::create(endPoints, username_, password_,
useSSL_, trustCertFilePath_);
} else {
nodesSupplier_ = make_shared<StaticNodesSupplier>(endPoints);
}
}
void Session::Impl::initDefaultSessionConnection() {
// Try all endpoints from supplier until a connection is established.
auto endpoints = nodesSupplier_->getEndPointList();
bool connected = false;
for (const auto &endpoint : endpoints) {
try {
host_ = endpoint.ip;
rpcPort_ = endpoint.port;
defaultEndPoint_.__set_ip(host_);
defaultEndPoint_.__set_port(rpcPort_);
defaultSessionConnection_ = std::make_shared<SessionConnection>(
this, defaultEndPoint_, zoneId_, nodesSupplier_, fetchSize_, 3, 500,
connectTimeoutMs_, sqlDialect_, database_);
connected = true;
break;
} catch (const IoTDBException &e) {
log_debug(e.what());
throw;
} catch (const std::exception &e) {
log_warn(e.what());
}
}
if (!connected) {
throw std::runtime_error(
"No available node to establish SessionConnection.");
}
}
void Session::Impl::insertStringRecordsWithLeaderCache(
vector<string> deviceIds, vector<int64_t> times,
vector<vector<string>> measurementsList, vector<vector<string>> valuesList,
bool isAligned) {
std::unordered_map<std::shared_ptr<SessionConnection>,
TSInsertStringRecordsReq>
recordsGroup;
for (int i = 0; i < deviceIds.size(); i++) {
auto connection = getSessionConnection(deviceIds[i]);
if (recordsGroup.find(connection) == recordsGroup.end()) {
TSInsertStringRecordsReq request;
std::vector<std::string> emptyPrefixPaths;
std::vector<std::vector<std::string>> emptyMeasurementsList;
vector<vector<string>> emptyValuesList;
std::vector<int64_t> emptyTimestamps;
request.__set_isAligned(isAligned);
request.__set_prefixPaths(emptyPrefixPaths);
request.__set_timestamps(emptyTimestamps);
request.__set_measurementsList(emptyMeasurementsList);
request.__set_valuesList(emptyValuesList);
recordsGroup.insert(make_pair(connection, request));
}
TSInsertStringRecordsReq &existingReq = recordsGroup[connection];
existingReq.prefixPaths.emplace_back(deviceIds[i]);
existingReq.timestamps.emplace_back(times[i]);
existingReq.measurementsList.emplace_back(measurementsList[i]);
existingReq.valuesList.emplace_back(valuesList[i]);
}
std::function<void(std::shared_ptr<SessionConnection>,
const TSInsertStringRecordsReq &)>
consumer =
[](const std::shared_ptr<SessionConnection> &c,
const TSInsertStringRecordsReq &r) { c->insertStringRecords(r); };
if (recordsGroup.size() == 1) {
insertOnce(recordsGroup, consumer);
} else {
insertByGroup(recordsGroup, consumer);
}
}
void Session::Impl::insertRecordsWithLeaderCache(
vector<string> deviceIds, vector<int64_t> times,
vector<vector<string>> measurementsList,
const vector<vector<TSDataType::TSDataType>> &typesList,
vector<vector<char *>> valuesList, bool isAligned) {
std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertRecordsReq>
recordsGroup;
for (int i = 0; i < deviceIds.size(); i++) {
auto connection = getSessionConnection(deviceIds[i]);
if (recordsGroup.find(connection) == recordsGroup.end()) {
TSInsertRecordsReq request;
std::vector<std::string> emptyPrefixPaths;
std::vector<std::vector<std::string>> emptyMeasurementsList;
std::vector<std::string> emptyValuesList;
std::vector<int64_t> emptyTimestamps;
request.__set_isAligned(isAligned);
request.__set_prefixPaths(emptyPrefixPaths);
request.__set_timestamps(emptyTimestamps);
request.__set_measurementsList(emptyMeasurementsList);
request.__set_valuesList(emptyValuesList);
recordsGroup.insert(make_pair(connection, request));
}
TSInsertRecordsReq &existingReq = recordsGroup[connection];
existingReq.prefixPaths.emplace_back(deviceIds[i]);
existingReq.timestamps.emplace_back(times[i]);
existingReq.measurementsList.emplace_back(measurementsList[i]);
vector<string> bufferList;
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
existingReq.valuesList.emplace_back(buffer);
recordsGroup[connection] = existingReq;
}
std::function<void(std::shared_ptr<SessionConnection>,
const TSInsertRecordsReq &)>
consumer = [](const std::shared_ptr<SessionConnection> &c,
const TSInsertRecordsReq &r) { c->insertRecords(r); };
if (recordsGroup.size() == 1) {
insertOnce(recordsGroup, consumer);
} else {
insertByGroup(recordsGroup, consumer);
}
}
void Session::Impl::insertTabletsWithLeaderCache(
unordered_map<string, Tablet *> &tablets, bool sorted, bool isAligned) {
std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertTabletsReq>
tabletsGroup;
if (tablets.empty()) {
throw BatchExecutionException("No tablet is inserting!");
}
for (const auto &item : tablets) {
if (isAligned != item.second->isAligned) {
throw BatchExecutionException(
"The tablets should be all aligned or non-aligned!");
}
if (!checkSorted(*(item.second))) {
sortTablet(*(item.second));
}
auto deviceId = item.first;
auto tablet = item.second;
auto connection = getSessionConnection(deviceId);
auto it = tabletsGroup.find(connection);
if (it == tabletsGroup.end()) {
TSInsertTabletsReq request;
tabletsGroup[connection] = request;
}
TSInsertTabletsReq &existingReq = tabletsGroup[connection];
existingReq.prefixPaths.emplace_back(tablet->deviceId);
existingReq.timestampsList.emplace_back(
move(SessionUtils::getTime(*tablet)));
existingReq.valuesList.emplace_back(move(SessionUtils::getValue(*tablet)));
existingReq.sizeList.emplace_back(tablet->rowSize);
vector<int> dataTypes;
vector<string> measurements;
for (pair<string, TSDataType::TSDataType> schema : tablet->schemas) {
measurements.push_back(schema.first);
dataTypes.push_back(schema.second);
}
existingReq.measurementsList.emplace_back(measurements);
existingReq.typesList.emplace_back(dataTypes);
}
std::function<void(std::shared_ptr<SessionConnection>,
const TSInsertTabletsReq &)>
consumer = [](const std::shared_ptr<SessionConnection> &c,
const TSInsertTabletsReq &r) { c->insertTablets(r); };
if (tabletsGroup.size() == 1) {
insertOnce(tabletsGroup, consumer);
} else {
insertByGroup(tabletsGroup, consumer);
}
}
void Session::open() { open(false, Impl::DEFAULT_TIMEOUT_MS); }
void Session::open(bool enableRPCCompression) {
open(enableRPCCompression, Impl::DEFAULT_TIMEOUT_MS);
}
void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
if (!impl_->isClosed_) {
return;
}
try {
impl_->initDefaultSessionConnection();
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
impl_->zoneId_ = impl_->defaultSessionConnection_->zoneId;
if (impl_->enableRedirection_) {
impl_->endPointToSessionConnection.insert(
make_pair(impl_->defaultEndPoint_, impl_->defaultSessionConnection_));
}
impl_->isClosed_ = false;
}
void Session::close() {
if (impl_->isClosed_) {
return;
}
impl_->isClosed_ = true;
}
void Session::insertRecord(const string &deviceId, int64_t time,
const vector<string> &measurements,
const vector<string> &values) {
TSInsertStringRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
req.__set_values(values);
req.__set_isAligned(false);
try {
impl_->getSessionConnection(deviceId)->insertStringRecord(req);
} catch (RedirectException &e) {
impl_->handleRedirection(deviceId, endpointToThrift(e.endPoint));
} catch (const IoTDBConnectionException &e) {
if (impl_->enableRedirection_ && impl_->deviceIdToEndpoint.find(deviceId) !=
impl_->deviceIdToEndpoint.end()) {
impl_->deviceIdToEndpoint.erase(deviceId);
try {
impl_->defaultSessionConnection_->insertStringRecord(req);
} catch (RedirectException &e) {
}
} else {
throw;
}
}
}
void Session::insertRecord(const string &deviceId, int64_t time,
const vector<string> &measurements,
const vector<TSDataType::TSDataType> &types,
const vector<char *> &values) {
TSInsertRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
string buffer;
impl_->putValuesIntoBuffer(types, values, buffer);
req.__set_values(buffer);
req.__set_isAligned(false);
try {
impl_->getSessionConnection(deviceId)->insertRecord(req);
} catch (RedirectException &e) {
impl_->handleRedirection(deviceId, endpointToThrift(e.endPoint));
} catch (const IoTDBConnectionException &e) {
if (impl_->enableRedirection_ && impl_->deviceIdToEndpoint.find(deviceId) !=
impl_->deviceIdToEndpoint.end()) {
impl_->deviceIdToEndpoint.erase(deviceId);
try {
impl_->defaultSessionConnection_->insertRecord(req);
} catch (RedirectException &e) {
}
} else {
throw;
}
}
}
void Session::insertAlignedRecord(const string &deviceId, int64_t time,
const vector<string> &measurements,
const vector<string> &values) {
TSInsertStringRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
req.__set_values(values);
req.__set_isAligned(true);
try {
impl_->getSessionConnection(deviceId)->insertStringRecord(req);
} catch (RedirectException &e) {
impl_->handleRedirection(deviceId, endpointToThrift(e.endPoint));
} catch (const IoTDBConnectionException &e) {
if (impl_->enableRedirection_ && impl_->deviceIdToEndpoint.find(deviceId) !=
impl_->deviceIdToEndpoint.end()) {
impl_->deviceIdToEndpoint.erase(deviceId);
try {
impl_->defaultSessionConnection_->insertStringRecord(req);
} catch (RedirectException &e) {
}
} else {
throw;
}
}
}
void Session::insertAlignedRecord(const string &deviceId, int64_t time,
const vector<string> &measurements,
const vector<TSDataType::TSDataType> &types,
const vector<char *> &values) {
TSInsertRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
string buffer;
impl_->putValuesIntoBuffer(types, values, buffer);
req.__set_values(buffer);
req.__set_isAligned(false);
try {
impl_->getSessionConnection(deviceId)->insertRecord(req);
} catch (RedirectException &e) {
impl_->handleRedirection(deviceId, endpointToThrift(e.endPoint));
} catch (const IoTDBConnectionException &e) {
if (impl_->enableRedirection_ && impl_->deviceIdToEndpoint.find(deviceId) !=
impl_->deviceIdToEndpoint.end()) {
impl_->deviceIdToEndpoint.erase(deviceId);
try {
impl_->defaultSessionConnection_->insertRecord(req);
} catch (RedirectException &e) {
}
} else {
throw;
}
}
}
void Session::insertRecords(const vector<string> &deviceIds,
const vector<int64_t> &times,
const vector<vector<string>> &measurementsList,
const vector<vector<string>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() ||
len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size "
"should be equal");
throw exception(e);
}
if (impl_->enableRedirection_) {
impl_->insertStringRecordsWithLeaderCache(
deviceIds, times, measurementsList, valuesList, false);
} else {
TSInsertStringRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
request.__set_valuesList(valuesList);
request.__set_isAligned(false);
try {
impl_->defaultSessionConnection_->insertStringRecords(request);
} catch (RedirectException &e) {
}
}
}
void Session::insertRecords(
const vector<string> &deviceIds, const vector<int64_t> &times,
const vector<vector<string>> &measurementsList,
const vector<vector<TSDataType::TSDataType>> &typesList,
const vector<vector<char *>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() ||
len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size "
"should be equal");
throw exception(e);
}
if (impl_->enableRedirection_) {
impl_->insertRecordsWithLeaderCache(deviceIds, times, measurementsList,
typesList, valuesList, false);
} else {
TSInsertRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
impl_->putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
bufferList.push_back(buffer);
}
request.__set_valuesList(bufferList);
request.__set_isAligned(false);
try {
impl_->defaultSessionConnection_->insertRecords(request);
} catch (RedirectException &e) {
}
}
}
void Session::insertAlignedRecords(
const vector<string> &deviceIds, const vector<int64_t> &times,
const vector<vector<string>> &measurementsList,
const vector<vector<string>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() ||
len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size "
"should be equal");
throw exception(e);
}
if (impl_->enableRedirection_) {
impl_->insertStringRecordsWithLeaderCache(
deviceIds, times, measurementsList, valuesList, true);
} else {
TSInsertStringRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
request.__set_valuesList(valuesList);
request.__set_isAligned(true);
try {
impl_->defaultSessionConnection_->insertStringRecords(request);
} catch (RedirectException &e) {
}
}
}
void Session::insertAlignedRecords(
const vector<string> &deviceIds, const vector<int64_t> &times,
const vector<vector<string>> &measurementsList,
const vector<vector<TSDataType::TSDataType>> &typesList,
const vector<vector<char *>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() ||
len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size "
"should be equal");
throw exception(e);
}
if (impl_->enableRedirection_) {
impl_->insertRecordsWithLeaderCache(deviceIds, times, measurementsList,
typesList, valuesList, true);
} else {
TSInsertRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
impl_->putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
bufferList.push_back(buffer);
}
request.__set_valuesList(bufferList);
request.__set_isAligned(false);
try {
impl_->defaultSessionConnection_->insertRecords(request);
} catch (RedirectException &e) {
}
}
}
void Session::insertRecordsOfOneDevice(
const string &deviceId, vector<int64_t> &times,
vector<vector<string>> &measurementsList,
vector<vector<TSDataType::TSDataType>> &typesList,
vector<vector<char *>> &valuesList) {
insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList,
valuesList, false);
}
void Session::insertRecordsOfOneDevice(
const string &deviceId, vector<int64_t> &times,
vector<vector<string>> &measurementsList,
vector<vector<TSDataType::TSDataType>> &typesList,
vector<vector<char *>> &valuesList, bool sorted) {
if (!impl_->checkSorted(times)) {
int *index = new int[times.size()];
for (size_t i = 0; i < times.size(); i++) {
index[i] = (int)i;
}
impl_->sortIndexByTimestamp(index, times, (int)(times.size()));
times = sortList(times, index, (int)(times.size()));
measurementsList = sortList(measurementsList, index, (int)(times.size()));
typesList = sortList(typesList, index, (int)(times.size()));
valuesList = sortList(valuesList, index, (int)(times.size()));
delete[] index;
}
TSInsertRecordsOfOneDeviceReq request;
request.__set_prefixPath(deviceId);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
impl_->putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
bufferList.push_back(buffer);
}
request.__set_valuesList(bufferList);
request.__set_isAligned(false);
TSStatus respStatus;
try {
impl_->getSessionConnection(deviceId)->insertRecordsOfOneDevice(request);
} catch (RedirectException &e) {
impl_->handleRedirection(deviceId, endpointToThrift(e.endPoint));
} catch (const IoTDBConnectionException &e) {
if (impl_->enableRedirection_ && impl_->deviceIdToEndpoint.find(deviceId) !=
impl_->deviceIdToEndpoint.end()) {
impl_->deviceIdToEndpoint.erase(deviceId);
try {
impl_->defaultSessionConnection_->insertRecordsOfOneDevice(request);
} catch (RedirectException &e) {
}
} else {
throw;
}
}
}
void Session::insertAlignedRecordsOfOneDevice(
const string &deviceId, vector<int64_t> &times,
vector<vector<string>> &measurementsList,
vector<vector<TSDataType::TSDataType>> &typesList,
vector<vector<char *>> &valuesList) {
insertAlignedRecordsOfOneDevice(deviceId, times, measurementsList, typesList,
valuesList, false);
}
void Session::insertAlignedRecordsOfOneDevice(
const string &deviceId, vector<int64_t> &times,
vector<vector<string>> &measurementsList,
vector<vector<TSDataType::TSDataType>> &typesList,
vector<vector<char *>> &valuesList, bool sorted) {
if (!impl_->checkSorted(times)) {
int *index = new int[times.size()];
for (size_t i = 0; i < times.size(); i++) {
index[i] = (int)i;
}
impl_->sortIndexByTimestamp(index, times, (int)(times.size()));
times = sortList(times, index, (int)(times.size()));
measurementsList = sortList(measurementsList, index, (int)(times.size()));
typesList = sortList(typesList, index, (int)(times.size()));
valuesList = sortList(valuesList, index, (int)(times.size()));
delete[] index;
}
TSInsertRecordsOfOneDeviceReq request;
request.__set_prefixPath(deviceId);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
impl_->putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
bufferList.push_back(buffer);
}
request.__set_valuesList(bufferList);
request.__set_isAligned(true);
TSStatus respStatus;
try {
impl_->getSessionConnection(deviceId)->insertRecordsOfOneDevice(request);
} catch (RedirectException &e) {
impl_->handleRedirection(deviceId, endpointToThrift(e.endPoint));
} catch (const IoTDBConnectionException &e) {
if (impl_->enableRedirection_ && impl_->deviceIdToEndpoint.find(deviceId) !=
impl_->deviceIdToEndpoint.end()) {
impl_->deviceIdToEndpoint.erase(deviceId);
try {
impl_->defaultSessionConnection_->insertRecordsOfOneDevice(request);
} catch (RedirectException &e) {
}
} else {
throw;
}
}
}
void Session::insertTablet(Tablet &tablet) {
try {
insertTablet(tablet, false);
} catch (const exception &e) {
log_debug(e.what());
logic_error error(e.what());
throw exception(error);
}
}
void Session::Impl::buildInsertTabletReq(TSInsertTabletReq &request,
Tablet &tablet, bool sorted) {
if ((!sorted) && !checkSorted(tablet)) {
sortTablet(tablet);
}
request.__set_prefixPath(tablet.deviceId);
std::vector<std::string> reqMeasurements;
reqMeasurements.reserve(tablet.schemas.size());
std::vector<int32_t> types;
types.reserve(tablet.schemas.size());
for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
reqMeasurements.push_back(schema.first);
types.push_back(schema.second);
}
request.__set_measurements(reqMeasurements);
request.__set_types(types);
request.__set_values(SessionUtils::getValue(tablet));
request.__set_timestamps(SessionUtils::getTime(tablet));
request.__set_size(tablet.rowSize);
request.__set_isAligned(tablet.isAligned);
}
void Session::Impl::insertTablet(TSInsertTabletReq request) {
auto deviceId = request.prefixPath;
try {
getSessionConnection(deviceId)->insertTablet(request);
} catch (RedirectException &e) {
handleRedirection(deviceId, endpointToThrift(e.endPoint));
} catch (const IoTDBConnectionException &e) {
if (enableRedirection_ &&
deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
deviceIdToEndpoint.erase(deviceId);
try {
defaultSessionConnection_->insertTablet(request);
} catch (RedirectException &e) {
}
} else {
throw;
}
}
}
void Session::insertTablet(Tablet &tablet, bool sorted) {
TSInsertTabletReq request;
impl_->buildInsertTabletReq(request, tablet, sorted);
impl_->insertTablet(request);
}
void Session::insertAlignedTablet(Tablet &tablet) {
insertAlignedTablet(tablet, false);
}
void Session::insertAlignedTablet(Tablet &tablet, bool sorted) {
tablet.setAligned(true);
try {
insertTablet(tablet, sorted);
} catch (const exception &e) {
log_debug(e.what());
logic_error error(e.what());
throw exception(error);
}
}
void Session::insertTablets(unordered_map<string, Tablet *> &tablets) {
try {
insertTablets(tablets, false);
} catch (const exception &e) {
log_debug(e.what());
logic_error error(e.what());
throw exception(error);
}
}
void Session::insertTablets(unordered_map<string, Tablet *> &tablets,
bool sorted) {
if (tablets.empty()) {
throw BatchExecutionException("No tablet is inserting!");
}
auto beginIter = tablets.begin();
bool isAligned = ((*beginIter).second)->isAligned;
if (impl_->enableRedirection_) {
impl_->insertTabletsWithLeaderCache(tablets, sorted, isAligned);
} else {
TSInsertTabletsReq request;
for (const auto &item : tablets) {
if (isAligned != item.second->isAligned) {
throw BatchExecutionException(
"The tablets should be all aligned or non-aligned!");
}
if (!impl_->checkSorted(*(item.second))) {
impl_->sortTablet(*(item.second));
}
request.prefixPaths.push_back(item.second->deviceId);
vector<string> measurements;
vector<int> dataTypes;
for (pair<string, TSDataType::TSDataType> schema : item.second->schemas) {
measurements.push_back(schema.first);
dataTypes.push_back(schema.second);
}
request.measurementsList.push_back(measurements);
request.typesList.push_back(dataTypes);
request.timestampsList.push_back(
move(SessionUtils::getTime(*(item.second))));
request.valuesList.push_back(
move(SessionUtils::getValue(*(item.second))));
request.sizeList.push_back(item.second->rowSize);
}
request.__set_isAligned(isAligned);
try {
TSStatus respStatus;
impl_->defaultSessionConnection_->insertTablets(request);
RpcUtils::verifySuccess(respStatus);
} catch (RedirectException &e) {
}
}
}
void Session::insertAlignedTablets(unordered_map<string, Tablet *> &tablets,
bool sorted) {
for (auto iter = tablets.begin(); iter != tablets.end(); iter++) {
iter->second->setAligned(true);
}
try {
insertTablets(tablets, sorted);
} catch (const exception &e) {
log_debug(e.what());
logic_error error(e.what());
throw exception(error);
}
}
void Session::testInsertRecord(const string &deviceId, int64_t time,
const vector<string> &measurements,
const vector<string> &values) {
TSInsertStringRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
req.__set_values(values);
TSStatus tsStatus;
try {
impl_->defaultSessionConnection_->testInsertStringRecord(req);
RpcUtils::verifySuccess(tsStatus);
} catch (const TTransportException &e) {
log_debug(e.what());
throw IoTDBConnectionException(e.what());
} catch (const IoTDBException &e) {
log_debug(e.what());
throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
void Session::testInsertTablet(const Tablet &tablet) {
TSInsertTabletReq request;
request.prefixPath = tablet.deviceId;
for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
request.measurements.push_back(schema.first);
request.types.push_back(schema.second);
}
request.__set_timestamps(move(SessionUtils::getTime(tablet)));
request.__set_values(move(SessionUtils::getValue(tablet)));
request.__set_size(tablet.rowSize);
try {
TSStatus tsStatus;
impl_->defaultSessionConnection_->testInsertTablet(request);
RpcUtils::verifySuccess(tsStatus);
} catch (const TTransportException &e) {
log_debug(e.what());
throw IoTDBConnectionException(e.what());
} catch (const IoTDBException &e) {
log_debug(e.what());
throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
void Session::testInsertRecords(const vector<string> &deviceIds,
const vector<int64_t> &times,
const vector<vector<string>> &measurementsList,
const vector<vector<string>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() ||
len != valuesList.size()) {
logic_error error("deviceIds, times, measurementsList and valuesList's "
"size should be equal");
throw exception(error);
}
TSInsertStringRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
request.__set_valuesList(valuesList);
try {
TSStatus tsStatus;
impl_->defaultSessionConnection_->getSessionClient()->insertStringRecords(
tsStatus, request);
RpcUtils::verifySuccess(tsStatus);
} catch (const TTransportException &e) {
log_debug(e.what());
throw IoTDBConnectionException(e.what());
} catch (const IoTDBException &e) {
log_debug(e.what());
throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
void Session::deleteTimeseries(const string &path) {
vector<string> paths;
paths.push_back(path);
deleteTimeseries(paths);
}
void Session::deleteTimeseries(const vector<string> &paths) {
impl_->defaultSessionConnection_->deleteTimeseries(paths);
}
void Session::deleteData(const string &path, int64_t endTime) {
vector<string> paths;
paths.push_back(path);
deleteData(paths, LONG_LONG_MIN, endTime);
}
void Session::deleteData(const vector<string> &paths, int64_t endTime) {
deleteData(paths, LONG_LONG_MIN, endTime);
}
void Session::deleteData(const vector<string> &paths, int64_t startTime,
int64_t endTime) {
TSDeleteDataReq req;
req.__set_paths(paths);
req.__set_startTime(startTime);
req.__set_endTime(endTime);
impl_->defaultSessionConnection_->deleteData(req);
}
void Session::setStorageGroup(const string &storageGroupId) {
impl_->defaultSessionConnection_->setStorageGroup(storageGroupId);
}
void Session::deleteStorageGroup(const string &storageGroup) {
vector<string> storageGroups;
storageGroups.push_back(storageGroup);
deleteStorageGroups(storageGroups);
}
void Session::deleteStorageGroups(const vector<string> &storageGroups) {
impl_->defaultSessionConnection_->deleteStorageGroups(storageGroups);
}
void Session::createDatabase(const string &database) {
this->setStorageGroup(database);
}
void Session::deleteDatabase(const string &database) {
this->deleteStorageGroups(vector<string>{database});
}
void Session::deleteDatabases(const vector<string> &databases) {
this->deleteStorageGroups(databases);
}
void Session::createTimeseries(const string &path,
TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor) {
try {
createTimeseries(path, dataType, encoding, compressor, nullptr, nullptr,
nullptr, "");
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
void Session::createTimeseries(const string &path,
TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor,
map<string, string> *props,
map<string, string> *tags,
map<string, string> *attributes,
const string &measurementAlias) {
TSCreateTimeseriesReq req;
req.__set_path(path);
req.__set_dataType(dataType);
req.__set_encoding(encoding);
req.__set_compressor(compressor);
if (props != nullptr) {
req.__set_props(*props);
}
if (tags != nullptr) {
req.__set_tags(*tags);
}
if (attributes != nullptr) {
req.__set_attributes(*attributes);
}
if (!measurementAlias.empty()) {
req.__set_measurementAlias(measurementAlias);
}
impl_->defaultSessionConnection_->createTimeseries(req);
}
void Session::createMultiTimeseries(
const vector<string> &paths,
const vector<TSDataType::TSDataType> &dataTypes,
const vector<TSEncoding::TSEncoding> &encodings,
const vector<CompressionType::CompressionType> &compressors,
vector<map<string, string>> *propsList,
vector<map<string, string>> *tagsList,
vector<map<string, string>> *attributesList,
vector<string> *measurementAliasList) {
TSCreateMultiTimeseriesReq request;
request.__set_paths(paths);
vector<int> dataTypesOrdinal;
dataTypesOrdinal.reserve(dataTypes.size());
for (TSDataType::TSDataType dataType : dataTypes) {
dataTypesOrdinal.push_back(dataType);
}
request.__set_dataTypes(dataTypesOrdinal);
vector<int> encodingsOrdinal;
encodingsOrdinal.reserve(encodings.size());
for (TSEncoding::TSEncoding encoding : encodings) {
encodingsOrdinal.push_back(encoding);
}
request.__set_encodings(encodingsOrdinal);
vector<int> compressorsOrdinal;
compressorsOrdinal.reserve(compressors.size());
for (CompressionType::CompressionType compressor : compressors) {
compressorsOrdinal.push_back(compressor);
}
request.__set_compressors(compressorsOrdinal);
if (propsList != nullptr) {
request.__set_propsList(*propsList);
}
if (tagsList != nullptr) {
request.__set_tagsList(*tagsList);
}
if (attributesList != nullptr) {
request.__set_attributesList(*attributesList);
}
if (measurementAliasList != nullptr) {
request.__set_measurementAliasList(*measurementAliasList);
}
impl_->defaultSessionConnection_->createMultiTimeseries(request);
}
void Session::createAlignedTimeseries(
const std::string &deviceId, const std::vector<std::string> &measurements,
const std::vector<TSDataType::TSDataType> &dataTypes,
const std::vector<TSEncoding::TSEncoding> &encodings,
const std::vector<CompressionType::CompressionType> &compressors) {
TSCreateAlignedTimeseriesReq request;
request.__set_prefixPath(deviceId);
request.__set_measurements(measurements);
vector<int> dataTypesOrdinal;
dataTypesOrdinal.reserve(dataTypes.size());
for (TSDataType::TSDataType dataType : dataTypes) {
dataTypesOrdinal.push_back(dataType);
}
request.__set_dataTypes(dataTypesOrdinal);
vector<int> encodingsOrdinal;
encodingsOrdinal.reserve(encodings.size());
for (TSEncoding::TSEncoding encoding : encodings) {
encodingsOrdinal.push_back(encoding);
}
request.__set_encodings(encodingsOrdinal);
vector<int> compressorsOrdinal;
compressorsOrdinal.reserve(compressors.size());
for (CompressionType::CompressionType compressor : compressors) {
compressorsOrdinal.push_back(compressor);
}
request.__set_compressors(compressorsOrdinal);
impl_->defaultSessionConnection_->createAlignedTimeseries(request);
}
bool Session::checkTimeseriesExists(const string &path) {
try {
std::unique_ptr<SessionDataSet> dataset =
executeQueryStatement("SHOW TIMESERIES " + path);
if (dataset == nullptr) {
throw IoTDBException("executeQueryStatement failed");
}
bool isExisted = dataset->hasNext();
dataset->closeOperationHandle();
return isExisted;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
shared_ptr<SessionConnection> Session::Impl::getQuerySessionConnection() {
auto endPoint = nodesSupplier_->getQueryEndPoint();
if (!endPoint.is_initialized() || endPointToSessionConnection.empty()) {
return defaultSessionConnection_;
}
auto it = endPointToSessionConnection.find(endPoint.value());
if (it != endPointToSessionConnection.end()) {
return it->second;
}
shared_ptr<SessionConnection> newConnection;
try {
newConnection = make_shared<SessionConnection>(
this, endPoint.value(), zoneId_, nodesSupplier_, fetchSize_, 60, 500,
connectTimeoutMs_, sqlDialect_, database_);
endPointToSessionConnection.emplace(endPoint.value(), newConnection);
return newConnection;
} catch (exception &e) {
log_debug("Session::Impl::getQuerySessionConnection() exception: " +
e.what());
return newConnection;
}
}
shared_ptr<SessionConnection>
Session::Impl::getSessionConnection(std::string deviceId) {
if (!enableRedirection_ ||
deviceIdToEndpoint.find(deviceId) == deviceIdToEndpoint.end() ||
endPointToSessionConnection.find(deviceIdToEndpoint[deviceId]) ==
endPointToSessionConnection.end()) {
return defaultSessionConnection_;
}
return endPointToSessionConnection.find(deviceIdToEndpoint[deviceId])->second;
}
string Session::getTimeZone() {
auto ret = impl_->defaultSessionConnection_->getTimeZone();
return ret.timeZone;
}
void Session::setTimeZone(const string &zoneId) {
TSSetTimeZoneReq req;
req.__set_sessionId(impl_->defaultSessionConnection_->sessionId);
req.__set_timeZone(zoneId);
impl_->defaultSessionConnection_->setTimeZone(req);
}
unique_ptr<SessionDataSet> Session::executeQueryStatement(const string &sql) {
return executeQueryStatementMayRedirect(sql, QUERY_TIMEOUT_MS);
}
unique_ptr<SessionDataSet> Session::executeQueryStatement(const string &sql,
int64_t timeoutInMs) {
return executeQueryStatementMayRedirect(sql, timeoutInMs);
}
void Session::Impl::handleQueryRedirection(TEndPoint endPoint) {
if (!enableRedirection_)
return;
shared_ptr<SessionConnection> newConnection;
auto it = endPointToSessionConnection.find(endPoint);
if (it != endPointToSessionConnection.end()) {
newConnection = it->second;
} else {
try {
newConnection = make_shared<SessionConnection>(
this, endPoint, zoneId_, nodesSupplier_, fetchSize_, 60, 500,
connectTimeoutMs_, sqlDialect_, database_);
endPointToSessionConnection.emplace(endPoint, newConnection);
} catch (exception &e) {
throw IoTDBConnectionException(e.what());
}
}
defaultSessionConnection_ = newConnection;
}
void Session::Impl::handleRedirection(const std::string &deviceId,
TEndPoint endPoint) {
if (!enableRedirection_)
return;
if (endPoint.ip == "0.0.0.0")
return;
deviceIdToEndpoint[deviceId] = endPoint;
shared_ptr<SessionConnection> newConnection;
auto it = endPointToSessionConnection.find(endPoint);
if (it != endPointToSessionConnection.end()) {
newConnection = it->second;
} else {
try {
newConnection = make_shared<SessionConnection>(
this, endPoint, zoneId_, nodesSupplier_, fetchSize_, 60, 500, 1000,
sqlDialect_, database_);
endPointToSessionConnection.emplace(endPoint, newConnection);
} catch (exception &e) {
deviceIdToEndpoint.erase(deviceId);
throw IoTDBConnectionException(e.what());
}
}
}
std::unique_ptr<SessionDataSet>
Session::executeQueryStatementMayRedirect(const std::string &sql,
int64_t timeoutInMs) {
auto sessionConnection = impl_->getQuerySessionConnection();
if (!sessionConnection) {
log_warn("Session connection not found");
return nullptr;
}
try {
return sessionConnection->executeQueryStatement(sql, timeoutInMs);
} catch (RedirectException &e) {
log_warn("Session connection redirect exception: " + e.what());
impl_->handleQueryRedirection(endpointToThrift(e.endPoint));
try {
return impl_->defaultSessionConnection_->executeQueryStatement(
sql, timeoutInMs);
} catch (exception &e) {
log_error("Exception while executing redirected query statement: %s",
e.what());
throw ExecutionException(e.what());
}
} catch (exception &e) {
log_error("Exception while executing query statement: %s", e.what());
throw;
}
}
void Session::executeNonQueryStatement(const string &sql) {
try {
impl_->defaultSessionConnection_->executeNonQueryStatement(sql);
} catch (const exception &e) {
throw IoTDBException(e.what());
}
}
unique_ptr<SessionDataSet>
Session::executeRawDataQuery(const vector<string> &paths, int64_t startTime,
int64_t endTime) {
return impl_->defaultSessionConnection_->executeRawDataQuery(paths, startTime,
endTime);
}
unique_ptr<SessionDataSet>
Session::executeLastDataQuery(const vector<string> &paths) {
return executeLastDataQuery(paths, LONG_LONG_MIN);
}
unique_ptr<SessionDataSet>
Session::executeLastDataQuery(const vector<string> &paths, int64_t lastTime) {
return impl_->defaultSessionConnection_->executeLastDataQuery(paths,
lastTime);
}
void Session::createSchemaTemplate(const Template &templ) {
TSCreateSchemaTemplateReq req;
req.__set_name(templ.getName());
req.__set_serializedTemplate(templ.serialize());
impl_->defaultSessionConnection_->createSchemaTemplate(req);
}
void Session::setSchemaTemplate(const string &template_name,
const string &prefix_path) {
TSSetSchemaTemplateReq req;
req.__set_templateName(template_name);
req.__set_prefixPath(prefix_path);
impl_->defaultSessionConnection_->setSchemaTemplate(req);
}
void Session::unsetSchemaTemplate(const string &prefix_path,
const string &template_name) {
TSUnsetSchemaTemplateReq req;
req.__set_templateName(template_name);
req.__set_prefixPath(prefix_path);
impl_->defaultSessionConnection_->unsetSchemaTemplate(req);
}
void Session::addAlignedMeasurementsInTemplate(
const string &template_name, const vector<std::string> &measurements,
const vector<TSDataType::TSDataType> &dataTypes,
const vector<TSEncoding::TSEncoding> &encodings,
const vector<CompressionType::CompressionType> &compressors) {
TSAppendSchemaTemplateReq req;
req.__set_name(template_name);
req.__set_measurements(measurements);
req.__set_isAligned(true);
vector<int> dataTypesOrdinal;
dataTypesOrdinal.reserve(dataTypes.size());
for (TSDataType::TSDataType dataType : dataTypes) {
dataTypesOrdinal.push_back(dataType);
}
req.__set_dataTypes(dataTypesOrdinal);
vector<int> encodingsOrdinal;
encodingsOrdinal.reserve(encodings.size());
for (TSEncoding::TSEncoding encoding : encodings) {
encodingsOrdinal.push_back(encoding);
}
req.__set_encodings(encodingsOrdinal);
vector<int> compressorsOrdinal;
compressorsOrdinal.reserve(compressors.size());
for (CompressionType::CompressionType compressor : compressors) {
compressorsOrdinal.push_back(compressor);
}
req.__set_compressors(compressorsOrdinal);
impl_->defaultSessionConnection_->appendSchemaTemplate(req);
}
void Session::addAlignedMeasurementsInTemplate(
const string &template_name, const string &measurement,
TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor) {
vector<std::string> measurements(1, measurement);
vector<TSDataType::TSDataType> dataTypes(1, dataType);
vector<TSEncoding::TSEncoding> encodings(1, encoding);
vector<CompressionType::CompressionType> compressors(1, compressor);
addAlignedMeasurementsInTemplate(template_name, measurements, dataTypes,
encodings, compressors);
}
void Session::addUnalignedMeasurementsInTemplate(
const string &template_name, const vector<std::string> &measurements,
const vector<TSDataType::TSDataType> &dataTypes,
const vector<TSEncoding::TSEncoding> &encodings,
const vector<CompressionType::CompressionType> &compressors) {
TSAppendSchemaTemplateReq req;
req.__set_name(template_name);
req.__set_measurements(measurements);
req.__set_isAligned(false);
vector<int> dataTypesOrdinal;
dataTypesOrdinal.reserve(dataTypes.size());
for (TSDataType::TSDataType dataType : dataTypes) {
dataTypesOrdinal.push_back(dataType);
}
req.__set_dataTypes(dataTypesOrdinal);
vector<int> encodingsOrdinal;
encodingsOrdinal.reserve(encodings.size());
for (TSEncoding::TSEncoding encoding : encodings) {
encodingsOrdinal.push_back(encoding);
}
req.__set_encodings(encodingsOrdinal);
vector<int> compressorsOrdinal;
compressorsOrdinal.reserve(compressors.size());
for (CompressionType::CompressionType compressor : compressors) {
compressorsOrdinal.push_back(compressor);
}
req.__set_compressors(compressorsOrdinal);
impl_->defaultSessionConnection_->appendSchemaTemplate(req);
}
void Session::addUnalignedMeasurementsInTemplate(
const string &template_name, const string &measurement,
TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor) {
vector<std::string> measurements(1, measurement);
vector<TSDataType::TSDataType> dataTypes(1, dataType);
vector<TSEncoding::TSEncoding> encodings(1, encoding);
vector<CompressionType::CompressionType> compressors(1, compressor);
addUnalignedMeasurementsInTemplate(template_name, measurements, dataTypes,
encodings, compressors);
}
void Session::deleteNodeInTemplate(const string &template_name,
const string &path) {
TSPruneSchemaTemplateReq req;
req.__set_name(template_name);
req.__set_path(path);
impl_->defaultSessionConnection_->pruneSchemaTemplate(req);
}
int Session::countMeasurementsInTemplate(const string &template_name) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_queryType(TemplateQueryType::COUNT_MEASUREMENTS);
TSQueryTemplateResp resp =
impl_->defaultSessionConnection_->querySchemaTemplate(req);
return resp.count;
}
bool Session::isMeasurementInTemplate(const string &template_name,
const string &path) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_measurement(path);
req.__set_queryType(TemplateQueryType::IS_MEASUREMENT);
TSQueryTemplateResp resp =
impl_->defaultSessionConnection_->querySchemaTemplate(req);
return resp.result;
}
bool Session::isPathExistInTemplate(const string &template_name,
const string &path) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_measurement(path);
req.__set_queryType(TemplateQueryType::PATH_EXIST);
TSQueryTemplateResp resp =
impl_->defaultSessionConnection_->querySchemaTemplate(req);
return resp.result;
}
std::vector<std::string>
Session::showMeasurementsInTemplate(const string &template_name) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_measurement("");
req.__set_queryType(TemplateQueryType::SHOW_MEASUREMENTS);
TSQueryTemplateResp resp =
impl_->defaultSessionConnection_->querySchemaTemplate(req);
return resp.measurements;
}
std::vector<std::string>
Session::showMeasurementsInTemplate(const string &template_name,
const string &pattern) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_measurement(pattern);
req.__set_queryType(TemplateQueryType::SHOW_MEASUREMENTS);
TSQueryTemplateResp resp =
impl_->defaultSessionConnection_->querySchemaTemplate(req);
return resp.measurements;
}
bool Session::checkTemplateExists(const string &template_name) {
try {
std::unique_ptr<SessionDataSet> dataset =
executeQueryStatement("SHOW NODES IN DEVICE TEMPLATE " + template_name);
if (dataset == nullptr) {
throw IoTDBException("executeQueryStatement failed");
}
bool isExisted = dataset->hasNext();
dataset->closeOperationHandle();
return isExisted;
} catch (const exception &e) {
if (strstr(e.what(), "does not exist") != NULL) {
return false;
}
log_debug(e.what());
throw IoTDBException(e.what());
}
}