/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "Session.h"
#include <algorithm>
#include <memory>
#include <time.h>
#include <future>
#include <unordered_set>
#include "NodesSupplier.h"
#include "SessionDataSet.h"

using namespace std;

/**
* Timeout of query can be set by users.
* A negative number means using the default configuration of server.
* And value 0 will disable the function of query timeout.
*/
static const int64_t QUERY_TIMEOUT_MS = -1;

LogLevelType LOG_LEVEL = LEVEL_DEBUG;

TSDataType::TSDataType getTSDataTypeFromString(const string& str) {
    // BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, STRING, BLOB, TIMESTAMP, DATE, NULLTYPE
    if (str == "BOOLEAN") {
        return TSDataType::BOOLEAN;
    } else if (str == "INT32") {
        return TSDataType::INT32;
    } else if (str == "INT64") {
        return TSDataType::INT64;
    } else if (str == "FLOAT") {
        return TSDataType::FLOAT;
    } else if (str == "DOUBLE") {
        return TSDataType::DOUBLE;
    } else if (str == "TEXT") {
        return TSDataType::TEXT;
    } else if (str == "TIMESTAMP") {
        return TSDataType::TIMESTAMP;
    } else if (str == "DATE") {
        return TSDataType::DATE;
    } else if (str == "BLOB") {
        return TSDataType::BLOB;
    } else if (str == "STRING") {
        return TSDataType::STRING;
    }
    return TSDataType::UNKNOWN;
}

void Tablet::createColumns() {
    for (size_t i = 0; i < schemas.size(); i++) {
        TSDataType::TSDataType dataType = schemas[i].second;
        switch (dataType) {
        case TSDataType::BOOLEAN:
            values[i] = new bool[maxRowNumber];
            break;
        case TSDataType::DATE:
            values[i] = new boost::gregorian::date[maxRowNumber];
            break;
        case TSDataType::INT32:
            values[i] = new int[maxRowNumber];
            break;
        case TSDataType::TIMESTAMP:
        case TSDataType::INT64:
            values[i] = new int64_t[maxRowNumber];
            break;
        case TSDataType::FLOAT:
            values[i] = new float[maxRowNumber];
            break;
        case TSDataType::DOUBLE:
            values[i] = new double[maxRowNumber];
            break;
        case TSDataType::STRING:
        case TSDataType::BLOB:
        case TSDataType::TEXT:
            values[i] = new string[maxRowNumber];
            break;
        default:
            throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
        }
    }
}

void Tablet::deleteColumns() {
    for (size_t i = 0; i < schemas.size(); i++) {
        if (!values[i]) continue;
        TSDataType::TSDataType dataType = schemas[i].second;
        switch (dataType) {
        case TSDataType::BOOLEAN: {
            bool* valueBuf = (bool*)(values[i]);
            delete[] valueBuf;
            break;
        }
        case TSDataType::INT32: {
            int* valueBuf = (int*)(values[i]);
            delete[] valueBuf;
            break;
        }
        case TSDataType::DATE: {
            boost::gregorian::date* valueBuf = (boost::gregorian::date*)(values[i]);
            delete[] valueBuf;
            break;
        }
        case TSDataType::TIMESTAMP:
        case TSDataType::INT64: {
            int64_t* valueBuf = (int64_t*)(values[i]);
            delete[] valueBuf;
            break;
        }
        case TSDataType::FLOAT: {
            float* valueBuf = (float*)(values[i]);
            delete[] valueBuf;
            break;
        }
        case TSDataType::DOUBLE: {
            double* valueBuf = (double*)(values[i]);
            delete[] valueBuf;
            break;
        }
        case TSDataType::STRING:
        case TSDataType::BLOB:
        case TSDataType::TEXT: {
            string* valueBuf = (string*)(values[i]);
            delete[] valueBuf;
            break;
        }
        default:
            throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
        }
        values[i] = nullptr;
    }
}

void Tablet::deepCopyTabletColValue(void* const* srcPtr, void** destPtr, TSDataType::TSDataType type, int maxRowNumber) {
    void *src = *srcPtr;
    switch (type) {
    case TSDataType::BOOLEAN:
        *destPtr = new bool[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(bool));
        break;
    case TSDataType::INT32:
        *destPtr = new int32_t[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(int32_t));
        break;
    case TSDataType::INT64:
    case TSDataType::TIMESTAMP:
        *destPtr = new int64_t[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(int64_t));
        break;
    case TSDataType::FLOAT:
        *destPtr = new float[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(float));
        break;
    case TSDataType::DOUBLE:
        *destPtr = new double[maxRowNumber];
        memcpy(*destPtr, src, maxRowNumber * sizeof(double));
        break;
    case TSDataType::DATE: {
        *destPtr = new boost::gregorian::date[maxRowNumber];
        boost::gregorian::date* srcDate = static_cast<boost::gregorian::date*>(src);
        boost::gregorian::date* destDate = static_cast<boost::gregorian::date*>(*destPtr);
        for (size_t j = 0; j < maxRowNumber; ++j) {
            destDate[j] = srcDate[j];
        }
        break;
    }
    case TSDataType::STRING:
    case TSDataType::TEXT:
    case TSDataType::BLOB: {
        *destPtr = new std::string[maxRowNumber];
        std::string* srcStr = static_cast<std::string*>(src);
        std::string* destStr = static_cast<std::string*>(*destPtr);
        for (size_t j = 0; j < maxRowNumber; ++j) {
            destStr[j] = srcStr[j];
        }
        break;
    }
    default:
        break;
    }
}

void Tablet::reset() {
    rowSize = 0;
    for (size_t i = 0; i < schemas.size(); i++) {
        bitMaps[i].reset();
    }
}

size_t Tablet::getTimeBytesSize() {
    return rowSize * 8;
}

size_t Tablet::getValueByteSize() {
    size_t valueOccupation = 0;
    for (size_t i = 0; i < schemas.size(); i++) {
        switch (schemas[i].second) {
        case TSDataType::BOOLEAN:
            valueOccupation += rowSize;
            break;
        case TSDataType::INT32:
            valueOccupation += rowSize * 4;
            break;
        case TSDataType::DATE:
            valueOccupation += rowSize * 4;
            break;
        case TSDataType::TIMESTAMP:
        case TSDataType::INT64:
            valueOccupation += rowSize * 8;
            break;
        case TSDataType::FLOAT:
            valueOccupation += rowSize * 4;
            break;
        case TSDataType::DOUBLE:
            valueOccupation += rowSize * 8;
            break;
        case TSDataType::STRING:
        case TSDataType::BLOB:
        case TSDataType::TEXT: {
            valueOccupation += rowSize * 4;
            string* valueBuf = (string*)(values[i]);
            for (size_t j = 0; j < rowSize; j++) {
                valueOccupation += valueBuf[j].size();
            }
            break;
        }
        default:
            throw UnSupportedDataTypeException(
                string("Data type ") + to_string(schemas[i].second) + " is not supported.");
        }
    }
    return valueOccupation;
}

void Tablet::setAligned(bool isAligned) {
    this->isAligned = isAligned;
}

std::shared_ptr<storage::IDeviceID> Tablet::getDeviceID(int row) {
    std::vector<std::string> id_array(idColumnIndexes.size() + 1);
    size_t idArrayIdx = 0;
    id_array[idArrayIdx++] = this->deviceId;
    for (auto idColumnIndex : idColumnIndexes) {
        void* strPtr = getValue(idColumnIndex, row, TSDataType::TEXT);
        id_array[idArrayIdx++] = *static_cast<std::string*>(strPtr);
    }
    return std::make_shared<storage::StringArrayDeviceID>(id_array);
}

string SessionUtils::getTime(const Tablet& tablet) {
    MyStringBuffer timeBuffer;
    unsigned int n = 8u * tablet.rowSize;
    if (n > timeBuffer.str.capacity()) {
        timeBuffer.reserve(n);
    }

    for (size_t i = 0; i < tablet.rowSize; i++) {
        timeBuffer.putInt64(tablet.timestamps[i]);
    }
    return timeBuffer.str;
}

string SessionUtils::getValue(const Tablet& tablet) {
    MyStringBuffer valueBuffer;
    unsigned int n = 8u * tablet.schemas.size() * tablet.rowSize;
    if (n > valueBuffer.str.capacity()) {
        valueBuffer.reserve(n);
    }
    for (size_t i = 0; i < tablet.schemas.size(); i++) {
        TSDataType::TSDataType dataType = tablet.schemas[i].second;
        const BitMap& bitMap = tablet.bitMaps[i];
        switch (dataType) {
        case TSDataType::BOOLEAN: {
            bool* valueBuf = (bool*)(tablet.values[i]);
            for (size_t index = 0; index < tablet.rowSize; index++) {
                if (!bitMap.isMarked(index)) {
                    valueBuffer.putBool(valueBuf[index]);
                }
                else {
                    valueBuffer.putBool(false);
                }
            }
            break;
        }
        case TSDataType::INT32: {
            int* valueBuf = (int*)(tablet.values[i]);
            for (size_t index = 0; index < tablet.rowSize; index++) {
                if (!bitMap.isMarked(index)) {
                    valueBuffer.putInt(valueBuf[index]);
                }
                else {
                    valueBuffer.putInt((numeric_limits<int32_t>::min)());
                }
            }
            break;
        }
        case TSDataType::DATE: {
            boost::gregorian::date* valueBuf = (boost::gregorian::date*)(tablet.values[i]);
            for (size_t index = 0; index < tablet.rowSize; index++) {
                if (!bitMap.isMarked(index)) {
                    valueBuffer.putDate(valueBuf[index]);
                }
                else {
                    valueBuffer.putInt(EMPTY_DATE_INT);
                }
            }
            break;
        }
        case TSDataType::TIMESTAMP:
        case TSDataType::INT64: {
            int64_t* valueBuf = (int64_t*)(tablet.values[i]);
            for (size_t index = 0; index < tablet.rowSize; index++) {
                if (!bitMap.isMarked(index)) {
                    valueBuffer.putInt64(valueBuf[index]);
                }
                else {
                    valueBuffer.putInt64((numeric_limits<int64_t>::min)());
                }
            }
            break;
        }
        case TSDataType::FLOAT: {
            float* valueBuf = (float*)(tablet.values[i]);
            for (size_t index = 0; index < tablet.rowSize; index++) {
                if (!bitMap.isMarked(index)) {
                    valueBuffer.putFloat(valueBuf[index]);
                }
                else {
                    valueBuffer.putFloat((numeric_limits<float>::min)());
                }
            }
            break;
        }
        case TSDataType::DOUBLE: {
            double* valueBuf = (double*)(tablet.values[i]);
            for (size_t index = 0; index < tablet.rowSize; index++) {
                if (!bitMap.isMarked(index)) {
                    valueBuffer.putDouble(valueBuf[index]);
                }
                else {
                    valueBuffer.putDouble((numeric_limits<double>::min)());
                }
            }
            break;
        }
        case TSDataType::STRING:
        case TSDataType::BLOB:
        case TSDataType::TEXT: {
            string* valueBuf = (string*)(tablet.values[i]);
            for (size_t index = 0; index < tablet.rowSize; index++) {
                if (!bitMap.isMarked(index)) {
                    valueBuffer.putString(valueBuf[index]);
                }
                else {
                    valueBuffer.putString("");
                }
            }
            break;
        }
        default:
            throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
        }
    }
    for (size_t i = 0; i < tablet.schemas.size(); i++) {
        const BitMap& bitMap = tablet.bitMaps[i];
        bool columnHasNull = !bitMap.isAllUnmarked();
        valueBuffer.putChar(columnHasNull ? (char)1 : (char)0);
        if (columnHasNull) {
            const vector<char>& bytes = bitMap.getByteArray();
            for (size_t index = 0; index < tablet.rowSize / 8 + 1; index++) {
                valueBuffer.putChar(bytes[index]);
            }
        }
    }
    return valueBuffer.str;
}

bool SessionUtils::isTabletContainsSingleDevice(Tablet tablet) {
    if (tablet.rowSize == 1) {
        return true;
    }
    auto firstDeviceId = tablet.getDeviceID(0);
    for (int i = 1; i < tablet.rowSize; ++i) {
        if (*firstDeviceId != *tablet.getDeviceID(i)) {
            return false;
        }
    }
    return true;
}

string MeasurementNode::serialize() const {
    MyStringBuffer buffer;
    buffer.putString(getName());
    buffer.putChar(getDataType());
    buffer.putChar(getEncoding());
    buffer.putChar(getCompressionType());
    return buffer.str;
}

string Template::serialize() const {
    MyStringBuffer buffer;
    stack<pair<string, shared_ptr<TemplateNode>>> stack;
    unordered_set<string> alignedPrefix;
    buffer.putString(getName());
    buffer.putBool(isAligned());
    if (isAligned()) {
        alignedPrefix.emplace("");
    }

    for (const auto& child : children_) {
        stack.push(make_pair("", child.second));
    }

    while (!stack.empty()) {
        auto cur = stack.top();
        stack.pop();

        string prefix = cur.first;
        shared_ptr<TemplateNode> cur_node_ptr = cur.second;
        string fullPath(prefix);

        if (!cur_node_ptr->isMeasurement()) {
            if (!prefix.empty()) {
                fullPath.append(".");
            }
            fullPath.append(cur_node_ptr->getName());
            if (cur_node_ptr->isAligned()) {
                alignedPrefix.emplace(fullPath);
            }
            for (const auto& child : cur_node_ptr->getChildren()) {
                stack.push(make_pair(fullPath, child.second));
            }
        }
        else {
            buffer.putString(prefix);
            buffer.putBool(alignedPrefix.find(prefix) != alignedPrefix.end());
            buffer.concat(cur_node_ptr->serialize());
        }
    }

    return buffer.str;
}

/**
 * When delete variable, make sure release all resource.
 */
Session::~Session() {
    try {
        close();
    }
    catch (const exception& e) {
        log_debug(e.what());
    }
}

void Session::removeBrokenSessionConnection(shared_ptr<SessionConnection> sessionConnection) {
    if (enableRedirection_) {
        this->endPointToSessionConnection.erase(sessionConnection->getEndPoint());
    }

    auto it1 = deviceIdToEndpoint.begin();
    while (it1 != deviceIdToEndpoint.end()) {
        if (it1->second == sessionConnection->getEndPoint()) {
            it1 = deviceIdToEndpoint.erase(it1);
        }
        else {
            ++it1;
        }
    }

    auto it2 = tableModelDeviceIdToEndpoint.begin();
    while (it2 != tableModelDeviceIdToEndpoint.end()) {
        if (it2->second == sessionConnection->getEndPoint()) {
            it2 = tableModelDeviceIdToEndpoint.erase(it2);
        }
        else {
            ++it2;
        }
    }
}

/**
   * check whether the batch has been sorted
   *
   * @return whether the batch has been sorted
   */
bool Session::checkSorted(const Tablet& tablet) {
    for (size_t i = 1; i < tablet.rowSize; i++) {
        if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
            return false;
        }
    }
    return true;
}

bool Session::checkSorted(const vector<int64_t>& times) {
    for (size_t i = 1; i < times.size(); i++) {
        if (times[i] < times[i - 1]) {
            return false;
        }
    }
    return true;
}

template <typename T>
std::vector<T> sortList(const std::vector<T>& valueList, const int* index, int indexLength) {
    std::vector<T> sortedValues(valueList.size());
    for (int i = 0; i < indexLength; i++) {
        sortedValues[i] = valueList[index[i]];
    }
    return sortedValues;
}

template <typename T>
void sortValuesList(T* valueList, const int* index, size_t indexLength) {
    T* sortedValues = new T[indexLength];
    for (int i = 0; i < indexLength; i++) {
        sortedValues[i] = valueList[index[i]];
    }
    for (int i = 0; i < indexLength; i++) {
        valueList[i] = sortedValues[i];
    }
    delete[] sortedValues;
}

void Session::sortTablet(Tablet& tablet) {
    /*
     * following part of code sort the batch data by time,
     * so we can insert continuous data in value list to get a better performance
     */
    // sort to get index, and use index to sort value list
    int* index = new int[tablet.rowSize];
    for (size_t i = 0; i < tablet.rowSize; i++) {
        index[i] = i;
    }

    sortIndexByTimestamp(index, tablet.timestamps, tablet.rowSize);
    tablet.timestamps = sortList(tablet.timestamps, index, tablet.rowSize);
    for (size_t i = 0; i < tablet.schemas.size(); i++) {
        TSDataType::TSDataType dataType = tablet.schemas[i].second;
        switch (dataType) {
        case TSDataType::BOOLEAN: {
            sortValuesList((bool*)(tablet.values[i]), index, tablet.rowSize);
            break;
        }
        case TSDataType::INT32: {
            sortValuesList((int*)(tablet.values[i]), index, tablet.rowSize);
            break;
        }
        case TSDataType::DATE: {
            sortValuesList((boost::gregorian::date*)(tablet.values[i]), index, tablet.rowSize);
            break;
        }
        case TSDataType::TIMESTAMP:
        case TSDataType::INT64: {
            sortValuesList((int64_t*)(tablet.values[i]), index, tablet.rowSize);
            break;
        }
        case TSDataType::FLOAT: {
            sortValuesList((float*)(tablet.values[i]), index, tablet.rowSize);
            break;
        }
        case TSDataType::DOUBLE: {
            sortValuesList((double*)(tablet.values[i]), index, tablet.rowSize);
            break;
        }
        case TSDataType::STRING:
        case TSDataType::BLOB:
        case TSDataType::TEXT: {
            sortValuesList((string*)(tablet.values[i]), index, tablet.rowSize);
            break;
        }
        default:
            throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
        }
    }

    delete[] index;
}

void Session::sortIndexByTimestamp(int* index, std::vector<int64_t>& timestamps, int length) {
    if (length <= 1) {
        return;
    }

    TsCompare tsCompareObj(timestamps);
    std::sort(&index[0], &index[length], tsCompareObj);
}

/**
 * Append value into buffer in Big Endian order to comply with IoTDB server
 */
void Session::appendValues(string& buffer, const char* value, int size) {
    static bool hasCheckedEndianFlag = false;
    static bool localCpuIsBigEndian = false;
    if (!hasCheckedEndianFlag) {
        hasCheckedEndianFlag = true;
        int chk = 0x0201; //used to distinguish CPU's type (BigEndian or LittleEndian)
        localCpuIsBigEndian = (0x01 != *(char*)(&chk));
    }

    if (localCpuIsBigEndian) {
        buffer.append(value, size);
    }
    else {
        for (int i = size - 1; i >= 0; i--) {
            buffer.append(value + i, 1);
        }
    }
}

void
Session::putValuesIntoBuffer(const vector<TSDataType::TSDataType>& types, const vector<char*>& values, string& buf) {
    int32_t date;
    for (size_t i = 0; i < values.size(); i++) {
        int8_t typeNum = getDataTypeNumber(types[i]);
        buf.append((char*)(&typeNum), sizeof(int8_t));
        switch (types[i]) {
        case TSDataType::BOOLEAN:
            buf.append(values[i], 1);
            break;
        case TSDataType::INT32:
            appendValues(buf, values[i], sizeof(int32_t));
            break;
        case TSDataType::DATE:
            date = parseDateExpressionToInt(*(boost::gregorian::date*)values[i]);
            appendValues(buf, (char*)&date, sizeof(int32_t));
            break;
        case TSDataType::TIMESTAMP:
        case TSDataType::INT64:
            appendValues(buf, values[i], sizeof(int64_t));
            break;
        case TSDataType::FLOAT:
            appendValues(buf, values[i], sizeof(float));
            break;
        case TSDataType::DOUBLE:
            appendValues(buf, values[i], sizeof(double));
            break;
        case TSDataType::STRING:
        case TSDataType::BLOB:
        case TSDataType::TEXT: {
            int32_t len = (uint32_t)strlen(values[i]);
            appendValues(buf, (char*)(&len), sizeof(uint32_t));
            // no need to change the byte order of string value
            buf.append(values[i], len);
            break;
        }
        default:
            break;
        }
    }
}

int8_t Session::getDataTypeNumber(TSDataType::TSDataType type) {
    switch (type) {
    case TSDataType::BOOLEAN:
        return 0;
    case TSDataType::INT32:
        return 1;
    case TSDataType::INT64:
        return 2;
    case TSDataType::FLOAT:
        return 3;
    case TSDataType::DOUBLE:
        return 4;
    case TSDataType::TEXT:
        return 5;
    case TSDataType::TIMESTAMP:
        return 8;
    case TSDataType::DATE:
        return 9;
    case TSDataType::BLOB:
        return 10;
    case TSDataType::STRING:
        return 11;
    default:
        return -1;
    }
}

string Session::getVersionString(Version::Version version) {
    switch (version) {
    case Version::V_0_12:
        return "V_0_12";
    case Version::V_0_13:
        return "V_0_13";
    case Version::V_1_0:
        return "V_1_0";
    default:
        return "V_0_12";
    }
}

void Session::initZoneId() {
    if (!zoneId_.empty()) {
        return;
    }

    time_t ts = 0;
    struct tm tmv;
#if defined(_WIN64) || defined (WIN32) || defined (_WIN32)
    localtime_s(&tmv, &ts);
#else
    localtime_r(&ts, &tmv);
#endif

    char zoneStr[32];
    strftime(zoneStr, sizeof(zoneStr), "%z", &tmv);
    zoneId_ = zoneStr;
}

void Session::initNodesSupplier() {
    std::vector<TEndPoint> endPoints;
    TEndPoint endPoint;
    endPoint.__set_ip(host_);
    endPoint.__set_port(rpcPort_);
    endPoints.emplace_back(endPoint);
    if (enableAutoFetch_) {
        nodesSupplier_ = NodesSupplier::create(endPoints, username_, password_);
    }
    else {
        nodesSupplier_ = make_shared<StaticNodesSupplier>(endPoints);
    }
}

void Session::initDefaultSessionConnection() {
    defaultEndPoint_.__set_ip(host_);
    defaultEndPoint_.__set_port(rpcPort_);
    defaultSessionConnection_ = make_shared<SessionConnection>(this, defaultEndPoint_, zoneId_, nodesSupplier_, fetchSize_,
                                                              60, 500,
                                                              sqlDialect_, database_);
}

void Session::insertStringRecordsWithLeaderCache(vector<string> deviceIds, vector<int64_t> times,
                                                 vector<vector<string>> measurementsList,
                                                 vector<vector<string>> valuesList, bool isAligned) {
    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertStringRecordsReq> recordsGroup;
    for (int i = 0; i < deviceIds.size(); i++) {
        auto connection = getSessionConnection(deviceIds[i]);
        if (recordsGroup.find(connection) == recordsGroup.end()) {
            TSInsertStringRecordsReq request;
            std::vector<std::string> emptyPrefixPaths;
            std::vector<std::vector<std::string>> emptyMeasurementsList;
            vector<vector<string>> emptyValuesList;
            std::vector<int64_t> emptyTimestamps;
            request.__set_isAligned(isAligned);
            request.__set_prefixPaths(emptyPrefixPaths);
            request.__set_timestamps(emptyTimestamps);
            request.__set_measurementsList(emptyMeasurementsList);
            request.__set_valuesList(emptyValuesList);
            recordsGroup.insert(make_pair(connection, request));
        }
        TSInsertStringRecordsReq& existingReq = recordsGroup[connection];
        existingReq.prefixPaths.emplace_back(deviceIds[i]);
        existingReq.timestamps.emplace_back(times[i]);
        existingReq.measurementsList.emplace_back(measurementsList[i]);
        existingReq.valuesList.emplace_back(valuesList[i]);
    }
    std::function<void(std::shared_ptr<SessionConnection>, const TSInsertStringRecordsReq&)> consumer =
        [](const std::shared_ptr<SessionConnection>& c, const TSInsertStringRecordsReq& r) {
        c->insertStringRecords(r);
    };
    if (recordsGroup.size() == 1) {
        insertOnce(recordsGroup, consumer);
    }
    else {
        insertByGroup(recordsGroup, consumer);
    }
}

void Session::insertRecordsWithLeaderCache(vector<string> deviceIds, vector<int64_t> times,
                                           vector<vector<string>> measurementsList,
                                           const vector<vector<TSDataType::TSDataType>>& typesList,
                                           vector<vector<char*>> valuesList, bool isAligned) {
    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertRecordsReq> recordsGroup;
    for (int i = 0; i < deviceIds.size(); i++) {
        auto connection = getSessionConnection(deviceIds[i]);
        if (recordsGroup.find(connection) == recordsGroup.end()) {
            TSInsertRecordsReq request;
            std::vector<std::string> emptyPrefixPaths;
            std::vector<std::vector<std::string>> emptyMeasurementsList;
            std::vector<std::string> emptyValuesList;
            std::vector<int64_t> emptyTimestamps;
            request.__set_isAligned(isAligned);
            request.__set_prefixPaths(emptyPrefixPaths);
            request.__set_timestamps(emptyTimestamps);
            request.__set_measurementsList(emptyMeasurementsList);
            request.__set_valuesList(emptyValuesList);
            recordsGroup.insert(make_pair(connection, request));
        }
        TSInsertRecordsReq& existingReq = recordsGroup[connection];
        existingReq.prefixPaths.emplace_back(deviceIds[i]);
        existingReq.timestamps.emplace_back(times[i]);
        existingReq.measurementsList.emplace_back(measurementsList[i]);
        vector<string> bufferList;
        string buffer;
        putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
        existingReq.valuesList.emplace_back(buffer);
        recordsGroup[connection] = existingReq;
    }
    std::function<void(std::shared_ptr<SessionConnection>, const TSInsertRecordsReq&)> consumer =
        [](const std::shared_ptr<SessionConnection>& c, const TSInsertRecordsReq& r) {
        c->insertRecords(r);
    };
    if (recordsGroup.size() == 1) {
        insertOnce(recordsGroup, consumer);
    }
    else {
        insertByGroup(recordsGroup, consumer);
    }
}

void Session::insertTabletsWithLeaderCache(unordered_map<string, Tablet*> tablets, bool sorted, bool isAligned) {
    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertTabletsReq> tabletsGroup;
    if (tablets.empty()) {
        throw BatchExecutionException("No tablet is inserting!");
    }
    for (const auto& item : tablets) {
        if (isAligned != item.second->isAligned) {
            throw BatchExecutionException("The tablets should be all aligned or non-aligned!");
        }
        if (!checkSorted(*(item.second))) {
            sortTablet(*(item.second));
        }
        auto deviceId = item.first;
        auto tablet = item.second;
        auto connection = getSessionConnection(deviceId);
        auto it = tabletsGroup.find(connection);
        if (it == tabletsGroup.end()) {
            TSInsertTabletsReq request;
            tabletsGroup[connection] = request;
        }
        TSInsertTabletsReq& existingReq = tabletsGroup[connection];
        existingReq.prefixPaths.emplace_back(tablet->deviceId);
        existingReq.timestampsList.emplace_back(move(SessionUtils::getTime(*tablet)));
        existingReq.valuesList.emplace_back(move(SessionUtils::getValue(*tablet)));
        existingReq.sizeList.emplace_back(tablet->rowSize);
        vector<int> dataTypes;
        vector<string> measurements;
        for (pair<string, TSDataType::TSDataType> schema : tablet->schemas) {
            measurements.push_back(schema.first);
            dataTypes.push_back(schema.second);
        }
        existingReq.measurementsList.emplace_back(measurements);
        existingReq.typesList.emplace_back(dataTypes);
    }

    std::function<void(std::shared_ptr<SessionConnection>, const TSInsertTabletsReq&)> consumer =
        [](const std::shared_ptr<SessionConnection>& c, const TSInsertTabletsReq& r) {
        c->insertTablets(r);
    };
    if (tabletsGroup.size() == 1) {
        insertOnce(tabletsGroup, consumer);
    }
    else {
        insertByGroup(tabletsGroup, consumer);
    }
}

void Session::open() {
    open(false, DEFAULT_TIMEOUT_MS);
}

void Session::open(bool enableRPCCompression) {
    open(enableRPCCompression, DEFAULT_TIMEOUT_MS);
}

void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
    if (!isClosed_) {
        return;
    }

    try {
        initDefaultSessionConnection();
    }
    catch (const exception& e) {
        log_debug(e.what());
        throw IoTDBException(e.what());
    }
    zoneId_ = defaultSessionConnection_->zoneId;

    if (enableRedirection_) {
        endPointToSessionConnection.insert(make_pair(defaultEndPoint_, defaultSessionConnection_));
    }

    isClosed_ = false;
}


void Session::close() {
    if (isClosed_) {
        return;
    }
    isClosed_ = true;
}


void Session::insertRecord(const string& deviceId, int64_t time,
                           const vector<string>& measurements,
                           const vector<string>& values) {
    TSInsertStringRecordReq req;
    req.__set_prefixPath(deviceId);
    req.__set_timestamp(time);
    req.__set_measurements(measurements);
    req.__set_values(values);
    req.__set_isAligned(false);
    try {
        getSessionConnection(deviceId)->insertStringRecord(req);
    }
    catch (RedirectException& e) {
        handleRedirection(deviceId, e.endPoint);
    } catch (const IoTDBConnectionException& e) {
        if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
            deviceIdToEndpoint.erase(deviceId);
            try {
                defaultSessionConnection_->insertStringRecord(req);
            }
            catch (RedirectException& e) {
            }
        }
        else {
            throw e;
        }
    }
}

void Session::insertRecord(const string& deviceId, int64_t time,
                           const vector<string>& measurements,
                           const vector<TSDataType::TSDataType>& types,
                           const vector<char*>& values) {
    TSInsertRecordReq req;
    req.__set_prefixPath(deviceId);
    req.__set_timestamp(time);
    req.__set_measurements(measurements);
    string buffer;
    putValuesIntoBuffer(types, values, buffer);
    req.__set_values(buffer);
    req.__set_isAligned(false);
    try {
        getSessionConnection(deviceId)->insertRecord(req);
    }
    catch (RedirectException& e) {
        handleRedirection(deviceId, e.endPoint);
    } catch (const IoTDBConnectionException& e) {
        if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
            deviceIdToEndpoint.erase(deviceId);
            try {
                defaultSessionConnection_->insertRecord(req);
            }
            catch (RedirectException& e) {
            }
        }
        else {
            throw e;
        }
    }
}

void Session::insertAlignedRecord(const string& deviceId, int64_t time,
                                  const vector<string>& measurements,
                                  const vector<string>& values) {
    TSInsertStringRecordReq req;
    req.__set_prefixPath(deviceId);
    req.__set_timestamp(time);
    req.__set_measurements(measurements);
    req.__set_values(values);
    req.__set_isAligned(true);
    try {
        getSessionConnection(deviceId)->insertStringRecord(req);
    }
    catch (RedirectException& e) {
        handleRedirection(deviceId, e.endPoint);
    } catch (const IoTDBConnectionException& e) {
        if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
            deviceIdToEndpoint.erase(deviceId);
            try {
                defaultSessionConnection_->insertStringRecord(req);
            }
            catch (RedirectException& e) {
            }
        }
        else {
            throw e;
        }
    }
}

void Session::insertAlignedRecord(const string& deviceId, int64_t time,
                                  const vector<string>& measurements,
                                  const vector<TSDataType::TSDataType>& types,
                                  const vector<char*>& values) {
    TSInsertRecordReq req;
    req.__set_prefixPath(deviceId);
    req.__set_timestamp(time);
    req.__set_measurements(measurements);
    string buffer;
    putValuesIntoBuffer(types, values, buffer);
    req.__set_values(buffer);
    req.__set_isAligned(false);
    try {
        getSessionConnection(deviceId)->insertRecord(req);
    }
    catch (RedirectException& e) {
        handleRedirection(deviceId, e.endPoint);
    } catch (const IoTDBConnectionException& e) {
        if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
            deviceIdToEndpoint.erase(deviceId);
            try {
                defaultSessionConnection_->insertRecord(req);
            }
            catch (RedirectException& e) {
            }
        }
        else {
            throw e;
        }
    }
}

void Session::insertRecords(const vector<string>& deviceIds,
                            const vector<int64_t>& times,
                            const vector<vector<string>>& measurementsList,
                            const vector<vector<string>>& valuesList) {
    size_t len = deviceIds.size();
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
        logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
        throw exception(e);
    }

    if (enableRedirection_) {
        insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, false);
    }
    else {
        TSInsertStringRecordsReq request;
        request.__set_prefixPaths(deviceIds);
        request.__set_timestamps(times);
        request.__set_measurementsList(measurementsList);
        request.__set_valuesList(valuesList);
        request.__set_isAligned(false);
        try {
            defaultSessionConnection_->insertStringRecords(request);
        }
        catch (RedirectException& e) {
        }
    }
}

void Session::insertRecords(const vector<string>& deviceIds,
                            const vector<int64_t>& times,
                            const vector<vector<string>>& measurementsList,
                            const vector<vector<TSDataType::TSDataType>>& typesList,
                            const vector<vector<char*>>& valuesList) {
    size_t len = deviceIds.size();
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
        logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
        throw exception(e);
    }

    if (enableRedirection_) {
        insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, false);
    }
    else {
        TSInsertRecordsReq request;
        request.__set_prefixPaths(deviceIds);
        request.__set_timestamps(times);
        request.__set_measurementsList(measurementsList);
        vector<string> bufferList;
        for (size_t i = 0; i < valuesList.size(); i++) {
            string buffer;
            putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
            bufferList.push_back(buffer);
        }
        request.__set_valuesList(bufferList);
        request.__set_isAligned(false);
        try {
            defaultSessionConnection_->insertRecords(request);
        }
        catch (RedirectException& e) {
        }
    }
}

void Session::insertAlignedRecords(const vector<string>& deviceIds,
                                   const vector<int64_t>& times,
                                   const vector<vector<string>>& measurementsList,
                                   const vector<vector<string>>& valuesList) {
    size_t len = deviceIds.size();
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
        logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
        throw exception(e);
    }

    if (enableRedirection_) {
        insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, true);
    }
    else {
        TSInsertStringRecordsReq request;
        request.__set_prefixPaths(deviceIds);
        request.__set_timestamps(times);
        request.__set_measurementsList(measurementsList);
        request.__set_valuesList(valuesList);
        request.__set_isAligned(true);
        try {
            defaultSessionConnection_->insertStringRecords(request);
        }
        catch (RedirectException& e) {
        }
    }
}

void Session::insertAlignedRecords(const vector<string>& deviceIds,
                                   const vector<int64_t>& times,
                                   const vector<vector<string>>& measurementsList,
                                   const vector<vector<TSDataType::TSDataType>>& typesList,
                                   const vector<vector<char*>>& valuesList) {
    size_t len = deviceIds.size();
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
        logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
        throw exception(e);
    }

    if (enableRedirection_) {
        insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, true);
    }
    else {
        TSInsertRecordsReq request;
        request.__set_prefixPaths(deviceIds);
        request.__set_timestamps(times);
        request.__set_measurementsList(measurementsList);
        vector<string> bufferList;
        for (size_t i = 0; i < valuesList.size(); i++) {
            string buffer;
            putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
            bufferList.push_back(buffer);
        }
        request.__set_valuesList(bufferList);
        request.__set_isAligned(false);
        try {
            defaultSessionConnection_->insertRecords(request);
        }
        catch (RedirectException& e) {
        }
    }
}

void Session::insertRecordsOfOneDevice(const string& deviceId,
                                       vector<int64_t>& times,
                                       vector<vector<string>>& measurementsList,
                                       vector<vector<TSDataType::TSDataType>>& typesList,
                                       vector<vector<char*>>& valuesList) {
    insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
}

void Session::insertRecordsOfOneDevice(const string& deviceId,
                                       vector<int64_t>& times,
                                       vector<vector<string>>& measurementsList,
                                       vector<vector<TSDataType::TSDataType>>& typesList,
                                       vector<vector<char*>>& valuesList,
                                       bool sorted) {
    if (!checkSorted(times)) {
        int* index = new int[times.size()];
        for (size_t i = 0; i < times.size(); i++) {
            index[i] = (int)i;
        }

        sortIndexByTimestamp(index, times, (int)(times.size()));
        times = sortList(times, index, (int)(times.size()));
        measurementsList = sortList(measurementsList, index, (int)(times.size()));
        typesList = sortList(typesList, index, (int)(times.size()));
        valuesList = sortList(valuesList, index, (int)(times.size()));
        delete[] index;
    }
    TSInsertRecordsOfOneDeviceReq request;
    request.__set_prefixPath(deviceId);
    request.__set_timestamps(times);
    request.__set_measurementsList(measurementsList);
    vector<string> bufferList;
    for (size_t i = 0; i < valuesList.size(); i++) {
        string buffer;
        putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
        bufferList.push_back(buffer);
    }
    request.__set_valuesList(bufferList);
    request.__set_isAligned(false);
    TSStatus respStatus;
    try {
        getSessionConnection(deviceId)->insertRecordsOfOneDevice(request);
    }
    catch (RedirectException& e) {
        handleRedirection(deviceId, e.endPoint);
    } catch (const IoTDBConnectionException& e) {
        if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
            deviceIdToEndpoint.erase(deviceId);
            try {
                defaultSessionConnection_->insertRecordsOfOneDevice(request);
            }
            catch (RedirectException& e) {
            }
        }
        else {
            throw e;
        }
    }
}

void Session::insertAlignedRecordsOfOneDevice(const string& deviceId,
                                              vector<int64_t>& times,
                                              vector<vector<string>>& measurementsList,
                                              vector<vector<TSDataType::TSDataType>>& typesList,
                                              vector<vector<char*>>& valuesList) {
    insertAlignedRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
}

void Session::insertAlignedRecordsOfOneDevice(const string& deviceId,
                                              vector<int64_t>& times,
                                              vector<vector<string>>& measurementsList,
                                              vector<vector<TSDataType::TSDataType>>& typesList,
                                              vector<vector<char*>>& valuesList,
                                              bool sorted) {
    if (!checkSorted(times)) {
        int* index = new int[times.size()];
        for (size_t i = 0; i < times.size(); i++) {
            index[i] = (int)i;
        }

        sortIndexByTimestamp(index, times, (int)(times.size()));
        times = sortList(times, index, (int)(times.size()));
        measurementsList = sortList(measurementsList, index, (int)(times.size()));
        typesList = sortList(typesList, index, (int)(times.size()));
        valuesList = sortList(valuesList, index, (int)(times.size()));
        delete[] index;
    }
    TSInsertRecordsOfOneDeviceReq request;
    request.__set_prefixPath(deviceId);
    request.__set_timestamps(times);
    request.__set_measurementsList(measurementsList);
    vector<string> bufferList;
    for (size_t i = 0; i < valuesList.size(); i++) {
        string buffer;
        putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
        bufferList.push_back(buffer);
    }
    request.__set_valuesList(bufferList);
    request.__set_isAligned(true);
    TSStatus respStatus;
    try {
        getSessionConnection(deviceId)->insertRecordsOfOneDevice(request);
    }
    catch (RedirectException& e) {
        handleRedirection(deviceId, e.endPoint);
    } catch (const IoTDBConnectionException& e) {
        if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
            deviceIdToEndpoint.erase(deviceId);
            try {
                defaultSessionConnection_->insertRecordsOfOneDevice(request);
            }
            catch (RedirectException& e) {
            }
        }
        else {
            throw e;
        }
    }
}

void Session::insertTablet(Tablet& tablet) {
    try {
        insertTablet(tablet, false);
    }
    catch (const exception& e) {
        log_debug(e.what());
        logic_error error(e.what());
        throw exception(error);
    }
}

void Session::buildInsertTabletReq(TSInsertTabletReq& request, Tablet& tablet, bool sorted) {
    if ((!sorted) && !checkSorted(tablet)) {
        sortTablet(tablet);
    }

    request.prefixPath = tablet.deviceId;

    request.measurements.reserve(tablet.schemas.size());
    request.types.reserve(tablet.schemas.size());
    for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
        request.measurements.push_back(schema.first);
        request.types.push_back(schema.second);
    }
    request.values = move(SessionUtils::getValue(tablet));
    request.timestamps = move(SessionUtils::getTime(tablet));
    request.__set_size(tablet.rowSize);
    request.__set_isAligned(tablet.isAligned);
}

void Session::insertTablet(TSInsertTabletReq request) {
    auto deviceId = request.prefixPath;
    try {
        getSessionConnection(deviceId)->insertTablet(request);
    }
    catch (RedirectException& e) {
        handleRedirection(deviceId, e.endPoint);
    } catch (const IoTDBConnectionException& e) {
        if (enableRedirection_ && deviceIdToEndpoint.find(deviceId) != deviceIdToEndpoint.end()) {
            deviceIdToEndpoint.erase(deviceId);
            try {
                defaultSessionConnection_->insertTablet(request);
            }
            catch (RedirectException& e) {
            }
        }
        else {
            throw e;
        }
    }
}

void Session::insertTablet(Tablet& tablet, bool sorted) {
    TSInsertTabletReq request;
    buildInsertTabletReq(request, tablet, sorted);
    insertTablet(request);
}

void Session::insertAlignedTablet(Tablet& tablet) {
    insertAlignedTablet(tablet, false);
}

void Session::insertAlignedTablet(Tablet& tablet, bool sorted) {
    tablet.setAligned(true);
    try {
        insertTablet(tablet, sorted);
    }
    catch (const exception& e) {
        log_debug(e.what());
        logic_error error(e.what());
        throw exception(error);
    }
}

void Session::insertTablets(unordered_map<string, Tablet*>& tablets) {
    try {
        insertTablets(tablets, false);
    }
    catch (const exception& e) {
        log_debug(e.what());
        logic_error error(e.what());
        throw exception(error);
    }
}

void Session::insertTablets(unordered_map<string, Tablet*>& tablets, bool sorted) {
    if (tablets.empty()) {
        throw BatchExecutionException("No tablet is inserting!");
    }
    auto beginIter = tablets.begin();
    bool isAligned = ((*beginIter).second)->isAligned;
    if (enableRedirection_) {
        insertTabletsWithLeaderCache(tablets, sorted, isAligned);
    }
    else {
        TSInsertTabletsReq request;
        for (const auto& item : tablets) {
            if (isAligned != item.second->isAligned) {
                throw BatchExecutionException("The tablets should be all aligned or non-aligned!");
            }
            if (!checkSorted(*(item.second))) {
                sortTablet(*(item.second));
            }
            request.prefixPaths.push_back(item.second->deviceId);
            vector<string> measurements;
            vector<int> dataTypes;
            for (pair<string, TSDataType::TSDataType> schema : item.second->schemas) {
                measurements.push_back(schema.first);
                dataTypes.push_back(schema.second);
            }
            request.measurementsList.push_back(measurements);
            request.typesList.push_back(dataTypes);
            request.timestampsList.push_back(move(SessionUtils::getTime(*(item.second))));
            request.valuesList.push_back(move(SessionUtils::getValue(*(item.second))));
            request.sizeList.push_back(item.second->rowSize);
        }
        request.__set_isAligned(isAligned);
        try {
            TSStatus respStatus;
            defaultSessionConnection_->insertTablets(request);
            RpcUtils::verifySuccess(respStatus);
        }
        catch (RedirectException& e) {
        }
    }
}


void Session::insertAlignedTablets(unordered_map<string, Tablet*>& tablets, bool sorted) {
    for (auto iter = tablets.begin(); iter != tablets.end(); iter++) {
        iter->second->setAligned(true);
    }
    try {
        insertTablets(tablets, sorted);
    }
    catch (const exception& e) {
        log_debug(e.what());
        logic_error error(e.what());
        throw exception(error);
    }
}

void Session::testInsertRecord(const string& deviceId, int64_t time, const vector<string>& measurements,
                               const vector<string>& values) {
    TSInsertStringRecordReq req;
    req.__set_prefixPath(deviceId);
    req.__set_timestamp(time);
    req.__set_measurements(measurements);
    req.__set_values(values);
    TSStatus tsStatus;
    try {
        defaultSessionConnection_->testInsertStringRecord(req);
        RpcUtils::verifySuccess(tsStatus);
    }
    catch (const TTransportException& e) {
        log_debug(e.what());
        throw IoTDBConnectionException(e.what());
    } catch (const IoTDBException& e) {
        log_debug(e.what());
        throw;
    } catch (const exception& e) {
        log_debug(e.what());
        throw IoTDBException(e.what());
    }
}

void Session::testInsertTablet(const Tablet& tablet) {
    TSInsertTabletReq request;
    request.prefixPath = tablet.deviceId;
    for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
        request.measurements.push_back(schema.first);
        request.types.push_back(schema.second);
    }
    request.__set_timestamps(move(SessionUtils::getTime(tablet)));
    request.__set_values(move(SessionUtils::getValue(tablet)));
    request.__set_size(tablet.rowSize);
    try {
        TSStatus tsStatus;
        defaultSessionConnection_->testInsertTablet(request);
        RpcUtils::verifySuccess(tsStatus);
    }
    catch (const TTransportException& e) {
        log_debug(e.what());
        throw IoTDBConnectionException(e.what());
    } catch (const IoTDBException& e) {
        log_debug(e.what());
        throw;
    } catch (const exception& e) {
        log_debug(e.what());
        throw IoTDBException(e.what());
    }
}

void Session::testInsertRecords(const vector<string>& deviceIds,
                                const vector<int64_t>& times,
                                const vector<vector<string>>& measurementsList,
                                const vector<vector<string>>& valuesList) {
    size_t len = deviceIds.size();
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
        logic_error error("deviceIds, times, measurementsList and valuesList's size should be equal");
        throw exception(error);
    }
    TSInsertStringRecordsReq request;
    request.__set_prefixPaths(deviceIds);
    request.__set_timestamps(times);
    request.__set_measurementsList(measurementsList);
    request.__set_valuesList(valuesList);

    try {
        TSStatus tsStatus;
        defaultSessionConnection_->getSessionClient()->insertStringRecords(tsStatus, request);
        RpcUtils::verifySuccess(tsStatus);
    }
    catch (const TTransportException& e) {
        log_debug(e.what());
        throw IoTDBConnectionException(e.what());
    } catch (const IoTDBException& e) {
        log_debug(e.what());
        throw;
    } catch (const exception& e) {
        log_debug(e.what());
        throw IoTDBException(e.what());
    }
}

void Session::deleteTimeseries(const string& path) {
    vector<string> paths;
    paths.push_back(path);
    deleteTimeseries(paths);
}

void Session::deleteTimeseries(const vector<string>& paths) {
    defaultSessionConnection_->deleteTimeseries(paths);
}

void Session::deleteData(const string& path, int64_t endTime) {
    vector<string> paths;
    paths.push_back(path);
    deleteData(paths, LONG_LONG_MIN, endTime);
}

void Session::deleteData(const vector<string>& paths, int64_t endTime) {
    deleteData(paths, LONG_LONG_MIN, endTime);
}

void Session::deleteData(const vector<string>& paths, int64_t startTime, int64_t endTime) {
    TSDeleteDataReq req;
    req.__set_paths(paths);
    req.__set_startTime(startTime);
    req.__set_endTime(endTime);
    defaultSessionConnection_->deleteData(req);
}

void Session::setStorageGroup(const string& storageGroupId) {
    defaultSessionConnection_->setStorageGroup(storageGroupId);
}

void Session::deleteStorageGroup(const string& storageGroup) {
    vector<string> storageGroups;
    storageGroups.push_back(storageGroup);
    deleteStorageGroups(storageGroups);
}

void Session::deleteStorageGroups(const vector<string>& storageGroups) {
    defaultSessionConnection_->deleteStorageGroups(storageGroups);
}

void Session::createDatabase(const string& database) {
    this->setStorageGroup(database);
}

void Session::deleteDatabase(const string& database) {
    this->deleteStorageGroups(vector<string>{database});
}

void Session::deleteDatabases(const vector<string>& databases) {
    this->deleteStorageGroups(databases);
}

void Session::createTimeseries(const string& path,
                               TSDataType::TSDataType dataType,
                               TSEncoding::TSEncoding encoding,
                               CompressionType::CompressionType compressor) {
    try {
        createTimeseries(path, dataType, encoding, compressor, nullptr, nullptr, nullptr, "");
    }
    catch (const exception& e) {
        log_debug(e.what());
        throw IoTDBException(e.what());
    }
}

void Session::createTimeseries(const string& path,
                               TSDataType::TSDataType dataType,
                               TSEncoding::TSEncoding encoding,
                               CompressionType::CompressionType compressor,
                               map<string, string>* props,
                               map<string, string>* tags,
                               map<string, string>* attributes,
                               const string& measurementAlias) {
    TSCreateTimeseriesReq req;
    req.__set_path(path);
    req.__set_dataType(dataType);
    req.__set_encoding(encoding);
    req.__set_compressor(compressor);
    if (props != nullptr) {
        req.__set_props(*props);
    }

    if (tags != nullptr) {
        req.__set_tags(*tags);
    }
    if (attributes != nullptr) {
        req.__set_attributes(*attributes);
    }
    if (!measurementAlias.empty()) {
        req.__set_measurementAlias(measurementAlias);
    }
    defaultSessionConnection_->createTimeseries(req);
}

void Session::createMultiTimeseries(const vector<string>& paths,
                                    const vector<TSDataType::TSDataType>& dataTypes,
                                    const vector<TSEncoding::TSEncoding>& encodings,
                                    const vector<CompressionType::CompressionType>& compressors,
                                    vector<map<string, string>>* propsList,
                                    vector<map<string, string>>* tagsList,
                                    vector<map<string, string>>* attributesList,
                                    vector<string>* measurementAliasList) {
    TSCreateMultiTimeseriesReq request;
    request.__set_paths(paths);

    vector<int> dataTypesOrdinal;
    dataTypesOrdinal.reserve(dataTypes.size());
    for (TSDataType::TSDataType dataType : dataTypes) {
        dataTypesOrdinal.push_back(dataType);
    }
    request.__set_dataTypes(dataTypesOrdinal);

    vector<int> encodingsOrdinal;
    encodingsOrdinal.reserve(encodings.size());
    for (TSEncoding::TSEncoding encoding : encodings) {
        encodingsOrdinal.push_back(encoding);
    }
    request.__set_encodings(encodingsOrdinal);

    vector<int> compressorsOrdinal;
    compressorsOrdinal.reserve(compressors.size());
    for (CompressionType::CompressionType compressor : compressors) {
        compressorsOrdinal.push_back(compressor);
    }
    request.__set_compressors(compressorsOrdinal);

    if (propsList != nullptr) {
        request.__set_propsList(*propsList);
    }

    if (tagsList != nullptr) {
        request.__set_tagsList(*tagsList);
    }
    if (attributesList != nullptr) {
        request.__set_attributesList(*attributesList);
    }
    if (measurementAliasList != nullptr) {
        request.__set_measurementAliasList(*measurementAliasList);
    }

    defaultSessionConnection_->createMultiTimeseries(request);
}

void Session::createAlignedTimeseries(const std::string& deviceId,
                                      const std::vector<std::string>& measurements,
                                      const std::vector<TSDataType::TSDataType>& dataTypes,
                                      const std::vector<TSEncoding::TSEncoding>& encodings,
                                      const std::vector<CompressionType::CompressionType>& compressors) {
    TSCreateAlignedTimeseriesReq request;
    request.__set_prefixPath(deviceId);
    request.__set_measurements(measurements);

    vector<int> dataTypesOrdinal;
    dataTypesOrdinal.reserve(dataTypes.size());
    for (TSDataType::TSDataType dataType : dataTypes) {
        dataTypesOrdinal.push_back(dataType);
    }
    request.__set_dataTypes(dataTypesOrdinal);

    vector<int> encodingsOrdinal;
    encodingsOrdinal.reserve(encodings.size());
    for (TSEncoding::TSEncoding encoding : encodings) {
        encodingsOrdinal.push_back(encoding);
    }
    request.__set_encodings(encodingsOrdinal);

    vector<int> compressorsOrdinal;
    compressorsOrdinal.reserve(compressors.size());
    for (CompressionType::CompressionType compressor : compressors) {
        compressorsOrdinal.push_back(compressor);
    }
    request.__set_compressors(compressorsOrdinal);

    defaultSessionConnection_->createAlignedTimeseries(request);
}

bool Session::checkTimeseriesExists(const string& path) {
    try {
        std::unique_ptr<SessionDataSet> dataset = executeQueryStatement("SHOW TIMESERIES " + path);
        bool isExisted = dataset->hasNext();
        dataset->closeOperationHandle();
        return isExisted;
    }
    catch (const exception& e) {
        log_debug(e.what());
        throw IoTDBException(e.what());
    }
}

shared_ptr<SessionConnection> Session::getQuerySessionConnection() {
    auto endPoint = nodesSupplier_->getQueryEndPoint();
    if (!endPoint.is_initialized() || endPointToSessionConnection.empty()) {
        return defaultSessionConnection_;
    }

    auto it = endPointToSessionConnection.find(endPoint.value());
    if (it != endPointToSessionConnection.end()) {
        return it->second;
    }

    shared_ptr<SessionConnection> newConnection;
    try {
        newConnection = make_shared<SessionConnection>(this, endPoint.value(), zoneId_, nodesSupplier_,
                                                       fetchSize_, 60, 500, sqlDialect_, database_);
        endPointToSessionConnection.emplace(endPoint.value(), newConnection);
        return newConnection;
    }
    catch (exception& e) {
        log_debug("Session::getQuerySessionConnection() exception: " + e.what());
        return newConnection;
    }
}

shared_ptr<SessionConnection> Session::getSessionConnection(std::string deviceId) {
    if (!enableRedirection_ ||
        deviceIdToEndpoint.find(deviceId) == deviceIdToEndpoint.end() ||
        endPointToSessionConnection.find(deviceIdToEndpoint[deviceId]) == endPointToSessionConnection.end()) {
        return defaultSessionConnection_;
    }
    return endPointToSessionConnection.find(deviceIdToEndpoint[deviceId])->second;
}

shared_ptr<SessionConnection> Session::getSessionConnection(std::shared_ptr<storage::IDeviceID> deviceId) {
    if (!enableRedirection_ ||
        tableModelDeviceIdToEndpoint.find(deviceId) == tableModelDeviceIdToEndpoint.end() ||
        endPointToSessionConnection.find(tableModelDeviceIdToEndpoint[deviceId]) == endPointToSessionConnection.end()) {
        return defaultSessionConnection_;
    }
    return endPointToSessionConnection.find(tableModelDeviceIdToEndpoint[deviceId])->second;
}

string Session::getTimeZone() {
    auto ret = defaultSessionConnection_->getTimeZone();
    return ret.timeZone;
}

void Session::setTimeZone(const string& zoneId) {
    TSSetTimeZoneReq req;
    req.__set_sessionId(defaultSessionConnection_->sessionId);
    req.__set_timeZone(zoneId);
    defaultSessionConnection_->setTimeZone(req);
}

unique_ptr<SessionDataSet> Session::executeQueryStatement(const string& sql) {
    return executeQueryStatementMayRedirect(sql, QUERY_TIMEOUT_MS);
}

unique_ptr<SessionDataSet> Session::executeQueryStatement(const string& sql, int64_t timeoutInMs) {
    return executeQueryStatementMayRedirect(sql, timeoutInMs);
}

void Session::handleQueryRedirection(TEndPoint endPoint) {
    if (!enableRedirection_) return;
    shared_ptr<SessionConnection> newConnection;
    auto it = endPointToSessionConnection.find(endPoint);
    if (it != endPointToSessionConnection.end()) {
        newConnection = it->second;
    }
    else {
        try {
            newConnection = make_shared<SessionConnection>(this, endPoint, zoneId_, nodesSupplier_,
                                                           fetchSize_, 60, 500, sqlDialect_, database_);

            endPointToSessionConnection.emplace(endPoint, newConnection);
        }
        catch (exception& e) {
            throw IoTDBConnectionException(e.what());
        }
    }
    defaultSessionConnection_ = newConnection;
}

void Session::handleRedirection(const std::string& deviceId, TEndPoint endPoint) {
    if (!enableRedirection_) return;
    if (endPoint.ip == "127.0.0.1") return;
    deviceIdToEndpoint[deviceId] = endPoint;

    shared_ptr<SessionConnection> newConnection;
    auto it = endPointToSessionConnection.find(endPoint);
    if (it != endPointToSessionConnection.end()) {
        newConnection = it->second;
    }
    else {
        try {
            newConnection = make_shared<SessionConnection>(this, endPoint, zoneId_, nodesSupplier_,
                                                           fetchSize_, 60, 500, sqlDialect_, database_);
            endPointToSessionConnection.emplace(endPoint, newConnection);
        }
        catch (exception& e) {
            deviceIdToEndpoint.erase(deviceId);
            throw IoTDBConnectionException(e.what());
        }
    }
}

void Session::handleRedirection(const std::shared_ptr<storage::IDeviceID>& deviceId, TEndPoint endPoint) {
    if (!enableRedirection_) return;
    if (endPoint.ip == "127.0.0.1") return;
    tableModelDeviceIdToEndpoint[deviceId] = endPoint;

    shared_ptr<SessionConnection> newConnection;
    auto it = endPointToSessionConnection.find(endPoint);
    if (it != endPointToSessionConnection.end()) {
        newConnection = it->second;
    }
    else {
        try {
            newConnection = make_shared<SessionConnection>(this, endPoint, zoneId_, nodesSupplier_,
                                                           fetchSize_, 60, 500, sqlDialect_, database_);
            endPointToSessionConnection.emplace(endPoint, newConnection);
        }
        catch (exception& e) {
            tableModelDeviceIdToEndpoint.erase(deviceId);
            throw IoTDBConnectionException(e.what());
        }
    }
}

std::unique_ptr<SessionDataSet> Session::executeQueryStatementMayRedirect(const std::string& sql, int64_t timeoutInMs) {
    auto sessionConnection = getQuerySessionConnection();
    if (!sessionConnection) {
        log_warn("Session connection not found");
        return nullptr;
    }
    try {
        return sessionConnection->executeQueryStatement(sql, timeoutInMs);
    }
    catch (RedirectException& e) {
        log_warn("Session connection redirect exception: " + e.what());
        handleQueryRedirection(e.endPoint);
        try {
            return defaultSessionConnection_->executeQueryStatement(sql, timeoutInMs);
        }
        catch (exception& e) {
            log_error("Exception while executing redirected query statement: %s", e.what());
            throw ExecutionException(e.what());
        }
    } catch (exception& e) {
        log_error("Exception while executing query statement: %s", e.what());
        throw e;
    }
}

void Session::executeNonQueryStatement(const string& sql) {
    try {
        defaultSessionConnection_->executeNonQueryStatement(sql);
    }
    catch (const exception& e) {
        throw IoTDBException(e.what());
    }
}

unique_ptr<SessionDataSet>
Session::executeRawDataQuery(const vector<string>& paths, int64_t startTime, int64_t endTime) {
    return defaultSessionConnection_->executeRawDataQuery(paths, startTime, endTime);
}


unique_ptr<SessionDataSet> Session::executeLastDataQuery(const vector<string>& paths) {
    return executeLastDataQuery(paths, LONG_LONG_MIN);
}

unique_ptr<SessionDataSet> Session::executeLastDataQuery(const vector<string>& paths, int64_t lastTime) {
    return defaultSessionConnection_->executeLastDataQuery(paths, lastTime);
}

void Session::createSchemaTemplate(const Template& templ) {
    TSCreateSchemaTemplateReq req;
    req.__set_name(templ.getName());
    req.__set_serializedTemplate(templ.serialize());
    defaultSessionConnection_->createSchemaTemplate(req);
}

void Session::setSchemaTemplate(const string& template_name, const string& prefix_path) {
    TSSetSchemaTemplateReq req;
    req.__set_templateName(template_name);
    req.__set_prefixPath(prefix_path);
    defaultSessionConnection_->setSchemaTemplate(req);
}

void Session::unsetSchemaTemplate(const string& prefix_path, const string& template_name) {
    TSUnsetSchemaTemplateReq req;
    req.__set_templateName(template_name);
    req.__set_prefixPath(prefix_path);
    defaultSessionConnection_->unsetSchemaTemplate(req);
}

void Session::addAlignedMeasurementsInTemplate(const string& template_name, const vector<std::string>& measurements,
                                               const vector<TSDataType::TSDataType>& dataTypes,
                                               const vector<TSEncoding::TSEncoding>& encodings,
                                               const vector<CompressionType::CompressionType>& compressors) {
    TSAppendSchemaTemplateReq req;
    req.__set_name(template_name);
    req.__set_measurements(measurements);
    req.__set_isAligned(true);

    vector<int> dataTypesOrdinal;
    dataTypesOrdinal.reserve(dataTypes.size());
    for (TSDataType::TSDataType dataType : dataTypes) {
        dataTypesOrdinal.push_back(dataType);
    }
    req.__set_dataTypes(dataTypesOrdinal);

    vector<int> encodingsOrdinal;
    encodingsOrdinal.reserve(encodings.size());
    for (TSEncoding::TSEncoding encoding : encodings) {
        encodingsOrdinal.push_back(encoding);
    }
    req.__set_encodings(encodingsOrdinal);

    vector<int> compressorsOrdinal;
    compressorsOrdinal.reserve(compressors.size());
    for (CompressionType::CompressionType compressor : compressors) {
        compressorsOrdinal.push_back(compressor);
    }
    req.__set_compressors(compressorsOrdinal);

    defaultSessionConnection_->appendSchemaTemplate(req);
}

void Session::addAlignedMeasurementsInTemplate(const string& template_name, const string& measurement,
                                               TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
                                               CompressionType::CompressionType compressor) {
    vector<std::string> measurements(1, measurement);
    vector<TSDataType::TSDataType> dataTypes(1, dataType);
    vector<TSEncoding::TSEncoding> encodings(1, encoding);
    vector<CompressionType::CompressionType> compressors(1, compressor);
    addAlignedMeasurementsInTemplate(template_name, measurements, dataTypes, encodings, compressors);
}

void Session::addUnalignedMeasurementsInTemplate(const string& template_name, const vector<std::string>& measurements,
                                                 const vector<TSDataType::TSDataType>& dataTypes,
                                                 const vector<TSEncoding::TSEncoding>& encodings,
                                                 const vector<CompressionType::CompressionType>& compressors) {
    TSAppendSchemaTemplateReq req;
    req.__set_name(template_name);
    req.__set_measurements(measurements);
    req.__set_isAligned(false);

    vector<int> dataTypesOrdinal;
    dataTypesOrdinal.reserve(dataTypes.size());
    for (TSDataType::TSDataType dataType : dataTypes) {
        dataTypesOrdinal.push_back(dataType);
    }
    req.__set_dataTypes(dataTypesOrdinal);

    vector<int> encodingsOrdinal;
    encodingsOrdinal.reserve(encodings.size());
    for (TSEncoding::TSEncoding encoding : encodings) {
        encodingsOrdinal.push_back(encoding);
    }
    req.__set_encodings(encodingsOrdinal);

    vector<int> compressorsOrdinal;
    compressorsOrdinal.reserve(compressors.size());
    for (CompressionType::CompressionType compressor : compressors) {
        compressorsOrdinal.push_back(compressor);
    }
    req.__set_compressors(compressorsOrdinal);

    defaultSessionConnection_->appendSchemaTemplate(req);
}

void Session::addUnalignedMeasurementsInTemplate(const string& template_name, const string& measurement,
                                                 TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
                                                 CompressionType::CompressionType compressor) {
    vector<std::string> measurements(1, measurement);
    vector<TSDataType::TSDataType> dataTypes(1, dataType);
    vector<TSEncoding::TSEncoding> encodings(1, encoding);
    vector<CompressionType::CompressionType> compressors(1, compressor);
    addUnalignedMeasurementsInTemplate(template_name, measurements, dataTypes, encodings, compressors);
}

void Session::deleteNodeInTemplate(const string& template_name, const string& path) {
    TSPruneSchemaTemplateReq req;
    req.__set_name(template_name);
    req.__set_path(path);
    defaultSessionConnection_->pruneSchemaTemplate(req);
}

int Session::countMeasurementsInTemplate(const string& template_name) {
    TSQueryTemplateReq req;
    req.__set_name(template_name);
    req.__set_queryType(TemplateQueryType::COUNT_MEASUREMENTS);
    TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
    return resp.count;
}

bool Session::isMeasurementInTemplate(const string& template_name, const string& path) {
    TSQueryTemplateReq req;
    req.__set_name(template_name);
    req.__set_measurement(path);
    req.__set_queryType(TemplateQueryType::IS_MEASUREMENT);
    TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
    return resp.result;
}

bool Session::isPathExistInTemplate(const string& template_name, const string& path) {
    TSQueryTemplateReq req;
    req.__set_name(template_name);
    req.__set_measurement(path);
    req.__set_queryType(TemplateQueryType::PATH_EXIST);
    TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
    return resp.result;
}

std::vector<std::string> Session::showMeasurementsInTemplate(const string& template_name) {
    TSQueryTemplateReq req;
    req.__set_name(template_name);
    req.__set_measurement("");
    req.__set_queryType(TemplateQueryType::SHOW_MEASUREMENTS);
    TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
    return resp.measurements;
}

std::vector<std::string> Session::showMeasurementsInTemplate(const string& template_name, const string& pattern) {
    TSQueryTemplateReq req;
    req.__set_name(template_name);
    req.__set_measurement(pattern);
    req.__set_queryType(TemplateQueryType::SHOW_MEASUREMENTS);
    TSQueryTemplateResp resp = defaultSessionConnection_->querySchemaTemplate(req);
    return resp.measurements;
}

bool Session::checkTemplateExists(const string& template_name) {
    try {
        std::unique_ptr<SessionDataSet> dataset = executeQueryStatement(
            "SHOW NODES IN DEVICE TEMPLATE " + template_name);
        bool isExisted = dataset->hasNext();
        dataset->closeOperationHandle();
        return isExisted;
    }
    catch (const exception& e) {
        if (strstr(e.what(), "does not exist") != NULL) {
            return false;
        }
        log_debug(e.what());
        throw IoTDBException(e.what());
    }
}
