blob: 154a9f6734acea4b30a1cb4e8b873382c07b267d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Session.h"
#include <algorithm>
#include <memory>
#include <time.h>
#include <future>
#include <unordered_set>
#include "NodesSupplier.h"
#include "SessionDataSet.h"
using namespace std;
/**
* Timeout of query can be set by users.
* A negative number means using the default configuration of server.
* And value 0 will disable the function of query timeout.
*/
static const int64_t QUERY_TIMEOUT_MS = -1;
// Global log level variable; initialised to LEVEL_DEBUG.
LogLevelType LOG_LEVEL = LEVEL_DEBUG;
/**
 * Maps the upper-case textual name of a data type to its TSDataType value.
 *
 * Recognised names: BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP,
 * DATE, BLOB and STRING. The comparison is exact (case-sensitive).
 *
 * @param str name of the data type
 * @return the matching enumerator, or TSDataType::UNKNOWN for any other text
 */
TSDataType::TSDataType getTSDataTypeFromString(const string& str) {
    struct NamedType {
        const char* name;
        TSDataType::TSDataType type;
    };
    static const NamedType kKnownTypes[] = {
        {"BOOLEAN", TSDataType::BOOLEAN},
        {"INT32", TSDataType::INT32},
        {"INT64", TSDataType::INT64},
        {"FLOAT", TSDataType::FLOAT},
        {"DOUBLE", TSDataType::DOUBLE},
        {"TEXT", TSDataType::TEXT},
        {"TIMESTAMP", TSDataType::TIMESTAMP},
        {"DATE", TSDataType::DATE},
        {"BLOB", TSDataType::BLOB},
        {"STRING", TSDataType::STRING},
    };
    for (const NamedType& candidate : kKnownTypes) {
        if (str == candidate.name) {
            return candidate.type;
        }
    }
    return TSDataType::UNKNOWN;
}
void Tablet::createColumns() {
for (size_t i = 0; i < schemas.size(); i++) {
TSDataType::TSDataType dataType = schemas[i].second;
switch (dataType) {
case TSDataType::BOOLEAN:
values[i] = new bool[maxRowNumber];
break;
case TSDataType::DATE:
values[i] = new boost::gregorian::date[maxRowNumber];
break;
case TSDataType::INT32:
values[i] = new int[maxRowNumber];
break;
case TSDataType::TIMESTAMP:
case TSDataType::INT64:
values[i] = new int64_t[maxRowNumber];
break;
case TSDataType::FLOAT:
values[i] = new float[maxRowNumber];
break;
case TSDataType::DOUBLE:
values[i] = new double[maxRowNumber];
break;
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::TEXT:
values[i] = new string[maxRowNumber];
break;
default:
throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
}
}
}
void Tablet::deleteColumns() {
for (size_t i = 0; i < schemas.size(); i++) {
if (!values[i]) continue;
TSDataType::TSDataType dataType = schemas[i].second;
switch (dataType) {
case TSDataType::BOOLEAN: {
bool* valueBuf = (bool*)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::INT32: {
int* valueBuf = (int*)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::DATE: {
boost::gregorian::date* valueBuf = (boost::gregorian::date*)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::TIMESTAMP:
case TSDataType::INT64: {
int64_t* valueBuf = (int64_t*)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::FLOAT: {
float* valueBuf = (float*)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::DOUBLE: {
double* valueBuf = (double*)(values[i]);
delete[] valueBuf;
break;
}
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::TEXT: {
string* valueBuf = (string*)(values[i]);
delete[] valueBuf;
break;
}
default:
throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
}
values[i] = nullptr;
}
}
/**
 * Allocates a new column array of maxRowNumber elements and copies the
 * source column into it.
 *
 * @param srcPtr       address of the source column pointer
 * @param destPtr      receives the newly allocated copy; set to nullptr when
 *                     the source column is null or the type is not handled
 * @param type         data type of the column, selects the element type
 * @param maxRowNumber number of elements to allocate and copy
 */
void Tablet::deepCopyTabletColValue(void* const* srcPtr, void** destPtr, TSDataType::TSDataType type, int maxRowNumber) {
    void* src = *srcPtr;
    if (src == nullptr) {
        // Nothing to copy (e.g. the column was already released); avoid
        // dereferencing a null source and leave a well-defined destination.
        *destPtr = nullptr;
        return;
    }
    switch (type) {
    case TSDataType::BOOLEAN:
        *destPtr = new bool[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(bool));
        break;
    case TSDataType::INT32:
        *destPtr = new int32_t[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(int32_t));
        break;
    case TSDataType::INT64:
    case TSDataType::TIMESTAMP:
        *destPtr = new int64_t[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(int64_t));
        break;
    case TSDataType::FLOAT:
        *destPtr = new float[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(float));
        break;
    case TSDataType::DOUBLE:
        *destPtr = new double[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(double));
        break;
    case TSDataType::DATE: {
        // Non-trivial element type: copy element-wise instead of memcpy.
        boost::gregorian::date* destDate = new boost::gregorian::date[maxRowNumber];
        *destPtr = destDate;
        const boost::gregorian::date* srcDate = static_cast<boost::gregorian::date*>(src);
        std::copy(srcDate, srcDate + maxRowNumber, destDate);
        break;
    }
    case TSDataType::STRING:
    case TSDataType::TEXT:
    case TSDataType::BLOB: {
        // Strings own heap memory, so each element must be copy-assigned.
        std::string* destStr = new std::string[maxRowNumber];
        *destPtr = destStr;
        const std::string* srcStr = static_cast<std::string*>(src);
        std::copy(srcStr, srcStr + maxRowNumber, destStr);
        break;
    }
    default:
        // Unhandled type: do not leave the destination slot unassigned.
        *destPtr = nullptr;
        break;
    }
}
/**
 * Empties the tablet for reuse: sets rowSize to 0 and resets the bitmap of
 * every schema column. The value arrays themselves are left untouched.
 */
void Tablet::reset() {
    rowSize = 0;
    const size_t columnCount = schemas.size();
    for (size_t col = 0; col < columnCount; ++col) {
        bitMaps[col].reset();
    }
}
/**
 * @return number of bytes taken by the timestamps of the current rows,
 *         counted as 8 bytes per row
 */
size_t Tablet::getTimeBytesSize() {
    static const int kBytesPerTimestamp = 8;
    return rowSize * kBytesPerTimestamp;
}
size_t Tablet::getValueByteSize() {
size_t valueOccupation = 0;
for (size_t i = 0; i < schemas.size(); i++) {
switch (schemas[i].second) {
case TSDataType::BOOLEAN:
valueOccupation += rowSize;
break;
case TSDataType::INT32:
valueOccupation += rowSize * 4;
break;
case TSDataType::DATE:
valueOccupation += rowSize * 4;
break;
case TSDataType::TIMESTAMP:
case TSDataType::INT64:
valueOccupation += rowSize * 8;
break;
case TSDataType::FLOAT:
valueOccupation += rowSize * 4;
break;
case TSDataType::DOUBLE:
valueOccupation += rowSize * 8;
break;
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::TEXT: {
valueOccupation += rowSize * 4;
string* valueBuf = (string*)(values[i]);
for (size_t j = 0; j < rowSize; j++) {
valueOccupation += valueBuf[j].size();
}
break;
}
default:
throw UnSupportedDataTypeException(
string("Data type ") + to_string(schemas[i].second) + " is not supported.");
}
}
return valueOccupation;
}
/**
 * Stores the aligned flag of this tablet.
 *
 * @param isAligned new value of the isAligned member
 */
void Tablet::setAligned(bool isAligned) {
    // Qualify the member explicitly because the parameter shadows it.
    Tablet::isAligned = isAligned;
}
/**
 * Builds the device id of one row: the tablet's deviceId followed by the
 * string value of every id column (in idColumnIndexes order) at that row.
 *
 * @param row row index whose id column values are read
 * @return a StringArrayDeviceID built from those segments
 */
std::shared_ptr<storage::IDeviceID> Tablet::getDeviceID(int row) {
    std::vector<std::string> segments;
    segments.reserve(idColumnIndexes.size() + 1);
    segments.push_back(this->deviceId);
    for (auto columnIndex : idColumnIndexes) {
        // Id columns are read as TEXT, i.e. as std::string cells.
        const std::string* cell = static_cast<std::string*>(getValue(columnIndex, row, TSDataType::TEXT));
        segments.push_back(*cell);
    }
    return std::make_shared<storage::StringArrayDeviceID>(segments);
}
/**
 * Serializes the timestamps of the tablet's current rows.
 *
 * @param tablet tablet whose first rowSize timestamps are written
 * @return the buffer content after writing every timestamp with putInt64
 */
string SessionUtils::getTime(const Tablet& tablet) {
    MyStringBuffer encoded;
    // 8 bytes per timestamp; grow the buffer once up front when needed.
    const unsigned int expectedBytes = 8u * tablet.rowSize;
    if (encoded.str.capacity() < expectedBytes) {
        encoded.reserve(expectedBytes);
    }
    size_t row = 0;
    while (row < tablet.rowSize) {
        encoded.putInt64(tablet.timestamps[row]);
        ++row;
    }
    return encoded.str;
}
/**
 * Serializes the value part of a tablet.
 *
 * First every column is written row by row; rows marked in the column's
 * bitmap are written as a placeholder (false, the numeric_limits min value,
 * EMPTY_DATE_INT or an empty string). Afterwards, for every column, one
 * flag byte says whether the bitmap has any mark, followed by the first
 * rowSize / 8 + 1 bytes of the bitmap when it does.
 *
 * @param tablet tablet to serialize
 * @return the serialized bytes
 * @throws UnSupportedDataTypeException if a column has an unsupported type
 */
string SessionUtils::getValue(const Tablet& tablet) {
    MyStringBuffer out;
    const unsigned int expectedBytes = 8u * tablet.schemas.size() * tablet.rowSize;
    if (out.str.capacity() < expectedBytes) {
        out.reserve(expectedBytes);
    }

    // Pass 1: column values, with placeholders for marked (null) rows.
    for (size_t col = 0; col < tablet.schemas.size(); ++col) {
        const TSDataType::TSDataType columnType = tablet.schemas[col].second;
        const BitMap& nullMarks = tablet.bitMaps[col];
        switch (columnType) {
        case TSDataType::BOOLEAN: {
            const bool* cells = (bool*)(tablet.values[col]);
            for (size_t row = 0; row < tablet.rowSize; ++row) {
                out.putBool(nullMarks.isMarked(row) ? false : cells[row]);
            }
            break;
        }
        case TSDataType::INT32: {
            const int* cells = (int*)(tablet.values[col]);
            for (size_t row = 0; row < tablet.rowSize; ++row) {
                out.putInt(nullMarks.isMarked(row) ? (numeric_limits<int32_t>::min)() : cells[row]);
            }
            break;
        }
        case TSDataType::DATE: {
            const boost::gregorian::date* cells = (boost::gregorian::date*)(tablet.values[col]);
            for (size_t row = 0; row < tablet.rowSize; ++row) {
                if (nullMarks.isMarked(row)) {
                    out.putInt(EMPTY_DATE_INT);
                }
                else {
                    out.putDate(cells[row]);
                }
            }
            break;
        }
        case TSDataType::INT64:
        case TSDataType::TIMESTAMP: {
            const int64_t* cells = (int64_t*)(tablet.values[col]);
            for (size_t row = 0; row < tablet.rowSize; ++row) {
                out.putInt64(nullMarks.isMarked(row) ? (numeric_limits<int64_t>::min)() : cells[row]);
            }
            break;
        }
        case TSDataType::FLOAT: {
            const float* cells = (float*)(tablet.values[col]);
            for (size_t row = 0; row < tablet.rowSize; ++row) {
                out.putFloat(nullMarks.isMarked(row) ? (numeric_limits<float>::min)() : cells[row]);
            }
            break;
        }
        case TSDataType::DOUBLE: {
            const double* cells = (double*)(tablet.values[col]);
            for (size_t row = 0; row < tablet.rowSize; ++row) {
                out.putDouble(nullMarks.isMarked(row) ? (numeric_limits<double>::min)() : cells[row]);
            }
            break;
        }
        case TSDataType::TEXT:
        case TSDataType::STRING:
        case TSDataType::BLOB: {
            const string* cells = (string*)(tablet.values[col]);
            for (size_t row = 0; row < tablet.rowSize; ++row) {
                if (nullMarks.isMarked(row)) {
                    out.putString("");
                }
                else {
                    out.putString(cells[row]);
                }
            }
            break;
        }
        default:
            throw UnSupportedDataTypeException(string("Data type ") + to_string(columnType) + " is not supported.");
        }
    }

    // Pass 2: per-column "has null" flag and, if set, the bitmap bytes.
    for (size_t col = 0; col < tablet.schemas.size(); ++col) {
        const BitMap& nullMarks = tablet.bitMaps[col];
        if (nullMarks.isAllUnmarked()) {
            out.putChar((char)0);
            continue;
        }
        out.putChar((char)1);
        const vector<char>& packedMarks = nullMarks.getByteArray();
        for (size_t byteIndex = 0; byteIndex < tablet.rowSize / 8 + 1; ++byteIndex) {
            out.putChar(packedMarks[byteIndex]);
        }
    }
    return out.str;
}
/**
 * Tells whether every row of the tablet resolves to the same device id.
 *
 * @param tablet tablet to inspect (taken by value, as declared)
 * @return true when rowSize is 1 or when the device id of every row from
 *         index 1 on equals the device id of row 0; false otherwise
 */
bool SessionUtils::isTabletContainsSingleDevice(Tablet tablet) {
    if (tablet.rowSize == 1) {
        return true;
    }
    const auto referenceId = tablet.getDeviceID(0);
    int row = 1;
    while (row < tablet.rowSize) {
        const auto currentId = tablet.getDeviceID(row);
        if (*referenceId != *currentId) {
            return false;
        }
        ++row;
    }
    return true;
}
/**
 * Serializes this measurement node.
 *
 * @return a buffer holding, in order: the name (putString) followed by the
 *         data type, encoding and compression type (one putChar each)
 */
string MeasurementNode::serialize() const {
    MyStringBuffer out;
    out.putString(getName());
    // The three attributes below are each written as a single char.
    out.putChar(getDataType());
    out.putChar(getEncoding());
    out.putChar(getCompressionType());
    return out.str;
}
/**
 * Serializes the template: its name and aligned flag, followed by one entry
 * per measurement node found while walking the tree with an explicit stack.
 *
 * Each measurement entry consists of the dotted path of its parent, a flag
 * telling whether that parent path was recorded as aligned, and the
 * measurement node's own serialization.
 *
 * @return the serialized bytes
 */
string Template::serialize() const {
    typedef pair<string, shared_ptr<TemplateNode>> PendingNode;

    MyStringBuffer out;
    stack<PendingNode> pending;
    unordered_set<string> alignedPrefixes;

    out.putString(getName());
    out.putBool(isAligned());
    if (isAligned()) {
        // The empty prefix stands for the template root.
        alignedPrefixes.emplace("");
    }
    for (const auto& child : children_) {
        pending.push(PendingNode("", child.second));
    }

    while (!pending.empty()) {
        const PendingNode current = pending.top();
        pending.pop();
        const string& parentPath = current.first;
        const shared_ptr<TemplateNode>& node = current.second;

        if (node->isMeasurement()) {
            out.putString(parentPath);
            out.putBool(alignedPrefixes.find(parentPath) != alignedPrefixes.end());
            out.concat(node->serialize());
            continue;
        }

        // Internal node: extend the path and schedule its children.
        string nodePath(parentPath);
        if (!parentPath.empty()) {
            nodePath.append(".");
        }
        nodePath.append(node->getName());
        if (node->isAligned()) {
            alignedPrefixes.emplace(nodePath);
        }
        for (const auto& child : node->getChildren()) {
            pending.push(PendingNode(nodePath, child.second));
        }
    }
    return out.str;
}
/**
 * Closes the session on destruction. Any std::exception raised by close()
 * is logged at debug level and swallowed so the destructor never throws it.
 */
Session::~Session() {
    try {
        close();
    }
    catch (const exception& closeError) {
        log_debug(closeError.what());
    }
}
/**
 * Forgets a broken connection: removes its endpoint from
 * endPointToSessionConnection (only when redirection is enabled) and drops
 * every entry of deviceIdToEndpoint and tableModelDeviceIdToEndpoint that
 * points at that endpoint.
 *
 * @param sessionConnection the connection whose endpoint is purged
 */
void Session::removeBrokenSessionConnection(shared_ptr<SessionConnection> sessionConnection) {
    if (enableRedirection_) {
        this->endPointToSessionConnection.erase(sessionConnection->getEndPoint());
    }

    for (auto entry = deviceIdToEndpoint.begin(); entry != deviceIdToEndpoint.end();) {
        if (entry->second == sessionConnection->getEndPoint()) {
            // erase() hands back the iterator following the removed entry.
            entry = deviceIdToEndpoint.erase(entry);
        }
        else {
            ++entry;
        }
    }

    for (auto entry = tableModelDeviceIdToEndpoint.begin(); entry != tableModelDeviceIdToEndpoint.end();) {
        if (entry->second == sessionConnection->getEndPoint()) {
            entry = tableModelDeviceIdToEndpoint.erase(entry);
        }
        else {
            ++entry;
        }
    }
}
/**
 * Checks whether the first rowSize timestamps of the tablet are in
 * non-decreasing order.
 *
 * @param tablet tablet whose timestamps are inspected
 * @return true if no timestamp is smaller than its predecessor
 */
bool Session::checkSorted(const Tablet& tablet) {
    size_t row = 1;
    while (row < tablet.rowSize) {
        const bool outOfOrder = tablet.timestamps[row] < tablet.timestamps[row - 1];
        if (outOfOrder) {
            return false;
        }
        ++row;
    }
    return true;
}
/**
 * Checks whether a list of timestamps is in non-decreasing order.
 *
 * @param times timestamps to inspect
 * @return true if no element is smaller than its predecessor (also for
 *         empty and single-element lists)
 */
bool Session::checkSorted(const vector<int64_t>& times) {
    return std::is_sorted(times.begin(), times.end());
}
/**
 * Returns a copy of valueList reordered by an index permutation.
 *
 * The result has valueList.size() elements; position i (for
 * i < indexLength) receives valueList[index[i]], the remaining positions
 * keep their value-initialized content.
 *
 * @param valueList   source values
 * @param index       source position for each output position
 * @param indexLength number of entries of index to apply
 */
template <typename T>
std::vector<T> sortList(const std::vector<T>& valueList, const int* index, int indexLength) {
    std::vector<T> reordered(valueList.size());
    auto target = reordered.begin();
    for (int pos = 0; pos < indexLength; ++pos, ++target) {
        *target = valueList[index[pos]];
    }
    return reordered;
}
/**
 * Reorders the first indexLength elements of valueList in place so that
 * position i ends up holding the element previously at index[i].
 *
 * The scratch array is owned by a unique_ptr, so it is released even if an
 * element assignment throws (e.g. std::string running out of memory).
 *
 * @param valueList   array to permute; must hold at least indexLength elements
 * @param index       source position for each output position
 * @param indexLength number of elements to permute
 */
template <typename T>
void sortValuesList(T* valueList, const int* index, size_t indexLength) {
    std::unique_ptr<T[]> reordered(new T[indexLength]);
    for (size_t i = 0; i < indexLength; ++i) {
        reordered[i] = valueList[index[i]];
    }
    for (size_t i = 0; i < indexLength; ++i) {
        // The scratch copy is no longer needed afterwards, so move it back.
        valueList[i] = std::move(reordered[i]);
    }
}
/**
 * Sorts the rows of a tablet by timestamp.
 *
 * An index permutation is computed from the timestamps and then applied to
 * the timestamp list and to every value column. The index lives in a
 * std::vector so it is released even when an unsupported column type makes
 * this function throw.
 *
 * NOTE(review): tablet.bitMaps is not permuted here; if the null marks are
 * row-indexed they may no longer line up with the rows after sorting —
 * confirm against BitMap's API and the callers.
 *
 * @param tablet tablet to sort in place
 * @throws UnSupportedDataTypeException if a column has an unsupported type
 */
void Session::sortTablet(Tablet& tablet) {
    // Sort an index array first, then use it to reorder every column so the
    // rows of all columns stay consistent with the sorted timestamps.
    std::vector<int> index(tablet.rowSize);
    for (size_t i = 0; i < index.size(); i++) {
        index[i] = static_cast<int>(i);
    }
    sortIndexByTimestamp(index.data(), tablet.timestamps, tablet.rowSize);
    tablet.timestamps = sortList(tablet.timestamps, index.data(), tablet.rowSize);
    for (size_t i = 0; i < tablet.schemas.size(); i++) {
        TSDataType::TSDataType dataType = tablet.schemas[i].second;
        switch (dataType) {
        case TSDataType::BOOLEAN: {
            sortValuesList((bool*)(tablet.values[i]), index.data(), tablet.rowSize);
            break;
        }
        case TSDataType::INT32: {
            sortValuesList((int*)(tablet.values[i]), index.data(), tablet.rowSize);
            break;
        }
        case TSDataType::DATE: {
            sortValuesList((boost::gregorian::date*)(tablet.values[i]), index.data(), tablet.rowSize);
            break;
        }
        case TSDataType::TIMESTAMP:
        case TSDataType::INT64: {
            sortValuesList((int64_t*)(tablet.values[i]), index.data(), tablet.rowSize);
            break;
        }
        case TSDataType::FLOAT: {
            sortValuesList((float*)(tablet.values[i]), index.data(), tablet.rowSize);
            break;
        }
        case TSDataType::DOUBLE: {
            sortValuesList((double*)(tablet.values[i]), index.data(), tablet.rowSize);
            break;
        }
        case TSDataType::STRING:
        case TSDataType::BLOB:
        case TSDataType::TEXT: {
            sortValuesList((string*)(tablet.values[i]), index.data(), tablet.rowSize);
            break;
        }
        default:
            throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
        }
    }
}
void Session::sortIndexByTimestamp(int* index, std::vector<int64_t>& timestamps, int length) {
if (length <= 1) {
return;
}
TsCompare tsCompareObj(timestamps);
std::sort(&index[0], &index[length], tsCompareObj);
}
/**
 * Appends the bytes of a value to buffer in big-endian order, as required
 * by the IoTDB server.
 *
 * On a big-endian host the bytes are appended as they are; on a
 * little-endian host they are appended last byte first.
 *
 * @param buffer destination string
 * @param value  pointer to the first byte of the value
 * @param size   number of bytes of the value
 */
void Session::appendValues(string& buffer, const char* value, int size) {
    // Detect the host byte order once: the lowest-addressed byte of 0x0201
    // is 0x01 on a little-endian CPU.
    static const bool hostIsBigEndian = []() {
        int probe = 0x0201;
        return 0x01 != *(char*)(&probe);
    }();

    if (hostIsBigEndian) {
        buffer.append(value, size);
        return;
    }
    int remaining = size;
    while (remaining > 0) {
        --remaining;
        buffer.append(value + remaining, 1);
    }
}
void
Session::putValuesIntoBuffer(const vector<TSDataType::TSDataType>& types, const vector<char*>& values, string& buf) {
int32_t date;
for (size_t i = 0; i < values.size(); i++) {
int8_t typeNum = getDataTypeNumber(types[i]);
buf.append((char*)(&typeNum), sizeof(int8_t));
switch (types[i]) {
case TSDataType::BOOLEAN:
buf.append(values[i], 1);
break;
case TSDataType::INT32:
appendValues(buf, values[i], sizeof(int32_t));
break;
case TSDataType::DATE:
date = parseDateExpressionToInt(*(boost::gregorian::date*)values[i]);
appendValues(buf, (char*)&date, sizeof(int32_t));
break;
case TSDataType::TIMESTAMP:
case TSDataType::INT64:
appendValues(buf, values[i], sizeof(int64_t));
break;
case TSDataType::FLOAT:
appendValues(buf, values[i], sizeof(float));
break;
case TSDataType::DOUBLE:
appendValues(buf, values[i], sizeof(double));
break;
case TSDataType::STRING:
case TSDataType::BLOB:
case TSDataType::TEXT: {
int32_t len = (uint32_t)strlen(values[i]);
appendValues(buf, (char*)(&len), sizeof(uint32_t));
// no need to change the byte order of string value
buf.append(values[i], len);
break;
}
default:
break;
}
}
}
/**
 * Maps a data type to the one-byte code written in front of each value by
 * putValuesIntoBuffer.
 *
 * @param type data type to encode
 * @return 0..5 for BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT; 8..11 for
 *         TIMESTAMP, DATE, BLOB, STRING; -1 for any other type
 */
int8_t Session::getDataTypeNumber(TSDataType::TSDataType type) {
    int8_t code = -1;
    switch (type) {
    case TSDataType::BOOLEAN:   code = 0;  break;
    case TSDataType::INT32:     code = 1;  break;
    case TSDataType::INT64:     code = 2;  break;
    case TSDataType::FLOAT:     code = 3;  break;
    case TSDataType::DOUBLE:    code = 4;  break;
    case TSDataType::TEXT:      code = 5;  break;
    case TSDataType::TIMESTAMP: code = 8;  break;
    case TSDataType::DATE:      code = 9;  break;
    case TSDataType::BLOB:      code = 10; break;
    case TSDataType::STRING:    code = 11; break;
    default:                    break;
    }
    return code;
}
/**
 * Returns the textual name of a protocol version.
 *
 * @param version version enumerator
 * @return "V_0_13" or "V_1_0" for those versions; "V_0_12" for V_0_12 and
 *         for every other value
 */
string Session::getVersionString(Version::Version version) {
    if (version == Version::V_0_13) {
        return "V_0_13";
    }
    if (version == Version::V_1_0) {
        return "V_1_0";
    }
    // V_0_12 and any unrecognised value share the same name.
    return "V_0_12";
}
/**
 * Fills zoneId_ with the local time zone offset when it is still empty.
 *
 * The offset is obtained by converting time 0 to local time and formatting
 * it with strftime's "%z". An already set zoneId_ is left unchanged.
 */
void Session::initZoneId() {
    if (zoneId_.empty()) {
        time_t epoch = 0;
        struct tm localEpoch;
#if defined(_WIN64) || defined (WIN32) || defined (_WIN32)
        localtime_s(&localEpoch, &epoch);
#else
        localtime_r(&epoch, &localEpoch);
#endif
        char offsetText[32];
        strftime(offsetText, sizeof(offsetText), "%z", &localEpoch);
        zoneId_ = offsetText;
    }
}
void Session::initNodesSupplier() {
std::vector<TEndPoint> endPoints;
TEndPoint endPoint;
endPoint.__set_ip(host_);
endPoint.__set_port(rpcPort_);
endPoints.emplace_back(endPoint);
if (enableAutoFetch_) {
nodesSupplier_ = NodesSupplier::create(endPoints, username_, password_);
}
else {
nodesSupplier_ = make_shared<StaticNodesSupplier>(endPoints);
}
}
/**
 * Points defaultEndPoint_ at host_:rpcPort_ and creates
 * defaultSessionConnection_ for that endpoint.
 */
void Session::initDefaultSessionConnection() {
    defaultEndPoint_.__set_ip(host_);
    defaultEndPoint_.__set_port(rpcPort_);
    // NOTE(review): 60 and 500 are passed positionally to SessionConnection;
    // presumably retry count and retry interval — confirm against its constructor.
    auto connection = make_shared<SessionConnection>(this, defaultEndPoint_, zoneId_, nodesSupplier_, fetchSize_,
                                                     60, 500,
                                                     sqlDialect_, database_);
    defaultSessionConnection_ = connection;
}
/**
 * Inserts string-valued records, grouping them by the session connection
 * that getSessionConnection returns for each device.
 *
 * One TSInsertStringRecordsReq is built per connection. A single group is
 * sent with insertOnce, several groups with insertByGroup.
 *
 * @param deviceIds        device of each record
 * @param times            timestamp of each record
 * @param measurementsList measurement names of each record
 * @param valuesList       string values of each record
 * @param isAligned        aligned flag stored in every request
 */
void Session::insertStringRecordsWithLeaderCache(vector<string> deviceIds, vector<int64_t> times,
                                                 vector<vector<string>> measurementsList,
                                                 vector<vector<string>> valuesList, bool isAligned) {
    typedef std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertStringRecordsReq> RequestsByConnection;
    RequestsByConnection recordsGroup;

    const size_t recordCount = deviceIds.size();
    for (size_t row = 0; row < recordCount; ++row) {
        auto connection = getSessionConnection(deviceIds[row]);
        RequestsByConnection::iterator slot = recordsGroup.find(connection);
        if (slot == recordsGroup.end()) {
            // First record for this connection: start from a request whose
            // list fields are explicitly set (to empty lists).
            TSInsertStringRecordsReq freshRequest;
            freshRequest.__set_isAligned(isAligned);
            freshRequest.__set_prefixPaths(std::vector<std::string>());
            freshRequest.__set_timestamps(std::vector<int64_t>());
            freshRequest.__set_measurementsList(std::vector<std::vector<std::string>>());
            freshRequest.__set_valuesList(vector<vector<string>>());
            slot = recordsGroup.insert(make_pair(connection, freshRequest)).first;
        }
        TSInsertStringRecordsReq& groupRequest = slot->second;
        groupRequest.prefixPaths.emplace_back(deviceIds[row]);
        groupRequest.timestamps.emplace_back(times[row]);
        groupRequest.measurementsList.emplace_back(measurementsList[row]);
        groupRequest.valuesList.emplace_back(valuesList[row]);
    }

    std::function<void(std::shared_ptr<SessionConnection>, const TSInsertStringRecordsReq&)> consumer =
        [](const std::shared_ptr<SessionConnection>& target, const TSInsertStringRecordsReq& payload) {
            target->insertStringRecords(payload);
        };
    if (recordsGroup.size() != 1) {
        insertByGroup(recordsGroup, consumer);
    }
    else {
        insertOnce(recordsGroup, consumer);
    }
}
/**
 * Inserts typed records, grouping them by the session connection that
 * getSessionConnection returns for each device.
 *
 * One TSInsertRecordsReq is built per connection; the values of each record
 * are serialized with putValuesIntoBuffer. A single group is sent with
 * insertOnce, several groups with insertByGroup.
 *
 * @param deviceIds        device of each record
 * @param times            timestamp of each record
 * @param measurementsList measurement names of each record
 * @param typesList        data types of each record's values
 * @param valuesList       raw value pointers of each record
 * @param isAligned        aligned flag stored in every request
 */
void Session::insertRecordsWithLeaderCache(vector<string> deviceIds, vector<int64_t> times,
                                           vector<vector<string>> measurementsList,
                                           const vector<vector<TSDataType::TSDataType>>& typesList,
                                           vector<vector<char*>> valuesList, bool isAligned) {
    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertRecordsReq> recordsGroup;
    for (size_t i = 0; i < deviceIds.size(); i++) {
        auto connection = getSessionConnection(deviceIds[i]);
        auto it = recordsGroup.find(connection);
        if (it == recordsGroup.end()) {
            // First record for this connection: start from a request whose
            // list fields are explicitly set (to empty lists).
            TSInsertRecordsReq request;
            std::vector<std::string> emptyPrefixPaths;
            std::vector<std::vector<std::string>> emptyMeasurementsList;
            std::vector<std::string> emptyValuesList;
            std::vector<int64_t> emptyTimestamps;
            request.__set_isAligned(isAligned);
            request.__set_prefixPaths(emptyPrefixPaths);
            request.__set_timestamps(emptyTimestamps);
            request.__set_measurementsList(emptyMeasurementsList);
            request.__set_valuesList(emptyValuesList);
            it = recordsGroup.insert(make_pair(connection, request)).first;
        }
        // Work on the stored request directly; no copy back into the map is needed.
        TSInsertRecordsReq& existingReq = it->second;
        existingReq.prefixPaths.emplace_back(deviceIds[i]);
        existingReq.timestamps.emplace_back(times[i]);
        existingReq.measurementsList.emplace_back(measurementsList[i]);
        string buffer;
        putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
        existingReq.valuesList.emplace_back(std::move(buffer));
    }
    std::function<void(std::shared_ptr<SessionConnection>, const TSInsertRecordsReq&)> consumer =
        [](const std::shared_ptr<SessionConnection>& c, const TSInsertRecordsReq& r) {
            c->insertRecords(r);
        };
    if (recordsGroup.size() == 1) {
        insertOnce(recordsGroup, consumer);
    }
    else {
        insertByGroup(recordsGroup, consumer);
    }
}
/**
 * Inserts several tablets, grouping them by the session connection that
 * getSessionConnection returns for each device.
 *
 * Every tablet that is not already sorted by timestamp is sorted first.
 * One TSInsertTabletsReq is built per connection and carries the aligned
 * flag. A single group is sent with insertOnce, several with insertByGroup.
 *
 * @param tablets   tablets keyed by device id
 * @param sorted    not consulted; each tablet is checked with checkSorted
 * @param isAligned expected aligned flag of every tablet
 * @throws BatchExecutionException if tablets is empty or a tablet's
 *         isAligned differs from the isAligned argument
 */
void Session::insertTabletsWithLeaderCache(unordered_map<string, Tablet*> tablets, bool sorted, bool isAligned) {
    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertTabletsReq> tabletsGroup;
    if (tablets.empty()) {
        throw BatchExecutionException("No tablet is inserting!");
    }
    for (const auto& item : tablets) {
        const string& deviceId = item.first;
        Tablet* tablet = item.second;
        if (isAligned != tablet->isAligned) {
            throw BatchExecutionException("The tablets should be all aligned or non-aligned!");
        }
        if (!checkSorted(*tablet)) {
            sortTablet(*tablet);
        }
        auto connection = getSessionConnection(deviceId);
        auto it = tabletsGroup.find(connection);
        if (it == tabletsGroup.end()) {
            TSInsertTabletsReq request;
            // Tell the server whether these tablets are aligned; previously
            // the flag was never set on this path.
            request.__set_isAligned(isAligned);
            it = tabletsGroup.insert(make_pair(connection, request)).first;
        }
        TSInsertTabletsReq& existingReq = it->second;
        existingReq.prefixPaths.emplace_back(tablet->deviceId);
        existingReq.timestampsList.emplace_back(SessionUtils::getTime(*tablet));
        existingReq.valuesList.emplace_back(SessionUtils::getValue(*tablet));
        existingReq.sizeList.emplace_back(tablet->rowSize);
        vector<int> dataTypes;
        vector<string> measurements;
        for (const auto& schema : tablet->schemas) {
            measurements.push_back(schema.first);
            dataTypes.push_back(schema.second);
        }
        existingReq.measurementsList.emplace_back(std::move(measurements));
        existingReq.typesList.emplace_back(std::move(dataTypes));
    }
    std::function<void(std::shared_ptr<SessionConnection>, const TSInsertTabletsReq&)> consumer =
        [](const std::shared_ptr<SessionConnection>& c, const TSInsertTabletsReq& r) {
            c->insertTablets(r);
        };
    if (tabletsGroup.size() == 1) {
        insertOnce(tabletsGroup, consumer);
    }
    else {
        insertByGroup(tabletsGroup, consumer);
    }
}
/**
 * Opens the session with RPC compression off and DEFAULT_TIMEOUT_MS as the
 * connection timeout.
 */
void Session::open() {
    const bool enableRPCCompression = false;
    open(enableRPCCompression, DEFAULT_TIMEOUT_MS);
}
/**
 * Opens the session using DEFAULT_TIMEOUT_MS as the connection timeout.
 *
 * @param enableRPCCompression forwarded to open(bool, int)
 */
void Session::open(bool enableRPCCompression) {
    const auto connectionTimeoutInMs = DEFAULT_TIMEOUT_MS;
    open(enableRPCCompression, connectionTimeoutInMs);
}
void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
if (!isClosed_) {
return;
}
try {
initDefaultSessionConnection();
}
catch (const exception& e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
zoneId_ = defaultSessionConnection_->zoneId;
if (enableRedirection_) {
endPointToSessionConnection.insert(make_pair(defaultEndPoint_, defaultSessionConnection_));
}
isClosed_ = false;
}
/**
 * Marks the session as closed. Calling it on an already closed session has
 * no effect.
 */
void Session::close() {
    if (!isClosed_) {
        isClosed_ = true;
    }
}
void Session::insertRecord(const string& deviceId, int64_t time,
const vector<string>& measurements,
const vector<string>& values) {
TSInsertStringRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
req.__set_values(values);
req.__set_isAligned(false);
try {
getSessionConnection(deviceId)->insertStringRecord(req);
}
catch (RedirectException& e) {
handleRedirection(deviceId, e.endPoint);
} catch (const IoTDBConnectionException& e) {
if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
deviceIdToEndpoint.erase(deviceId);
try {
defaultSessionConnection_->insertStringRecord(req);
}
catch (RedirectException& e) {
}
}
else {
throw e;
}
}
}
void Session::insertRecord(const string& deviceId, int64_t time,
const vector<string>& measurements,
const vector<TSDataType::TSDataType>& types,
const vector<char*>& values) {
TSInsertRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
string buffer;
putValuesIntoBuffer(types, values, buffer);
req.__set_values(buffer);
req.__set_isAligned(false);
try {
getSessionConnection(deviceId)->insertRecord(req);
}
catch (RedirectException& e) {
handleRedirection(deviceId, e.endPoint);
} catch (const IoTDBConnectionException& e) {
if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
deviceIdToEndpoint.erase(deviceId);
try {
defaultSessionConnection_->insertRecord(req);
}
catch (RedirectException& e) {
}
}
else {
throw e;
}
}
}
void Session::insertAlignedRecord(const string& deviceId, int64_t time,
const vector<string>& measurements,
const vector<string>& values) {
TSInsertStringRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
req.__set_values(values);
req.__set_isAligned(true);
try {
getSessionConnection(deviceId)->insertStringRecord(req);
}
catch (RedirectException& e) {
handleRedirection(deviceId, e.endPoint);
} catch (const IoTDBConnectionException& e) {
if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
deviceIdToEndpoint.erase(deviceId);
try {
defaultSessionConnection_->insertStringRecord(req);
}
catch (RedirectException& e) {
}
}
else {
throw e;
}
}
}
void Session::insertAlignedRecord(const string& deviceId, int64_t time,
const vector<string>& measurements,
const vector<TSDataType::TSDataType>& types,
const vector<char*>& values) {
TSInsertRecordReq req;
req.__set_prefixPath(deviceId);
req.__set_timestamp(time);
req.__set_measurements(measurements);
string buffer;
putValuesIntoBuffer(types, values, buffer);
req.__set_values(buffer);
req.__set_isAligned(false);
try {
getSessionConnection(deviceId)->insertRecord(req);
}
catch (RedirectException& e) {
handleRedirection(deviceId, e.endPoint);
} catch (const IoTDBConnectionException& e) {
if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
deviceIdToEndpoint.erase(deviceId);
try {
defaultSessionConnection_->insertRecord(req);
}
catch (RedirectException& e) {
}
}
else {
throw e;
}
}
}
void Session::insertRecords(const vector<string>& deviceIds,
const vector<int64_t>& times,
const vector<vector<string>>& measurementsList,
const vector<vector<string>>& valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
throw exception(e);
}
if (enableRedirection_) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, false);
}
else {
TSInsertStringRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
request.__set_valuesList(valuesList);
request.__set_isAligned(false);
try {
defaultSessionConnection_->insertStringRecords(request);
}
catch (RedirectException& e) {
}
}
}
void Session::insertRecords(const vector<string>& deviceIds,
const vector<int64_t>& times,
const vector<vector<string>>& measurementsList,
const vector<vector<TSDataType::TSDataType>>& typesList,
const vector<vector<char*>>& valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
throw exception(e);
}
if (enableRedirection_) {
insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, false);
}
else {
TSInsertRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
bufferList.push_back(buffer);
}
request.__set_valuesList(bufferList);
request.__set_isAligned(false);
try {
defaultSessionConnection_->insertRecords(request);
}
catch (RedirectException& e) {
}
}
}
void Session::insertAlignedRecords(const vector<string>& deviceIds,
const vector<int64_t>& times,
const vector<vector<string>>& measurementsList,
const vector<vector<string>>& valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
throw exception(e);
}
if (enableRedirection_) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, true);
}
else {
TSInsertStringRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
request.__set_valuesList(valuesList);
request.__set_isAligned(true);
try {
defaultSessionConnection_->insertStringRecords(request);
}
catch (RedirectException& e) {
}
}
}
void Session::insertAlignedRecords(const vector<string>& deviceIds,
const vector<int64_t>& times,
const vector<vector<string>>& measurementsList,
const vector<vector<TSDataType::TSDataType>>& typesList,
const vector<vector<char*>>& valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
throw exception(e);
}
if (enableRedirection_) {
insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, true);
}
else {
TSInsertRecordsReq request;
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
bufferList.push_back(buffer);
}
request.__set_valuesList(bufferList);
request.__set_isAligned(false);
try {
defaultSessionConnection_->insertRecords(request);
}
catch (RedirectException& e) {
}
}
}
/**
 * Inserts several records of one device, passing false for the sorted
 * argument of the six-parameter overload.
 *
 * @param deviceId         target device
 * @param times            timestamp of each record
 * @param measurementsList measurement names of each record
 * @param typesList        data types of each record's values
 * @param valuesList       raw value pointers of each record
 */
void Session::insertRecordsOfOneDevice(const string& deviceId,
                                       vector<int64_t>& times,
                                       vector<vector<string>>& measurementsList,
                                       vector<vector<TSDataType::TSDataType>>& typesList,
                                       vector<vector<char*>>& valuesList) {
    const bool sorted = false;
    insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, sorted);
}
void Session::insertRecordsOfOneDevice(const string& deviceId,
vector<int64_t>& times,
vector<vector<string>>& measurementsList,
vector<vector<TSDataType::TSDataType>>& typesList,
vector<vector<char*>>& valuesList,
bool sorted) {
if (!checkSorted(times)) {
int* index = new int[times.size()];
for (size_t i = 0; i < times.size(); i++) {
index[i] = (int)i;
}
sortIndexByTimestamp(index, times, (int)(times.size()));
times = sortList(times, index, (int)(times.size()));
measurementsList = sortList(measurementsList, index, (int)(times.size()));
typesList = sortList(typesList, index, (int)(times.size()));
valuesList = sortList(valuesList, index, (int)(times.size()));
delete[] index;
}
TSInsertRecordsOfOneDeviceReq request;
request.__set_prefixPath(deviceId);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
bufferList.push_back(buffer);
}
request.__set_valuesList(bufferList);
request.__set_isAligned(false);
TSStatus respStatus;
try {
getSessionConnection(deviceId)->insertRecordsOfOneDevice(request);
}
catch (RedirectException& e) {
handleRedirection(deviceId, e.endPoint);
} catch (const IoTDBConnectionException& e) {
if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
deviceIdToEndpoint.erase(deviceId);
try {
defaultSessionConnection_->insertRecordsOfOneDevice(request);
}
catch (RedirectException& e) {
}
}
else {
throw e;
}
}
}
/**
 * Convenience overload: forwards to the six-argument insertAlignedRecordsOfOneDevice()
 * with `sorted` set to false.
 */
void Session::insertAlignedRecordsOfOneDevice(const string& deviceId,
                                              vector<int64_t>& times,
                                              vector<vector<string>>& measurementsList,
                                              vector<vector<TSDataType::TSDataType>>& typesList,
                                              vector<vector<char*>>& valuesList) {
    const bool alreadySorted = false;
    insertAlignedRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, alreadySorted);
}
void Session::insertAlignedRecordsOfOneDevice(const string& deviceId,
vector<int64_t>& times,
vector<vector<string>>& measurementsList,
vector<vector<TSDataType::TSDataType>>& typesList,
vector<vector<char*>>& valuesList,
bool sorted) {
if (!checkSorted(times)) {
int* index = new int[times.size()];
for (size_t i = 0; i < times.size(); i++) {
index[i] = (int)i;
}
sortIndexByTimestamp(index, times, (int)(times.size()));
times = sortList(times, index, (int)(times.size()));
measurementsList = sortList(measurementsList, index, (int)(times.size()));
typesList = sortList(typesList, index, (int)(times.size()));
valuesList = sortList(valuesList, index, (int)(times.size()));
delete[] index;
}
TSInsertRecordsOfOneDeviceReq request;
request.__set_prefixPath(deviceId);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
bufferList.push_back(buffer);
}
request.__set_valuesList(bufferList);
request.__set_isAligned(true);
TSStatus respStatus;
try {
getSessionConnection(deviceId)->insertRecordsOfOneDevice(request);
}
catch (RedirectException& e) {
handleRedirection(deviceId, e.endPoint);
} catch (const IoTDBConnectionException& e) {
if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
deviceIdToEndpoint.erase(deviceId);
try {
defaultSessionConnection_->insertRecordsOfOneDevice(request);
}
catch (RedirectException& e) {
}
}
else {
throw e;
}
}
}
/**
 * Inserts a tablet by forwarding to insertTablet(tablet, false).
 *
 * Any std::exception raised by the call is logged and rethrown as a
 * std::logic_error carrying the same message.
 */
void Session::insertTablet(Tablet& tablet) {
    try {
        insertTablet(tablet, false);
    }
    catch (const exception& e) {
        // Pass the message as an argument so a '%' in it is not read as a format directive.
        log_debug("%s", e.what());
        // Throw the logic_error itself: copying it into a std::exception drops the message.
        throw logic_error(e.what());
    }
}
/**
 * Fills `request` from `tablet`: target path, measurement names and types,
 * serialized values and timestamps, row count and the aligned flag.
 *
 * The tablet is sorted first when `sorted` is false and checkSorted() reports
 * that it is out of order, so `tablet` may be modified.
 */
void Session::buildInsertTabletReq(TSInsertTabletReq& request, Tablet& tablet, bool sorted) {
    const bool needsSort = !sorted && !checkSorted(tablet);
    if (needsSort) {
        sortTablet(tablet);
    }

    request.prefixPath = tablet.deviceId;

    const size_t columnCount = tablet.schemas.size();
    request.measurements.reserve(columnCount);
    request.types.reserve(columnCount);
    for (pair<string, TSDataType::TSDataType> column : tablet.schemas) {
        request.measurements.push_back(column.first);
        request.types.push_back(column.second);
    }

    request.values = move(SessionUtils::getValue(tablet));
    request.timestamps = move(SessionUtils::getTime(tablet));
    request.__set_size(tablet.rowSize);
    request.__set_isAligned(tablet.isAligned);
}
/**
 * Sends a prepared tablet request over the connection chosen for its prefixPath.
 *
 * A RedirectException updates the redirection cache via handleRedirection(). An
 * IoTDBConnectionException is retried once on the default connection when a cached
 * endpoint existed for the device; otherwise it is rethrown.
 */
void Session::insertTablet(TSInsertTabletReq request) {
    // Keep a copy of the path: the request object is handed to the connection below.
    const string deviceId = request.prefixPath;
    try {
        getSessionConnection(deviceId)->insertTablet(request);
    }
    catch (RedirectException& e) {
        handleRedirection(deviceId, e.endPoint);
    } catch (const IoTDBConnectionException&) {
        if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
            // The cached endpoint failed: forget it and retry once on the default connection.
            deviceIdToEndpoint.erase(deviceId);
            try {
                defaultSessionConnection_->insertTablet(request);
            }
            catch (RedirectException&) {
                // Redirect hint from the fallback attempt is intentionally ignored.
            }
        }
        else {
            throw;  // rethrow the original exception object unchanged
        }
    }
}
/**
 * Builds a TSInsertTabletReq from `tablet` (see buildInsertTabletReq) and sends it.
 */
void Session::insertTablet(Tablet& tablet, bool sorted) {
    TSInsertTabletReq tabletRequest;
    buildInsertTabletReq(tabletRequest, tablet, sorted);
    insertTablet(tabletRequest);
}
/**
 * Groups the rows of `tablet` by the connection that serves each row's device and
 * inserts every group.
 *
 * - No cached table-model endpoints: the whole tablet goes to the default connection.
 * - Tablet holds a single device: the whole tablet goes to that device's connection.
 * - Otherwise: rows are copied one by one into a per-connection tablet.
 *
 * One resulting group is sent with insertRelationalTabletOnce(); any other count
 * is sent with insertRelationalTabletByGroup().
 */
void Session::insertRelationalTablet(Tablet& tablet, bool sorted) {
    std::unordered_map<std::shared_ptr<SessionConnection>, Tablet> relationalTabletGroup;

    if (tableModelDeviceIdToEndpoint.empty()) {
        relationalTabletGroup.insert(make_pair(defaultSessionConnection_, tablet));
    }
    else if (SessionUtils::isTabletContainsSingleDevice(tablet)) {
        relationalTabletGroup.insert(make_pair(getSessionConnection(tablet.getDeviceID(0)), tablet));
    }
    else {
        for (int row = 0; row < tablet.rowSize; row++) {
            std::shared_ptr<SessionConnection> connection = getSessionConnection(tablet.getDeviceID(row));

            // Find the tablet for this connection, creating an empty one on first use.
            auto groupIt = relationalTabletGroup.find(connection);
            if (groupIt == relationalTabletGroup.end()) {
                Tablet newTablet(tablet.deviceId, tablet.schemas, tablet.columnTypes, tablet.rowSize);
                groupIt = relationalTabletGroup.insert(std::make_pair(connection, newTablet)).first;
            }
            Tablet& currentTablet = groupIt->second;

            // Append the source row at the next free slot of the group tablet.
            int rowIndex = currentTablet.rowSize++;
            currentTablet.timestamps[rowIndex] = tablet.timestamps[row];

            for (int col = 0; col < tablet.schemas.size(); col++) {
                const string& columnName = tablet.schemas[col].first;
                const TSDataType::TSDataType columnType = tablet.schemas[col].second;
                switch (columnType) {
                case TSDataType::BOOLEAN:
                    currentTablet.addValue(columnName, rowIndex,
                                           *(bool*)tablet.getValue(col, row, columnType));
                    break;
                case TSDataType::INT32:
                    currentTablet.addValue(columnName, rowIndex,
                                           *(int32_t*)tablet.getValue(col, row, columnType));
                    break;
                case TSDataType::INT64:
                case TSDataType::TIMESTAMP:
                    currentTablet.addValue(columnName, rowIndex,
                                           *(int64_t*)tablet.getValue(col, row, columnType));
                    break;
                case TSDataType::FLOAT:
                    currentTablet.addValue(columnName, rowIndex,
                                           *(float*)tablet.getValue(col, row, columnType));
                    break;
                case TSDataType::DOUBLE:
                    currentTablet.addValue(columnName, rowIndex,
                                           *(double*)tablet.getValue(col, row, columnType));
                    break;
                case TSDataType::DATE: {
                    currentTablet.addValue(columnName, rowIndex,
                                           *(boost::gregorian::date*)tablet.getValue(col, row, columnType));
                    break;
                }
                case TSDataType::STRING:
                case TSDataType::TEXT:
                case TSDataType::BLOB: {
                    currentTablet.addValue(columnName, rowIndex,
                                           *(string*)tablet.getValue(col, row, columnType));
                    break;
                }
                default:
                    // Other types are not copied.
                    break;
                }
            }
        }
    }

    if (relationalTabletGroup.size() == 1) {
        insertRelationalTabletOnce(relationalTabletGroup, sorted);
        return;
    }
    insertRelationalTabletByGroup(relationalTabletGroup, sorted);
}
/**
 * Convenience overload: forwards to insertRelationalTablet(tablet, false).
 */
void Session::insertRelationalTablet(Tablet& tablet) {
    const bool alreadySorted = false;
    insertRelationalTablet(tablet, alreadySorted);
}
/**
 * Sends the first (connection, tablet) entry of `relationalTabletGroup` as a
 * table-model tablet insert.
 *
 * Error handling:
 * - RedirectException: each returned endpoint i is recorded for the device of row i.
 * - IoTDBConnectionException: if more than one connection is cached, the broken one is
 *   removed and the request is retried on the default connection; otherwise a new
 *   IoTDBConnectionException with the same message is thrown.
 * - TTransportException is converted to IoTDBConnectionException, IoTDBException is
 *   rethrown, and any other std::exception is converted to IoTDBException.
 */
void Session::insertRelationalTabletOnce(const std::unordered_map<std::shared_ptr<SessionConnection>, Tablet>&
                                         relationalTabletGroup, bool sorted) {
    auto firstEntry = relationalTabletGroup.begin();
    auto connection = firstEntry->first;
    // Work on a copy: buildInsertTabletReq may sort the tablet, and the map is const.
    auto tablet = firstEntry->second;

    TSInsertTabletReq request;
    buildInsertTabletReq(request, tablet, sorted);
    request.__set_writeToTable(true);

    std::vector<int8_t> columnCategories;
    for (auto categoryIt = tablet.columnTypes.begin(); categoryIt != tablet.columnTypes.end(); ++categoryIt) {
        columnCategories.push_back(static_cast<int8_t>(*categoryIt));
    }
    request.__set_columnCategories(columnCategories);

    try {
        TSStatus respStatus;
        connection->getSessionClient()->insertTablet(respStatus, request);
        RpcUtils::verifySuccess(respStatus);
    }
    catch (RedirectException& e) {
        auto endPointList = e.endPointList;
        for (int i = 0; i < endPointList.size(); i++) {
            handleRedirection(tablet.getDeviceID(i), endPointList[i]);
        }
    } catch (const IoTDBConnectionException& e) {
        if (endPointToSessionConnection.size() <= 1) {
            throw IoTDBConnectionException(e.what());
        }
        removeBrokenSessionConnection(connection);
        try {
            TSStatus respStatus;
            defaultSessionConnection_->getSessionClient()->insertTablet(respStatus, request);
            RpcUtils::verifySuccess(respStatus);
        }
        catch (RedirectException& e) {
        }
    } catch (const TTransportException& e) {
        log_debug(e.what());
        throw IoTDBConnectionException(e.what());
    } catch (const IoTDBException& e) {
        log_debug(e.what());
        throw;
    } catch (const exception& e) {
        log_debug(e.what());
        throw IoTDBException(e.what());
    }
}
void Session::insertRelationalTabletByGroup(const std::unordered_map<std::shared_ptr<SessionConnection>, Tablet>&
relationalTabletGroup, bool sorted) {
// Create a vector to store future objects for asynchronous operations
std::vector<std::future<void>> futures;
for (auto iter = relationalTabletGroup.begin(); iter != relationalTabletGroup.end(); iter++) {
auto connection = iter->first;
auto tablet = iter->second;
// Launch asynchronous task for each tablet insertion
futures.emplace_back(std::async(std::launch::async, [=]() mutable {
TSInsertTabletReq request;
buildInsertTabletReq(request, tablet, sorted);
request.__set_writeToTable(true);
std::vector<int8_t> columnCategories;
for (auto& category : tablet.columnTypes) {
columnCategories.push_back(static_cast<int8_t>(category));
}
request.__set_columnCategories(columnCategories);
try {
TSStatus respStatus;
connection->getSessionClient()->insertTablet(respStatus, request);
RpcUtils::verifySuccess(respStatus);
}
catch (const TTransportException& e) {
log_debug(e.what());
throw IoTDBConnectionException(e.what());
} catch (const IoTDBException& e) {
log_debug(e.what());
throw;
} catch (const exception& e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}));
}
for (auto& f : futures) {
f.get();
}
}
/**
 * Convenience overload: forwards to insertAlignedTablet(tablet, false).
 */
void Session::insertAlignedTablet(Tablet& tablet) {
    const bool alreadySorted = false;
    insertAlignedTablet(tablet, alreadySorted);
}
/**
 * Marks `tablet` as aligned and inserts it through insertTablet(tablet, sorted).
 *
 * Any std::exception raised by the call is logged and rethrown as a
 * std::logic_error carrying the same message.
 */
void Session::insertAlignedTablet(Tablet& tablet, bool sorted) {
    tablet.setAligned(true);
    try {
        insertTablet(tablet, sorted);
    }
    catch (const exception& e) {
        // Pass the message as an argument so a '%' in it is not read as a format directive.
        log_debug("%s", e.what());
        // Throw the logic_error itself: copying it into a std::exception drops the message.
        throw logic_error(e.what());
    }
}
/**
 * Inserts several tablets by forwarding to insertTablets(tablets, false).
 *
 * Any std::exception raised by the call is logged and rethrown as a
 * std::logic_error carrying the same message.
 */
void Session::insertTablets(unordered_map<string, Tablet*>& tablets) {
    try {
        insertTablets(tablets, false);
    }
    catch (const exception& e) {
        // Pass the message as an argument so a '%' in it is not read as a format directive.
        log_debug("%s", e.what());
        // Throw the logic_error itself: copying it into a std::exception drops the message.
        throw logic_error(e.what());
    }
}
/**
 * Inserts several tablets in one batch.
 *
 * The aligned flag of the first tablet in iteration order is used for the whole
 * batch. With redirection enabled the work is delegated to
 * insertTabletsWithLeaderCache(); otherwise one TSInsertTabletsReq is built and
 * sent through the default connection. Tablets that checkSorted() reports as
 * unsorted are sorted in place first.
 *
 * @throws BatchExecutionException if `tablets` is empty, or (on the non-redirect
 *         path) if the tablets mix aligned and non-aligned.
 */
void Session::insertTablets(unordered_map<string, Tablet*>& tablets, bool sorted) {
    if (tablets.empty()) {
        throw BatchExecutionException("No tablet is inserting!");
    }
    const bool isAligned = tablets.begin()->second->isAligned;
    if (enableRedirection_) {
        insertTabletsWithLeaderCache(tablets, sorted, isAligned);
        return;
    }

    TSInsertTabletsReq request;
    for (const auto& item : tablets) {
        Tablet& tablet = *item.second;
        if (isAligned != tablet.isAligned) {
            throw BatchExecutionException("The tablets should be all aligned or non-aligned!");
        }
        if (!checkSorted(tablet)) {
            sortTablet(tablet);
        }
        request.prefixPaths.push_back(tablet.deviceId);

        vector<string> measurements;
        vector<int> dataTypes;
        measurements.reserve(tablet.schemas.size());
        dataTypes.reserve(tablet.schemas.size());
        for (const auto& schema : tablet.schemas) {
            measurements.push_back(schema.first);
            dataTypes.push_back(schema.second);
        }
        request.measurementsList.push_back(std::move(measurements));
        request.typesList.push_back(std::move(dataTypes));
        request.timestampsList.push_back(SessionUtils::getTime(tablet));
        request.valuesList.push_back(SessionUtils::getValue(tablet));
        request.sizeList.push_back(tablet.rowSize);
    }
    request.__set_isAligned(isAligned);

    try {
        // The previous code ran RpcUtils::verifySuccess() on a local TSStatus that this
        // call never fills in, so that check could not reflect the server's answer.
        // NOTE(review): presumably SessionConnection::insertTablets reports failures
        // itself, as the other insert paths in this file rely on - confirm.
        defaultSessionConnection_->insertTablets(request);
    }
    catch (RedirectException&) {
        // Redirection is disabled on this path, so a redirect hint is deliberately ignored.
    }
}
/**
 * Marks every tablet in `tablets` as aligned and inserts them through
 * insertTablets(tablets, sorted).
 *
 * Any std::exception raised by the call is logged and rethrown as a
 * std::logic_error carrying the same message.
 */
void Session::insertAlignedTablets(unordered_map<string, Tablet*>& tablets, bool sorted) {
    for (auto& item : tablets) {
        item.second->setAligned(true);
    }
    try {
        insertTablets(tablets, sorted);
    }
    catch (const exception& e) {
        // Pass the message as an argument so a '%' in it is not read as a format directive.
        log_debug("%s", e.what());
        // Throw the logic_error itself: copying it into a std::exception drops the message.
        throw logic_error(e.what());
    }
}
/**
 * Builds a TSInsertStringRecordReq for one row and passes it to
 * testInsertStringRecord() on the default connection.
 *
 * TTransportException is converted to IoTDBConnectionException, IoTDBException is
 * rethrown, and any other std::exception is converted to IoTDBException.
 */
void Session::testInsertRecord(const string& deviceId, int64_t time, const vector<string>& measurements,
                               const vector<string>& values) {
    TSInsertStringRecordReq req;
    req.__set_prefixPath(deviceId);
    req.__set_timestamp(time);
    req.__set_measurements(measurements);
    req.__set_values(values);
    try {
        // The previous code ran RpcUtils::verifySuccess() on a local TSStatus that this
        // call never fills in, so that check could not reflect the server's answer.
        // NOTE(review): presumably the connection method verifies the status itself - confirm.
        defaultSessionConnection_->testInsertStringRecord(req);
    }
    catch (const TTransportException& e) {
        log_debug("%s", e.what());
        throw IoTDBConnectionException(e.what());
    } catch (const IoTDBException& e) {
        log_debug("%s", e.what());
        throw;
    } catch (const exception& e) {
        log_debug("%s", e.what());
        throw IoTDBException(e.what());
    }
}
void Session::testInsertTablet(const Tablet& tablet) {
TSInsertTabletReq request;
request.prefixPath = tablet.deviceId;
for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
request.measurements.push_back(schema.first);
request.types.push_back(schema.second);
}
request.__set_timestamps(move(SessionUtils::getTime(tablet)));
request.__set_values(move(SessionUtils::getValue(tablet)));
request.__set_size(tablet.rowSize);
try {
TSStatus tsStatus;
defaultSessionConnection_->testInsertTablet(request);
RpcUtils::verifySuccess(tsStatus);
}
catch (const TTransportException& e) {
log_debug(e.what());
throw IoTDBConnectionException(e.what());
} catch (const IoTDBException& e) {
log_debug(e.what());
throw;
} catch (const exception& e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
/**
 * Builds a TSInsertStringRecordsReq with one row per entry of deviceIds and sends it
 * through the default connection's RPC client, then verifies the returned status.
 *
 * NOTE(review): this calls the client's insertStringRecords(), not a test-specific
 * RPC, and does not set a session id on the request here - confirm this is intended.
 *
 * TTransportException is converted to IoTDBConnectionException, IoTDBException is
 * rethrown, and any other std::exception is converted to IoTDBException.
 *
 * @throws std::logic_error when the argument lists differ in length.
 */
void Session::testInsertRecords(const vector<string>& deviceIds,
                                const vector<int64_t>& times,
                                const vector<vector<string>>& measurementsList,
                                const vector<vector<string>>& valuesList) {
    const size_t len = deviceIds.size();
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
        // Throw the logic_error itself: copying it into a std::exception drops the message.
        throw logic_error("deviceIds, times, measurementsList and valuesList's size should be equal");
    }
    TSInsertStringRecordsReq request;
    request.__set_prefixPaths(deviceIds);
    request.__set_timestamps(times);
    request.__set_measurementsList(measurementsList);
    request.__set_valuesList(valuesList);
    try {
        TSStatus tsStatus;
        defaultSessionConnection_->getSessionClient()->insertStringRecords(tsStatus, request);
        RpcUtils::verifySuccess(tsStatus);
    }
    catch (const TTransportException& e) {
        log_debug("%s", e.what());
        throw IoTDBConnectionException(e.what());
    } catch (const IoTDBException& e) {
        log_debug("%s", e.what());
        throw;
    } catch (const exception& e) {
        log_debug("%s", e.what());
        throw IoTDBException(e.what());
    }
}
void Session::deleteTimeseries(const string& path) {
vector<string> paths;
paths.push_back(path);
deleteTimeseries(paths);
}
/**
 * Forwards `paths` to deleteTimeseries() on the default connection.
 */
void Session::deleteTimeseries(const vector<string>& paths) {
    auto& connection = defaultSessionConnection_;
    connection->deleteTimeseries(paths);
}
void Session::deleteData(const string& path, int64_t endTime) {
vector<string> paths;
paths.push_back(path);
deleteData(paths, LONG_LONG_MIN, endTime);
}
/**
 * Forwards to the range overload with LONG_LONG_MIN as the start time.
 */
void Session::deleteData(const vector<string>& paths, int64_t endTime) {
    const int64_t startTime = LONG_LONG_MIN;
    deleteData(paths, startTime, endTime);
}
void Session::deleteData(const vector<string>& paths, int64_t startTime, int64_t endTime) {
TSDeleteDataReq req;
req.__set_paths(paths);
req.__set_startTime(startTime);
req.__set_endTime(endTime);
defaultSessionConnection_->deleteData(req);
}
/**
 * Forwards `storageGroupId` to setStorageGroup() on the default connection.
 */
void Session::setStorageGroup(const string& storageGroupId) {
    auto& connection = defaultSessionConnection_;
    connection->setStorageGroup(storageGroupId);
}
void Session::deleteStorageGroup(const string& storageGroup) {
vector<string> storageGroups;
storageGroups.push_back(storageGroup);
deleteStorageGroups(storageGroups);
}
/**
 * Forwards `storageGroups` to deleteStorageGroups() on the default connection.
 */
void Session::deleteStorageGroups(const vector<string>& storageGroups) {
    auto& connection = defaultSessionConnection_;
    connection->deleteStorageGroups(storageGroups);
}
void Session::createDatabase(const string& database) {
this->setStorageGroup(database);
}
void Session::deleteDatabase(const string& database) {
this->deleteStorageGroups(vector<string>{database});
}
void Session::deleteDatabases(const vector<string>& databases) {
this->deleteStorageGroups(databases);
}
void Session::createTimeseries(const string& path,
TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor) {
try {
createTimeseries(path, dataType, encoding, compressor, nullptr, nullptr, nullptr, "");
}
catch (const exception& e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
void Session::createTimeseries(const string& path,
TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor,
map<string, string>* props,
map<string, string>* tags,
map<string, string>* attributes,
const string& measurementAlias) {
TSCreateTimeseriesReq req;
req.__set_path(path);
req.__set_dataType(dataType);
req.__set_encoding(encoding);
req.__set_compressor(compressor);
if (props != nullptr) {
req.__set_props(*props);
}
if (tags != nullptr) {
req.__set_tags(*tags);
}
if (attributes != nullptr) {
req.__set_attributes(*attributes);
}
if (!measurementAlias.empty()) {
req.__set_measurementAlias(measurementAlias);
}
defaultSessionConnection_->createTimeseries(req);
}
/**
 * Builds a TSCreateMultiTimeseriesReq and sends it through the default connection.
 *
 * Data types, encodings and compressors are sent as their integer enum values.
 * The four pointer arguments are optional: a null pointer leaves the matching
 * request field unset.
 */
void Session::createMultiTimeseries(const vector<string>& paths,
                                    const vector<TSDataType::TSDataType>& dataTypes,
                                    const vector<TSEncoding::TSEncoding>& encodings,
                                    const vector<CompressionType::CompressionType>& compressors,
                                    vector<map<string, string>>* propsList,
                                    vector<map<string, string>>* tagsList,
                                    vector<map<string, string>>* attributesList,
                                    vector<string>* measurementAliasList) {
    TSCreateMultiTimeseriesReq request;
    request.__set_paths(paths);

    // Each enum list is converted element-wise to its integer value.
    const vector<int> dataTypesOrdinal(dataTypes.begin(), dataTypes.end());
    request.__set_dataTypes(dataTypesOrdinal);

    const vector<int> encodingsOrdinal(encodings.begin(), encodings.end());
    request.__set_encodings(encodingsOrdinal);

    const vector<int> compressorsOrdinal(compressors.begin(), compressors.end());
    request.__set_compressors(compressorsOrdinal);

    if (propsList) {
        request.__set_propsList(*propsList);
    }
    if (tagsList) {
        request.__set_tagsList(*tagsList);
    }
    if (attributesList) {
        request.__set_attributesList(*attributesList);
    }
    if (measurementAliasList) {
        request.__set_measurementAliasList(*measurementAliasList);
    }

    defaultSessionConnection_->createMultiTimeseries(request);
}
/**
 * Builds a TSCreateAlignedTimeseriesReq for `deviceId` and sends it through the
 * default connection. Data types, encodings and compressors are sent as their
 * integer enum values.
 */
void Session::createAlignedTimeseries(const std::string& deviceId,
                                      const std::vector<std::string>& measurements,
                                      const std::vector<TSDataType::TSDataType>& dataTypes,
                                      const std::vector<TSEncoding::TSEncoding>& encodings,
                                      const std::vector<CompressionType::CompressionType>& compressors) {
    TSCreateAlignedTimeseriesReq request;
    request.__set_prefixPath(deviceId);
    request.__set_measurements(measurements);

    // Each enum list is converted element-wise to its integer value.
    const vector<int> dataTypesOrdinal(dataTypes.begin(), dataTypes.end());
    request.__set_dataTypes(dataTypesOrdinal);

    const vector<int> encodingsOrdinal(encodings.begin(), encodings.end());
    request.__set_encodings(encodingsOrdinal);

    const vector<int> compressorsOrdinal(compressors.begin(), compressors.end());
    request.__set_compressors(compressorsOrdinal);

    defaultSessionConnection_->createAlignedTimeseries(request);
}
bool Session::checkTimeseriesExists(const string& path) {
try {
std::unique_ptr<SessionDataSet> dataset = executeQueryStatement("SHOW TIMESERIES " + path);
bool isExisted = dataset->hasNext();
dataset->closeOperationHandle();
return isExisted;
}
catch (const exception& e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
/**
 * Picks the connection to run a query on.
 *
 * Returns the default connection when the nodes supplier offers no endpoint or no
 * per-endpoint connections are cached. Otherwise returns the cached connection for
 * the offered endpoint, creating and caching one if needed.
 *
 * @return the chosen connection; may be null if creating a new connection threw.
 */
shared_ptr<SessionConnection> Session::getQuerySessionConnection() {
    auto endPoint = nodesSupplier_->getQueryEndPoint();
    if (!endPoint.is_initialized() || endPointToSessionConnection.empty()) {
        return defaultSessionConnection_;
    }

    auto it = endPointToSessionConnection.find(endPoint.value());
    if (it != endPointToSessionConnection.end()) {
        return it->second;
    }

    shared_ptr<SessionConnection> newConnection;
    try {
        newConnection = make_shared<SessionConnection>(this, endPoint.value(), zoneId_, nodesSupplier_,
                                                       fetchSize_, 60, 500, sqlDialect_, database_);
        endPointToSessionConnection.emplace(endPoint.value(), newConnection);
        return newConnection;
    }
    catch (const exception& e) {
        // Pass the message as an argument instead of pasting it into the format string.
        log_debug("Session::getQuerySessionConnection() exception: %s", e.what());
        return newConnection;
    }
}
/**
 * Returns the cached connection for the endpoint recorded for `deviceId`.
 *
 * Falls back to the default connection when redirection is disabled, no endpoint
 * is recorded for the device, or no connection is cached for that endpoint.
 */
shared_ptr<SessionConnection> Session::getSessionConnection(std::string deviceId) {
    if (!enableRedirection_) {
        return defaultSessionConnection_;
    }
    auto endpointIt = deviceIdToEndpoint.find(deviceId);
    if (endpointIt == deviceIdToEndpoint.end()) {
        return defaultSessionConnection_;
    }
    auto connectionIt = endPointToSessionConnection.find(endpointIt->second);
    if (connectionIt == endPointToSessionConnection.end()) {
        return defaultSessionConnection_;
    }
    return connectionIt->second;
}
/**
 * Table-model variant: returns the cached connection for the endpoint recorded
 * for `deviceId`.
 *
 * Falls back to the default connection when redirection is disabled, no endpoint
 * is recorded for the device, or no connection is cached for that endpoint.
 */
shared_ptr<SessionConnection> Session::getSessionConnection(std::shared_ptr<storage::IDeviceID> deviceId) {
    if (!enableRedirection_) {
        return defaultSessionConnection_;
    }
    auto endpointIt = tableModelDeviceIdToEndpoint.find(deviceId);
    if (endpointIt == tableModelDeviceIdToEndpoint.end()) {
        return defaultSessionConnection_;
    }
    auto connectionIt = endPointToSessionConnection.find(endpointIt->second);
    if (connectionIt == endPointToSessionConnection.end()) {
        return defaultSessionConnection_;
    }
    return connectionIt->second;
}
/**
 * Returns the timeZone field of the default connection's getTimeZone() response.
 */
string Session::getTimeZone() {
    auto response = defaultSessionConnection_->getTimeZone();
    string zone = response.timeZone;
    return zone;
}
void Session::setTimeZone(const string& zoneId) {
TSSetTimeZoneReq req;
req.__set_sessionId(defaultSessionConnection_->sessionId);
req.__set_timeZone(zoneId);
defaultSessionConnection_->setTimeZone(req);
}
/**
 * Runs `sql` with the file-level QUERY_TIMEOUT_MS timeout via
 * executeQueryStatementMayRedirect().
 */
unique_ptr<SessionDataSet> Session::executeQueryStatement(const string& sql) {
    const int64_t timeoutInMs = QUERY_TIMEOUT_MS;
    return executeQueryStatementMayRedirect(sql, timeoutInMs);
}
/**
 * Runs `sql` with the given timeout via executeQueryStatementMayRedirect().
 */
unique_ptr<SessionDataSet> Session::executeQueryStatement(const string& sql, int64_t timeoutInMs) {
    unique_ptr<SessionDataSet> dataSet = executeQueryStatementMayRedirect(sql, timeoutInMs);
    return dataSet;
}
/**
 * Makes the connection for `endPoint` the default connection, creating and caching
 * it when none is cached yet. Does nothing when redirection is disabled.
 *
 * @throws IoTDBConnectionException if creating the new connection throws.
 */
void Session::handleQueryRedirection(TEndPoint endPoint) {
    if (!enableRedirection_) {
        return;
    }

    auto cached = endPointToSessionConnection.find(endPoint);
    if (cached != endPointToSessionConnection.end()) {
        defaultSessionConnection_ = cached->second;
        return;
    }

    shared_ptr<SessionConnection> created;
    try {
        created = make_shared<SessionConnection>(this, endPoint, zoneId_, nodesSupplier_,
                                                 fetchSize_, 60, 500, sqlDialect_, database_);
        endPointToSessionConnection.emplace(endPoint, created);
    }
    catch (exception& e) {
        throw IoTDBConnectionException(e.what());
    }
    defaultSessionConnection_ = created;
}
void Session::handleRedirection(const std::string& deviceId, TEndPoint endPoint) {
if (!enableRedirection_) return;
if (endPoint.ip == "127.0.0.1") return;
deviceIdToEndpoint[deviceId] = endPoint;
shared_ptr<SessionConnection> newConnection;
auto it = endPointToSessionConnection.find(endPoint);
if (it != endPointToSessionConnection.end()) {
newConnection = it->second;
}
else {
try {
newConnection = make_shared<SessionConnection>(this, endPoint, zoneId_, nodesSupplier_,
fetchSize_, 60, 500, sqlDialect_, database_);
endPointToSessionConnection.emplace(endPoint, newConnection);
}
catch (exception& e) {
deviceIdToEndpoint.erase(deviceId);
throw IoTDBConnectionException(e.what());
}
}
}
/**
 * Table-model variant: records `endPoint` as the endpoint for `deviceId` and makes
 * sure a connection to it is cached.
 *
 * Does nothing when redirection is disabled or the endpoint ip is "127.0.0.1".
 * If a new connection cannot be created, the device mapping is removed again.
 *
 * @throws IoTDBConnectionException if creating the new connection throws.
 */
void Session::handleRedirection(const std::shared_ptr<storage::IDeviceID>& deviceId, TEndPoint endPoint) {
    if (!enableRedirection_ || endPoint.ip == "127.0.0.1") {
        return;
    }

    tableModelDeviceIdToEndpoint[deviceId] = endPoint;

    const bool alreadyConnected = endPointToSessionConnection.find(endPoint) != endPointToSessionConnection.end();
    if (alreadyConnected) {
        return;
    }

    try {
        shared_ptr<SessionConnection> created =
            make_shared<SessionConnection>(this, endPoint, zoneId_, nodesSupplier_,
                                           fetchSize_, 60, 500, sqlDialect_, database_);
        endPointToSessionConnection.emplace(endPoint, created);
    }
    catch (exception& e) {
        tableModelDeviceIdToEndpoint.erase(deviceId);
        throw IoTDBConnectionException(e.what());
    }
}
std::unique_ptr<SessionDataSet> Session::executeQueryStatementMayRedirect(const std::string& sql, int64_t timeoutInMs) {
auto sessionConnection = getQuerySessionConnection();
if (!sessionConnection) {
log_warn("Session connection not found");
return nullptr;
}
try {
return sessionConnection->executeQueryStatement(sql, timeoutInMs);
}
catch (RedirectException& e) {
log_warn("Session connection redirect exception: " + e.what());
handleQueryRedirection(e.endPoint);
try {
return defaultSessionConnection_->executeQueryStatement(sql, timeoutInMs);
}
catch (exception& e) {
log_error("Exception while executing redirected query statement: %s", e.what());
throw ExecutionException(e.what());
}
} catch (exception& e) {
log_error("Exception while executing query statement: %s", e.what());
throw e;
}
}
/**
 * Runs a non-query statement on the default connection.
 *
 * Any std::exception is rethrown as an IoTDBException with the same message.
 */
void Session::executeNonQueryStatement(const string& sql) {
    auto& connection = defaultSessionConnection_;
    try {
        connection->executeNonQueryStatement(sql);
    }
    catch (const exception& e) {
        throw IoTDBException(e.what());
    }
}
/**
 * Forwards a raw-data query for `paths` and the given start/end times to the
 * default connection.
 */
unique_ptr<SessionDataSet>
Session::executeRawDataQuery(const vector<string>& paths, int64_t startTime, int64_t endTime) {
    auto& connection = defaultSessionConnection_;
    return connection->executeRawDataQuery(paths, startTime, endTime);
}
/**
 * Forwards to the two-argument overload with LONG_LONG_MIN as `lastTime`.
 */
unique_ptr<SessionDataSet> Session::executeLastDataQuery(const vector<string>& paths) {
    const int64_t lastTime = LONG_LONG_MIN;
    return executeLastDataQuery(paths, lastTime);
}
/**
 * Forwards a last-data query for `paths` and `lastTime` to the default connection.
 */
unique_ptr<SessionDataSet> Session::executeLastDataQuery(const vector<string>& paths, int64_t lastTime) {
    auto& connection = defaultSessionConnection_;
    return connection->executeLastDataQuery(paths, lastTime);
}
void Session::createSchemaTemplate(const Template& templ) {
TSCreateSchemaTemplateReq req;
req.__set_name(templ.getName());
req.__set_serializedTemplate(templ.serialize());
defaultSessionConnection_->createSchemaTemplate(req);
}
void Session::setSchemaTemplate(const string& template_name, const string& prefix_path) {
TSSetSchemaTemplateReq req;
req.__set_templateName(template_name);
req.__set_prefixPath(prefix_path);
defaultSessionConnection_->setSchemaTemplate(req);
}
void Session::unsetSchemaTemplate(const string& prefix_path, const string& template_name) {
TSUnsetSchemaTemplateReq req;
req.__set_templateName(template_name);
req.__set_prefixPath(prefix_path);
defaultSessionConnection_->unsetSchemaTemplate(req);
}
void Session::addAlignedMeasurementsInTemplate(const string& template_name, const vector<std::string>& measurements,
const vector<TSDataType::TSDataType>& dataTypes,
const vector<TSEncoding::TSEncoding>& encodings,
const vector<CompressionType::CompressionType>& compressors) {
TSAppendSchemaTemplateReq req;
req.__set_name(template_name);
req.__set_measurements(measurements);
req.__set_isAligned(true);
vector<int> dataTypesOrdinal;
dataTypesOrdinal.reserve(dataTypes.size());
for (TSDataType::TSDataType dataType : dataTypes) {
dataTypesOrdinal.push_back(dataType);
}
req.__set_dataTypes(dataTypesOrdinal);
vector<int> encodingsOrdinal;
encodingsOrdinal.reserve(encodings.size());
for (TSEncoding::TSEncoding encoding : encodings) {
encodingsOrdinal.push_back(encoding);
}
req.__set_encodings(encodingsOrdinal);
vector<int> compressorsOrdinal;
compressorsOrdinal.reserve(compressors.size());
for (CompressionType::CompressionType compressor : compressors) {
compressorsOrdinal.push_back(compressor);
}
req.__set_compressors(compressorsOrdinal);
defaultSessionConnection_->appendSchemaTemplate(req);
}
void Session::addAlignedMeasurementsInTemplate(const string& template_name, const string& measurement,
TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor) {
vector<std::string> measurements(1, measurement);
vector<TSDataType::TSDataType> dataTypes(1, dataType);
vector<TSEncoding::TSEncoding> encodings(1, encoding);
vector<CompressionType::CompressionType> compressors(1, compressor);
addAlignedMeasurementsInTemplate(template_name, measurements, dataTypes, encodings, compressors);
}
void Session::addUnalignedMeasurementsInTemplate(const string& template_name, const vector<std::string>& measurements,
const vector<TSDataType::TSDataType>& dataTypes,
const vector<TSEncoding::TSEncoding>& encodings,
const vector<CompressionType::CompressionType>& compressors) {
TSAppendSchemaTemplateReq req;
req.__set_name(template_name);
req.__set_measurements(measurements);
req.__set_isAligned(false);
vector<int> dataTypesOrdinal;
dataTypesOrdinal.reserve(dataTypes.size());
for (TSDataType::TSDataType dataType : dataTypes) {
dataTypesOrdinal.push_back(dataType);
}
req.__set_dataTypes(dataTypesOrdinal);
vector<int> encodingsOrdinal;
encodingsOrdinal.reserve(encodings.size());
for (TSEncoding::TSEncoding encoding : encodings) {
encodingsOrdinal.push_back(encoding);
}
req.__set_encodings(encodingsOrdinal);
vector<int> compressorsOrdinal;
compressorsOrdinal.reserve(compressors.size());
for (CompressionType::CompressionType compressor : compressors) {
compressorsOrdinal.push_back(compressor);
}
req.__set_compressors(compressorsOrdinal);
defaultSessionConnection_->appendSchemaTemplate(req);
}
void Session::addUnalignedMeasurementsInTemplate(const string& template_name, const string& measurement,
TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor) {
vector<std::string> measurements(1, measurement);
vector<TSDataType::TSDataType> dataTypes(1, dataType);
vector<TSEncoding::TSEncoding> encodings(1, encoding);
vector<CompressionType::CompressionType> compressors(1, compressor);
addUnalignedMeasurementsInTemplate(template_name, measurements, dataTypes, encodings, compressors);
}
void Session::deleteNodeInTemplate(const string& template_name, const string& path) {
TSPruneSchemaTemplateReq req;
req.__set_name(template_name);
req.__set_path(path);
defaultSessionConnection_->pruneSchemaTemplate(req);
}
int Session::countMeasurementsInTemplate(const string& template_name) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_queryType(TemplateQueryType::COUNT_MEASUREMENTS);
TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
return resp.count;
}
bool Session::isMeasurementInTemplate(const string& template_name, const string& path) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_measurement(path);
req.__set_queryType(TemplateQueryType::IS_MEASUREMENT);
TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
return resp.result;
}
bool Session::isPathExistInTemplate(const string& template_name, const string& path) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_measurement(path);
req.__set_queryType(TemplateQueryType::PATH_EXIST);
TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
return resp.result;
}
std::vector<std::string> Session::showMeasurementsInTemplate(const string& template_name) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_measurement("");
req.__set_queryType(TemplateQueryType::SHOW_MEASUREMENTS);
TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
return resp.measurements;
}
std::vector<std::string> Session::showMeasurementsInTemplate(const string& template_name, const string& pattern) {
TSQueryTemplateReq req;
req.__set_name(template_name);
req.__set_measurement(pattern);
req.__set_queryType(TemplateQueryType::SHOW_MEASUREMENTS);
TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
return resp.measurements;
}
bool Session::checkTemplateExists(const string& template_name) {
try {
std::unique_ptr<SessionDataSet> dataset = executeQueryStatement(
"SHOW NODES IN DEVICE TEMPLATE " + template_name);
bool isExisted = dataset->hasNext();
dataset->closeOperationHandle();
return isExisted;
}
catch (const exception& e) {
if (strstr(e.what(), "does not exist") != NULL) {
return false;
}
log_debug(e.what());
throw IoTDBException(e.what());
}
}