| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #ifndef IOTDB_SESSIONPOOL_H |
| #define IOTDB_SESSIONPOOL_H |
| |
| #include <chrono> |
| #include <condition_variable> |
| #include <cstdint> |
| #include <deque> |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "Session.h" |
| |
| /* |
| * A thread-safe pool of opened Session objects. |
| * |
| * A Session is NOT safe to use from multiple threads concurrently. SessionPool |
| * solves this by lending each Session to exactly one borrower at a time and |
| * reclaiming it afterwards, so many application threads can share a bounded set |
| * of physical connections without external locking. |
| * |
| * Two usage styles are supported: |
| * |
| * 1. RAII lease (recommended for arbitrary calls): |
| * { |
| * PooledSession s = pool.getSession(); // blocks up to the timeout |
| * s->insertTablet(tablet); // call any Session method |
| * } // automatically returned here |
| * |
| * 2. Convenience wrappers / generic execute() (recommended for hot paths): |
| * pool.insertTablet(tablet); |
| * pool.execute([&](Session& s) { s.insertRecord(...); }); |
| * |
| * Both styles evict a Session from the pool (instead of recycling it) when the |
| * operation throws IoTDBConnectionException, so a dead connection is never |
| * handed to the next borrower; a fresh one is created lazily on demand. |
| * |
| * Lifetime: a PooledSession returns its Session to the owning SessionPool when |
| * destroyed, so every PooledSession must not outlive the pool it came from. |
| */ |
| class SessionPool; |
| |
| class PooledSession { |
| public: |
| PooledSession() noexcept : pool_(nullptr), session_(nullptr), broken_(false) {} |
| |
| PooledSession(SessionPool* pool, std::shared_ptr<Session> session) |
| : pool_(pool), session_(std::move(session)), broken_(false) {} |
| |
| // Non-copyable: a leased Session is owned by exactly one borrower. |
| PooledSession(const PooledSession&) = delete; |
| PooledSession& operator=(const PooledSession&) = delete; |
| |
| PooledSession(PooledSession&& other) noexcept |
| : pool_(other.pool_), session_(std::move(other.session_)), broken_(other.broken_) { |
| other.pool_ = nullptr; |
| other.session_ = nullptr; |
| other.broken_ = false; |
| } |
| |
| PooledSession& operator=(PooledSession&& other) noexcept { |
| if (this != &other) { |
| reset(); |
| pool_ = other.pool_; |
| session_ = std::move(other.session_); |
| broken_ = other.broken_; |
| other.pool_ = nullptr; |
| other.session_ = nullptr; |
| other.broken_ = false; |
| } |
| return *this; |
| } |
| |
| ~PooledSession() { |
| reset(); |
| } |
| |
| Session* operator->() const { |
| return session_.get(); |
| } |
| |
| Session& operator*() const { |
| return *session_; |
| } |
| |
| explicit operator bool() const { |
| return static_cast<bool>(session_); |
| } |
| |
| // Mark the underlying connection as unusable so it is discarded (not recycled) |
| // when this lease is returned. Call this if you caught a connection error. |
| void markBroken() { |
| broken_ = true; |
| } |
| |
| // Eagerly return the Session to the pool before scope exit. |
| void release() { |
| reset(); |
| } |
| |
| private: |
| void reset(); |
| |
| SessionPool* pool_; |
| std::shared_ptr<Session> session_; |
| bool broken_; |
| }; |
| |
| /* |
| * Couples a query result set with the pooled Session that produced it. |
| * |
| * A SessionDataSet lazily fetches further result blocks over its Session's |
| * connection, so that Session must stay exclusively leased until iteration is |
| * finished. This wrapper holds the lease for exactly that long and returns the |
| * Session to the pool when destroyed. |
| */ |
| class PooledSessionDataSet { |
| public: |
| PooledSessionDataSet(PooledSession session, std::unique_ptr<SessionDataSet> dataSet) |
| : session_(std::move(session)), dataSet_(std::move(dataSet)) {} |
| |
| PooledSessionDataSet(const PooledSessionDataSet&) = delete; |
| PooledSessionDataSet& operator=(const PooledSessionDataSet&) = delete; |
| PooledSessionDataSet(PooledSessionDataSet&&) noexcept = default; |
| PooledSessionDataSet& operator=(PooledSessionDataSet&&) noexcept = default; |
| |
| SessionDataSet* operator->() const { |
| return dataSet_.get(); |
| } |
| SessionDataSet& operator*() const { |
| return *dataSet_; |
| } |
| |
| private: |
| PooledSession session_; |
| std::unique_ptr<SessionDataSet> dataSet_; |
| }; |
| |
| class SessionPool { |
| public: |
| static constexpr size_t DEFAULT_MAX_SIZE = 5; |
| static constexpr int64_t DEFAULT_WAIT_TIMEOUT_MS = 60 * 1000; |
| |
| // Single-host constructor. |
| SessionPool(std::string host, int rpcPort, std::string username, std::string password, |
| size_t maxSize = DEFAULT_MAX_SIZE); |
| |
| // Multi-node constructor. |
| SessionPool(std::vector<std::string> nodeUrls, std::string username, std::string password, |
| size_t maxSize = DEFAULT_MAX_SIZE); |
| |
| ~SessionPool(); |
| |
| // Non-copyable, non-movable: the pool owns mutex/condition state. |
| SessionPool(const SessionPool&) = delete; |
| SessionPool& operator=(const SessionPool&) = delete; |
| |
| // ---- configuration (apply before the first getSession()) ---- |
| SessionPool& setFetchSize(int fetchSize); |
| SessionPool& setZoneId(std::string zoneId); |
| SessionPool& setSqlDialect(std::string sqlDialect); |
| SessionPool& setDatabase(std::string database); |
| SessionPool& setEnableRedirection(bool enable); |
| SessionPool& setEnableAutoFetch(bool enable); |
| SessionPool& setEnableRPCCompression(bool enable); |
| SessionPool& setConnectTimeoutMs(int connectTimeoutMs); |
| SessionPool& setWaitToGetSessionTimeoutMs(int64_t timeoutMs); |
| SessionPool& setUseSSL(bool useSSL); |
| SessionPool& setTrustCertFilePath(std::string path); |
| |
| // Borrow a Session. Blocks until one is free or a new one can be created, |
| // up to timeoutMs (<= 0 means use the pool default). Throws IoTDBException on |
| // timeout or when the pool is closed. |
| PooledSession getSession(); |
| PooledSession getSession(int64_t timeoutMs); |
| |
| // Generic helper: borrow a Session, run func(Session&), return/evict it, and |
| // forward the result. Evicts the Session on IoTDBConnectionException. |
| template <typename Func> auto execute(Func&& func) -> decltype(func(std::declval<Session&>())); |
| |
| // ---- convenience wrappers for common operations (with eviction on failure) ---- |
| void insertTablet(Tablet& tablet, bool sorted = false); |
| void insertAlignedTablet(Tablet& tablet, bool sorted = false); |
| void insertTablets(std::unordered_map<std::string, Tablet*>& tablets, bool sorted = false); |
| void insertRecord(const std::string& deviceId, int64_t time, |
| const std::vector<std::string>& measurements, |
| const std::vector<std::string>& values); |
| void insertRecords(const std::vector<std::string>& deviceIds, const std::vector<int64_t>& times, |
| const std::vector<std::vector<std::string>>& measurementsList, |
| const std::vector<std::vector<std::string>>& valuesList); |
| void executeNonQueryStatement(const std::string& sql); |
| // The returned wrapper keeps the underlying Session leased until it is |
| // destroyed, so it is safe to iterate the result set across multiple fetches. |
| PooledSessionDataSet executeQueryStatement(const std::string& sql); |
| PooledSessionDataSet executeQueryStatement(const std::string& sql, int64_t timeoutInMs); |
| |
| // Close the pool: idle Sessions are closed immediately, in-use Sessions are |
| // closed when they are returned. Idempotent. |
| void close(); |
| |
| // ---- observability ---- |
| size_t getMaxSize() const { |
| return maxSize_; |
| } |
| // Number of Sessions currently borrowed. |
| size_t activeCount(); |
| |
| private: |
| friend class PooledSession; |
| |
| std::shared_ptr<Session> constructNewSession(); |
| std::shared_ptr<Session> acquire(int64_t timeoutMs); |
| void putBack(const std::shared_ptr<Session>& session, bool broken); |
| |
| // connection parameters |
| std::string host_; |
| int rpcPort_; |
| std::vector<std::string> nodeUrls_; |
| std::string username_; |
| std::string password_; |
| std::string zoneId_; |
| int fetchSize_ = AbstractSessionBuilder::DEFAULT_FETCH_SIZE; |
| std::string sqlDialect_ = AbstractSessionBuilder::DEFAULT_SQL_DIALECT; |
| std::string database_; |
| bool enableRedirection_ = AbstractSessionBuilder::DEFAULT_ENABLE_REDIRECTIONS; |
| bool enableAutoFetch_ = AbstractSessionBuilder::DEFAULT_ENABLE_AUTO_FETCH; |
| bool enableRPCCompression_ = AbstractSessionBuilder::DEFAULT_ENABLE_RPC_COMPRESSION; |
| int connectTimeoutMs_ = AbstractSessionBuilder::DEFAULT_CONNECT_TIMEOUT_MS; |
| bool useSSL_ = false; |
| std::string trustCertFilePath_; |
| |
| // pool sizing / waiting policy |
| size_t maxSize_; |
| int64_t waitTimeoutMs_ = DEFAULT_WAIT_TIMEOUT_MS; |
| |
| // pool state, guarded by mutex_ |
| std::mutex mutex_; |
| std::condition_variable cv_; |
| std::deque<std::shared_ptr<Session>> idleQueue_; |
| size_t size_ = 0; // total live Sessions (idle + borrowed) |
| bool closed_ = false; |
| }; |
| |
| template <typename Func> |
| auto SessionPool::execute(Func&& func) -> decltype(func(std::declval<Session&>())) { |
| PooledSession lease = getSession(); |
| try { |
| return func(*lease); |
| } catch (const IoTDBConnectionException&) { |
| lease.markBroken(); |
| throw; |
| } |
| } |
| |
| /* |
| * Fluent builder for SessionPool, mirroring SessionBuilder / TableSessionBuilder. |
| * |
| * auto pool = SessionPoolBuilder() |
| * .host("127.0.0.1")->rpcPort(6667) |
| * ->username("root")->password("root") |
| * ->maxSize(10)->build(); |
| */ |
| class SessionPoolBuilder : public AbstractSessionBuilder { |
| public: |
| SessionPoolBuilder* host(const std::string& v) { |
| AbstractSessionBuilder::host = v; |
| return this; |
| } |
| SessionPoolBuilder* rpcPort(int v) { |
| AbstractSessionBuilder::rpcPort = v; |
| return this; |
| } |
| SessionPoolBuilder* nodeUrls(const std::vector<std::string>& v) { |
| AbstractSessionBuilder::nodeUrls = v; |
| return this; |
| } |
| SessionPoolBuilder* username(const std::string& v) { |
| AbstractSessionBuilder::username = v; |
| return this; |
| } |
| SessionPoolBuilder* password(const std::string& v) { |
| AbstractSessionBuilder::password = v; |
| return this; |
| } |
| SessionPoolBuilder* zoneId(const std::string& v) { |
| AbstractSessionBuilder::zoneId = v; |
| return this; |
| } |
| SessionPoolBuilder* fetchSize(int v) { |
| AbstractSessionBuilder::fetchSize = v; |
| return this; |
| } |
| SessionPoolBuilder* database(const std::string& v) { |
| AbstractSessionBuilder::database = v; |
| return this; |
| } |
| SessionPoolBuilder* enableAutoFetch(bool v) { |
| AbstractSessionBuilder::enableAutoFetch = v; |
| return this; |
| } |
| SessionPoolBuilder* enableRedirections(bool v) { |
| AbstractSessionBuilder::enableRedirections = v; |
| return this; |
| } |
| SessionPoolBuilder* enableRPCCompression(bool v) { |
| AbstractSessionBuilder::enableRPCCompression = v; |
| return this; |
| } |
| SessionPoolBuilder* connectTimeoutMs(int v) { |
| AbstractSessionBuilder::connectTimeoutMs = v; |
| return this; |
| } |
| SessionPoolBuilder* useSSL(bool v) { |
| AbstractSessionBuilder::useSSL = v; |
| return this; |
| } |
| SessionPoolBuilder* trustCertFilePath(const std::string& v) { |
| AbstractSessionBuilder::trustCertFilePath = v; |
| return this; |
| } |
| SessionPoolBuilder* maxSize(size_t v) { |
| maxSize_ = v; |
| return this; |
| } |
| SessionPoolBuilder* waitToGetSessionTimeoutMs(int64_t v) { |
| waitTimeoutMs_ = v; |
| return this; |
| } |
| SessionPoolBuilder* sqlDialect(const std::string& v) { |
| AbstractSessionBuilder::sqlDialect = v; |
| return this; |
| } |
| |
| std::shared_ptr<SessionPool> build() { |
| if (!AbstractSessionBuilder::nodeUrls.empty() && |
| (AbstractSessionBuilder::host != DEFAULT_HOST || |
| AbstractSessionBuilder::rpcPort != DEFAULT_RPC_PORT)) { |
| throw IoTDBException( |
| "SessionPool builder does not support setting node urls and host/rpcPort at the same " |
| "time."); |
| } |
| std::shared_ptr<SessionPool> pool; |
| if (!AbstractSessionBuilder::nodeUrls.empty()) { |
| pool = std::make_shared<SessionPool>(AbstractSessionBuilder::nodeUrls, |
| AbstractSessionBuilder::username, |
| AbstractSessionBuilder::password, maxSize_); |
| } else { |
| pool = std::make_shared<SessionPool>( |
| AbstractSessionBuilder::host, AbstractSessionBuilder::rpcPort, |
| AbstractSessionBuilder::username, AbstractSessionBuilder::password, maxSize_); |
| } |
| pool->setFetchSize(AbstractSessionBuilder::fetchSize) |
| .setZoneId(AbstractSessionBuilder::zoneId) |
| .setSqlDialect(AbstractSessionBuilder::sqlDialect) |
| .setDatabase(AbstractSessionBuilder::database) |
| .setEnableRedirection(AbstractSessionBuilder::enableRedirections) |
| .setEnableAutoFetch(AbstractSessionBuilder::enableAutoFetch) |
| .setEnableRPCCompression(AbstractSessionBuilder::enableRPCCompression) |
| .setConnectTimeoutMs(AbstractSessionBuilder::connectTimeoutMs) |
| .setWaitToGetSessionTimeoutMs(waitTimeoutMs_) |
| .setUseSSL(AbstractSessionBuilder::useSSL) |
| .setTrustCertFilePath(AbstractSessionBuilder::trustCertFilePath); |
| return pool; |
| } |
| |
| private: |
| size_t maxSize_ = SessionPool::DEFAULT_MAX_SIZE; |
| int64_t waitTimeoutMs_ = SessionPool::DEFAULT_WAIT_TIMEOUT_MS; |
| }; |
| |
| #endif // IOTDB_SESSIONPOOL_H |