blob: a828f0ac2c6da26e0f667e5615ac12877dee5ad3 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "SessionPool.h"
void PooledSession::reset() {
if (session_ && pool_ != nullptr) {
pool_->putBack(session_, broken_);
}
pool_ = nullptr;
session_ = nullptr;
broken_ = false;
}
SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password,
size_t maxSize)
: host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)),
password_(std::move(password)), maxSize_(maxSize) {
if (maxSize_ == 0) {
throw IoTDBException("SessionPool maxSize must be greater than 0.");
}
}
SessionPool::SessionPool(std::vector<std::string> nodeUrls, std::string username,
std::string password, size_t maxSize)
: rpcPort_(AbstractSessionBuilder::DEFAULT_RPC_PORT), nodeUrls_(std::move(nodeUrls)),
username_(std::move(username)), password_(std::move(password)), maxSize_(maxSize) {
if (maxSize_ == 0) {
throw IoTDBException("SessionPool maxSize must be greater than 0.");
}
}
SessionPool::~SessionPool() {
try {
close();
} catch (const std::exception& e) {
log_debug(std::string("SessionPool::~SessionPool(), ") + e.what());
}
}
SessionPool& SessionPool::setFetchSize(int fetchSize) {
fetchSize_ = fetchSize;
return *this;
}
SessionPool& SessionPool::setZoneId(std::string zoneId) {
zoneId_ = std::move(zoneId);
return *this;
}
SessionPool& SessionPool::setSqlDialect(std::string sqlDialect) {
sqlDialect_ = std::move(sqlDialect);
return *this;
}
SessionPool& SessionPool::setDatabase(std::string database) {
database_ = std::move(database);
return *this;
}
SessionPool& SessionPool::setEnableRedirection(bool enable) {
enableRedirection_ = enable;
return *this;
}
SessionPool& SessionPool::setEnableAutoFetch(bool enable) {
enableAutoFetch_ = enable;
return *this;
}
SessionPool& SessionPool::setEnableRPCCompression(bool enable) {
enableRPCCompression_ = enable;
return *this;
}
SessionPool& SessionPool::setConnectTimeoutMs(int connectTimeoutMs) {
connectTimeoutMs_ = connectTimeoutMs;
return *this;
}
SessionPool& SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) {
waitTimeoutMs_ = timeoutMs;
return *this;
}
SessionPool& SessionPool::setUseSSL(bool useSSL) {
useSSL_ = useSSL;
return *this;
}
SessionPool& SessionPool::setTrustCertFilePath(std::string path) {
trustCertFilePath_ = std::move(path);
return *this;
}
std::shared_ptr<Session> SessionPool::constructNewSession() {
AbstractSessionBuilder builder;
builder.host = host_;
builder.rpcPort = rpcPort_;
builder.nodeUrls = nodeUrls_;
builder.username = username_;
builder.password = password_;
builder.zoneId = zoneId_;
builder.fetchSize = fetchSize_;
builder.sqlDialect = sqlDialect_;
builder.database = database_;
builder.enableAutoFetch = enableAutoFetch_;
builder.enableRedirections = enableRedirection_;
builder.enableRPCCompression = enableRPCCompression_;
builder.connectTimeoutMs = connectTimeoutMs_;
builder.useSSL = useSSL_;
builder.trustCertFilePath = trustCertFilePath_;
auto session = std::make_shared<Session>(&builder);
session->open(enableRPCCompression_, connectTimeoutMs_);
return session;
}
std::shared_ptr<Session> SessionPool::acquire(int64_t timeoutMs) {
const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_;
std::unique_lock<std::mutex> lock(mutex_);
const auto deadline =
std::chrono::steady_clock::now() + std::chrono::milliseconds(effectiveTimeout);
while (true) {
if (closed_) {
throw IoTDBException("SessionPool is closed.");
}
if (!idleQueue_.empty()) {
auto session = idleQueue_.front();
idleQueue_.pop_front();
return session;
}
if (size_ < maxSize_) {
// Reserve a slot, then build the connection outside the lock so other
// borrowers are not blocked by network/handshake latency.
++size_;
lock.unlock();
std::shared_ptr<Session> session;
try {
session = constructNewSession();
} catch (...) {
lock.lock();
--size_;
cv_.notify_one();
throw;
}
lock.lock();
if (closed_) {
// The pool was closed while this session was being built; do not hand it
// out. Release its slot and let it be torn down outside the lock.
--size_;
lock.unlock();
throw IoTDBException("SessionPool is closed.");
}
return session;
}
// Pool exhausted: wait for a Session to be returned.
if (effectiveTimeout <= 0) {
cv_.wait(lock);
} else {
if (cv_.wait_until(lock, deadline) == std::cv_status::timeout && idleQueue_.empty() &&
size_ >= maxSize_ && !closed_) {
throw IoTDBException(
"Wait to get session timeout in SessionPool, maxSize=" + std::to_string(maxSize_) +
", waitTimeoutMs=" + std::to_string(effectiveTimeout) + ".");
}
}
}
}
void SessionPool::putBack(const std::shared_ptr<Session>& session, bool broken) {
std::lock_guard<std::mutex> lock(mutex_);
if (broken || closed_) {
// Drop the Session and free its slot so a healthy replacement can be created
// on demand. The caller (PooledSession::reset) still holds the last reference
// and tears the connection down after we return, i.e. outside this lock.
--size_;
} else {
idleQueue_.push_back(session);
}
cv_.notify_one();
}
PooledSession SessionPool::getSession() {
return getSession(waitTimeoutMs_);
}
PooledSession SessionPool::getSession(int64_t timeoutMs) {
return PooledSession(this, acquire(timeoutMs));
}
void SessionPool::insertTablet(Tablet& tablet, bool sorted) {
execute([&](Session& s) { s.insertTablet(tablet, sorted); });
}
void SessionPool::insertAlignedTablet(Tablet& tablet, bool sorted) {
execute([&](Session& s) { s.insertAlignedTablet(tablet, sorted); });
}
void SessionPool::insertTablets(std::unordered_map<std::string, Tablet*>& tablets, bool sorted) {
execute([&](Session& s) { s.insertTablets(tablets, sorted); });
}
void SessionPool::insertRecord(const std::string& deviceId, int64_t time,
const std::vector<std::string>& measurements,
const std::vector<std::string>& values) {
execute([&](Session& s) { s.insertRecord(deviceId, time, measurements, values); });
}
void SessionPool::insertRecords(const std::vector<std::string>& deviceIds,
const std::vector<int64_t>& times,
const std::vector<std::vector<std::string>>& measurementsList,
const std::vector<std::vector<std::string>>& valuesList) {
execute([&](Session& s) { s.insertRecords(deviceIds, times, measurementsList, valuesList); });
}
void SessionPool::executeNonQueryStatement(const std::string& sql) {
execute([&](Session& s) { s.executeNonQueryStatement(sql); });
}
PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql) {
PooledSession lease = getSession();
try {
auto dataSet = lease->executeQueryStatement(sql);
return PooledSessionDataSet(std::move(lease), std::move(dataSet));
} catch (const IoTDBConnectionException&) {
lease.markBroken();
throw;
}
}
PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql,
int64_t timeoutInMs) {
PooledSession lease = getSession();
try {
auto dataSet = lease->executeQueryStatement(sql, timeoutInMs);
return PooledSessionDataSet(std::move(lease), std::move(dataSet));
} catch (const IoTDBConnectionException&) {
lease.markBroken();
throw;
}
}
void SessionPool::close() {
std::deque<std::shared_ptr<Session>> toClose;
{
std::lock_guard<std::mutex> lock(mutex_);
if (closed_) {
return;
}
closed_ = true;
toClose.swap(idleQueue_);
size_ -= toClose.size();
}
cv_.notify_all();
// Sessions destructed here (outside the lock) close their connections.
toClose.clear();
}
size_t SessionPool::activeCount() {
std::lock_guard<std::mutex> lock(mutex_);
return size_ - idleQueue_.size();
}