| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "SessionPool.h" |
| |
| void PooledSession::reset() { |
| if (session_ && pool_ != nullptr) { |
| pool_->putBack(session_, broken_); |
| } |
| pool_ = nullptr; |
| session_ = nullptr; |
| broken_ = false; |
| } |
| |
| SessionPool::SessionPool(std::string host, int rpcPort, std::string username, |
| std::string password, size_t maxSize) |
| : host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)), |
| password_(std::move(password)), maxSize_(maxSize) { |
| if (maxSize_ == 0) { |
| throw IoTDBException("SessionPool maxSize must be greater than 0."); |
| } |
| } |
| |
| SessionPool::~SessionPool() { |
| try { |
| close(); |
| } catch (const std::exception &e) { |
| log_debug(std::string("SessionPool::~SessionPool(), ") + e.what()); |
| } |
| } |
| |
| SessionPool &SessionPool::setFetchSize(int fetchSize) { |
| fetchSize_ = fetchSize; |
| return *this; |
| } |
| |
| SessionPool &SessionPool::setZoneId(std::string zoneId) { |
| zoneId_ = std::move(zoneId); |
| return *this; |
| } |
| |
| SessionPool &SessionPool::setSqlDialect(std::string sqlDialect) { |
| sqlDialect_ = std::move(sqlDialect); |
| return *this; |
| } |
| |
| SessionPool &SessionPool::setDatabase(std::string database) { |
| database_ = std::move(database); |
| return *this; |
| } |
| |
| SessionPool &SessionPool::setEnableRedirection(bool enable) { |
| enableRedirection_ = enable; |
| return *this; |
| } |
| |
| SessionPool &SessionPool::setEnableAutoFetch(bool enable) { |
| enableAutoFetch_ = enable; |
| return *this; |
| } |
| |
| SessionPool &SessionPool::setEnableRPCCompression(bool enable) { |
| enableRPCCompression_ = enable; |
| return *this; |
| } |
| |
| SessionPool &SessionPool::setConnectTimeoutMs(int connectTimeoutMs) { |
| connectTimeoutMs_ = connectTimeoutMs; |
| return *this; |
| } |
| |
| SessionPool &SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) { |
| waitTimeoutMs_ = timeoutMs; |
| return *this; |
| } |
| |
| std::shared_ptr<Session> SessionPool::constructNewSession() { |
| AbstractSessionBuilder builder; |
| builder.host = host_; |
| builder.rpcPort = rpcPort_; |
| builder.username = username_; |
| builder.password = password_; |
| builder.zoneId = zoneId_; |
| builder.fetchSize = fetchSize_; |
| builder.sqlDialect = sqlDialect_; |
| builder.database = database_; |
| builder.enableAutoFetch = enableAutoFetch_; |
| builder.enableRedirections = enableRedirection_; |
| builder.enableRPCCompression = enableRPCCompression_; |
| |
| auto session = std::make_shared<Session>(&builder); |
| session->open(enableRPCCompression_, connectTimeoutMs_); |
| return session; |
| } |
| |
| std::shared_ptr<Session> SessionPool::acquire(int64_t timeoutMs) { |
| const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_; |
| std::unique_lock<std::mutex> lock(mutex_); |
| const auto deadline = std::chrono::steady_clock::now() + |
| std::chrono::milliseconds(effectiveTimeout); |
| |
| while (true) { |
| if (closed_) { |
| throw IoTDBException("SessionPool is closed."); |
| } |
| if (!idleQueue_.empty()) { |
| auto session = idleQueue_.front(); |
| idleQueue_.pop_front(); |
| return session; |
| } |
| if (size_ < maxSize_) { |
| // Reserve a slot, then build the connection outside the lock so other |
| // borrowers are not blocked by network/handshake latency. |
| ++size_; |
| lock.unlock(); |
| std::shared_ptr<Session> session; |
| try { |
| session = constructNewSession(); |
| } catch (...) { |
| lock.lock(); |
| --size_; |
| cv_.notify_one(); |
| throw; |
| } |
| lock.lock(); |
| if (closed_) { |
| // The pool was closed while this session was being built; do not hand |
| // it out. Release its slot and let it be torn down outside the lock. |
| --size_; |
| lock.unlock(); |
| throw IoTDBException("SessionPool is closed."); |
| } |
| return session; |
| } |
| |
| // Pool exhausted: wait for a Session to be returned. |
| if (effectiveTimeout <= 0) { |
| cv_.wait(lock); |
| } else { |
| if (cv_.wait_until(lock, deadline) == std::cv_status::timeout && |
| idleQueue_.empty() && size_ >= maxSize_ && !closed_) { |
| throw IoTDBException( |
| "Wait to get session timeout in SessionPool, maxSize=" + |
| std::to_string(maxSize_) + |
| ", waitTimeoutMs=" + std::to_string(effectiveTimeout) + "."); |
| } |
| } |
| } |
| } |
| |
| void SessionPool::putBack(const std::shared_ptr<Session> &session, |
| bool broken) { |
| std::lock_guard<std::mutex> lock(mutex_); |
| if (broken || closed_) { |
| // Drop the Session and free its slot so a healthy replacement can be |
| // created on demand. The caller (PooledSession::reset) still holds the last |
| // reference and tears the connection down after we return, i.e. outside |
| // this lock. |
| --size_; |
| } else { |
| idleQueue_.push_back(session); |
| } |
| cv_.notify_one(); |
| } |
| |
| PooledSession SessionPool::getSession() { return getSession(waitTimeoutMs_); } |
| |
| PooledSession SessionPool::getSession(int64_t timeoutMs) { |
| return PooledSession(this, acquire(timeoutMs)); |
| } |
| |
| void SessionPool::insertTablet(Tablet &tablet, bool sorted) { |
| execute([&](Session &s) { s.insertTablet(tablet, sorted); }); |
| } |
| |
| void SessionPool::insertAlignedTablet(Tablet &tablet, bool sorted) { |
| execute([&](Session &s) { s.insertAlignedTablet(tablet, sorted); }); |
| } |
| |
| void SessionPool::insertTablets( |
| std::unordered_map<std::string, Tablet *> &tablets, bool sorted) { |
| execute([&](Session &s) { s.insertTablets(tablets, sorted); }); |
| } |
| |
| void SessionPool::insertRecord(const std::string &deviceId, int64_t time, |
| const std::vector<std::string> &measurements, |
| const std::vector<std::string> &values) { |
| execute([&](Session &s) { |
| s.insertRecord(deviceId, time, measurements, values); |
| }); |
| } |
| |
| void SessionPool::insertRecords( |
| const std::vector<std::string> &deviceIds, |
| const std::vector<int64_t> ×, |
| const std::vector<std::vector<std::string>> &measurementsList, |
| const std::vector<std::vector<std::string>> &valuesList) { |
| execute([&](Session &s) { |
| s.insertRecords(deviceIds, times, measurementsList, valuesList); |
| }); |
| } |
| |
| void SessionPool::executeNonQueryStatement(const std::string &sql) { |
| execute([&](Session &s) { s.executeNonQueryStatement(sql); }); |
| } |
| |
| PooledSessionDataSet |
| SessionPool::executeQueryStatement(const std::string &sql) { |
| PooledSession lease = getSession(); |
| try { |
| auto dataSet = lease->executeQueryStatement(sql); |
| return PooledSessionDataSet(std::move(lease), std::move(dataSet)); |
| } catch (const IoTDBConnectionException &) { |
| lease.markBroken(); |
| throw; |
| } |
| } |
| |
| PooledSessionDataSet SessionPool::executeQueryStatement(const std::string &sql, |
| int64_t timeoutInMs) { |
| PooledSession lease = getSession(); |
| try { |
| auto dataSet = lease->executeQueryStatement(sql, timeoutInMs); |
| return PooledSessionDataSet(std::move(lease), std::move(dataSet)); |
| } catch (const IoTDBConnectionException &) { |
| lease.markBroken(); |
| throw; |
| } |
| } |
| |
| void SessionPool::close() { |
| std::deque<std::shared_ptr<Session>> toClose; |
| { |
| std::lock_guard<std::mutex> lock(mutex_); |
| if (closed_) { |
| return; |
| } |
| closed_ = true; |
| toClose.swap(idleQueue_); |
| size_ -= toClose.size(); |
| } |
| cv_.notify_all(); |
| // Sessions destructed here (outside the lock) close their connections. |
| toClose.clear(); |
| } |
| |
| size_t SessionPool::activeCount() { |
| std::lock_guard<std::mutex> lock(mutex_); |
| return size_ - idleQueue_.size(); |
| } |