blob: 4483dab0c5144e32da7d5d1728861ce5a4e2db14 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef IOTDB_SESSIONPOOL_H
#define IOTDB_SESSIONPOOL_H
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
#include "Session.h"
/*
* A thread-safe pool of opened Session objects.
*
* A Session is NOT safe to use from multiple threads concurrently. SessionPool
* solves this by lending each Session to exactly one borrower at a time and
* reclaiming it afterwards, so many application threads can share a bounded set
* of physical connections without external locking.
*
* Two usage styles are supported:
*
* 1. RAII lease (recommended for arbitrary calls):
* {
* PooledSession s = pool.getSession(); // blocks up to the timeout
* s->insertTablet(tablet); // call any Session method
* } // automatically returned here
*
* 2. Convenience wrappers / generic execute() (recommended for hot paths):
* pool.insertTablet(tablet);
* pool.execute([&](Session& s) { s.insertRecord(...); });
*
* Both styles evict a Session from the pool (instead of recycling it) when the
* operation throws IoTDBConnectionException, so a dead connection is never
* handed to the next borrower; a fresh one is created lazily on demand.
*
* Lifetime: a PooledSession returns its Session to the owning SessionPool when
* destroyed, so every PooledSession must not outlive the pool it came from.
*/
class SessionPool;
class PooledSession {
public:
PooledSession() noexcept : pool_(nullptr), session_(nullptr), broken_(false) {}
PooledSession(SessionPool* pool, std::shared_ptr<Session> session)
: pool_(pool), session_(std::move(session)), broken_(false) {}
// Non-copyable: a leased Session is owned by exactly one borrower.
PooledSession(const PooledSession&) = delete;
PooledSession& operator=(const PooledSession&) = delete;
PooledSession(PooledSession&& other) noexcept
: pool_(other.pool_), session_(std::move(other.session_)), broken_(other.broken_) {
other.pool_ = nullptr;
other.session_ = nullptr;
other.broken_ = false;
}
PooledSession& operator=(PooledSession&& other) noexcept {
if (this != &other) {
reset();
pool_ = other.pool_;
session_ = std::move(other.session_);
broken_ = other.broken_;
other.pool_ = nullptr;
other.session_ = nullptr;
other.broken_ = false;
}
return *this;
}
~PooledSession() {
reset();
}
Session* operator->() const {
return session_.get();
}
Session& operator*() const {
return *session_;
}
explicit operator bool() const {
return static_cast<bool>(session_);
}
// Mark the underlying connection as unusable so it is discarded (not recycled)
// when this lease is returned. Call this if you caught a connection error.
void markBroken() {
broken_ = true;
}
// Eagerly return the Session to the pool before scope exit.
void release() {
reset();
}
private:
void reset();
SessionPool* pool_;
std::shared_ptr<Session> session_;
bool broken_;
};
/*
* Couples a query result set with the pooled Session that produced it.
*
* A SessionDataSet lazily fetches further result blocks over its Session's
* connection, so that Session must stay exclusively leased until iteration is
* finished. This wrapper holds the lease for exactly that long and returns the
* Session to the pool when destroyed.
*/
class PooledSessionDataSet {
public:
PooledSessionDataSet(PooledSession session, std::unique_ptr<SessionDataSet> dataSet)
: session_(std::move(session)), dataSet_(std::move(dataSet)) {}
PooledSessionDataSet(const PooledSessionDataSet&) = delete;
PooledSessionDataSet& operator=(const PooledSessionDataSet&) = delete;
PooledSessionDataSet(PooledSessionDataSet&&) noexcept = default;
PooledSessionDataSet& operator=(PooledSessionDataSet&&) noexcept = default;
SessionDataSet* operator->() const {
return dataSet_.get();
}
SessionDataSet& operator*() const {
return *dataSet_;
}
private:
PooledSession session_;
std::unique_ptr<SessionDataSet> dataSet_;
};
class SessionPool {
public:
static constexpr size_t DEFAULT_MAX_SIZE = 5;
static constexpr int64_t DEFAULT_WAIT_TIMEOUT_MS = 60 * 1000;
// Single-host constructor.
SessionPool(std::string host, int rpcPort, std::string username, std::string password,
size_t maxSize = DEFAULT_MAX_SIZE);
// Multi-node constructor.
SessionPool(std::vector<std::string> nodeUrls, std::string username, std::string password,
size_t maxSize = DEFAULT_MAX_SIZE);
~SessionPool();
// Non-copyable, non-movable: the pool owns mutex/condition state.
SessionPool(const SessionPool&) = delete;
SessionPool& operator=(const SessionPool&) = delete;
// ---- configuration (apply before the first getSession()) ----
SessionPool& setFetchSize(int fetchSize);
SessionPool& setZoneId(std::string zoneId);
SessionPool& setSqlDialect(std::string sqlDialect);
SessionPool& setDatabase(std::string database);
SessionPool& setEnableRedirection(bool enable);
SessionPool& setEnableAutoFetch(bool enable);
SessionPool& setEnableRPCCompression(bool enable);
SessionPool& setConnectTimeoutMs(int connectTimeoutMs);
SessionPool& setWaitToGetSessionTimeoutMs(int64_t timeoutMs);
SessionPool& setUseSSL(bool useSSL);
SessionPool& setTrustCertFilePath(std::string path);
// Borrow a Session. Blocks until one is free or a new one can be created,
// up to timeoutMs (<= 0 means use the pool default). Throws IoTDBException on
// timeout or when the pool is closed.
PooledSession getSession();
PooledSession getSession(int64_t timeoutMs);
// Generic helper: borrow a Session, run func(Session&), return/evict it, and
// forward the result. Evicts the Session on IoTDBConnectionException.
template <typename Func> auto execute(Func&& func) -> decltype(func(std::declval<Session&>()));
// ---- convenience wrappers for common operations (with eviction on failure) ----
void insertTablet(Tablet& tablet, bool sorted = false);
void insertAlignedTablet(Tablet& tablet, bool sorted = false);
void insertTablets(std::unordered_map<std::string, Tablet*>& tablets, bool sorted = false);
void insertRecord(const std::string& deviceId, int64_t time,
const std::vector<std::string>& measurements,
const std::vector<std::string>& values);
void insertRecords(const std::vector<std::string>& deviceIds, const std::vector<int64_t>& times,
const std::vector<std::vector<std::string>>& measurementsList,
const std::vector<std::vector<std::string>>& valuesList);
void executeNonQueryStatement(const std::string& sql);
// The returned wrapper keeps the underlying Session leased until it is
// destroyed, so it is safe to iterate the result set across multiple fetches.
PooledSessionDataSet executeQueryStatement(const std::string& sql);
PooledSessionDataSet executeQueryStatement(const std::string& sql, int64_t timeoutInMs);
// Close the pool: idle Sessions are closed immediately, in-use Sessions are
// closed when they are returned. Idempotent.
void close();
// ---- observability ----
size_t getMaxSize() const {
return maxSize_;
}
// Number of Sessions currently borrowed.
size_t activeCount();
private:
friend class PooledSession;
std::shared_ptr<Session> constructNewSession();
std::shared_ptr<Session> acquire(int64_t timeoutMs);
void putBack(const std::shared_ptr<Session>& session, bool broken);
// connection parameters
std::string host_;
int rpcPort_;
std::vector<std::string> nodeUrls_;
std::string username_;
std::string password_;
std::string zoneId_;
int fetchSize_ = AbstractSessionBuilder::DEFAULT_FETCH_SIZE;
std::string sqlDialect_ = AbstractSessionBuilder::DEFAULT_SQL_DIALECT;
std::string database_;
bool enableRedirection_ = AbstractSessionBuilder::DEFAULT_ENABLE_REDIRECTIONS;
bool enableAutoFetch_ = AbstractSessionBuilder::DEFAULT_ENABLE_AUTO_FETCH;
bool enableRPCCompression_ = AbstractSessionBuilder::DEFAULT_ENABLE_RPC_COMPRESSION;
int connectTimeoutMs_ = AbstractSessionBuilder::DEFAULT_CONNECT_TIMEOUT_MS;
bool useSSL_ = false;
std::string trustCertFilePath_;
// pool sizing / waiting policy
size_t maxSize_;
int64_t waitTimeoutMs_ = DEFAULT_WAIT_TIMEOUT_MS;
// pool state, guarded by mutex_
std::mutex mutex_;
std::condition_variable cv_;
std::deque<std::shared_ptr<Session>> idleQueue_;
size_t size_ = 0; // total live Sessions (idle + borrowed)
bool closed_ = false;
};
template <typename Func>
auto SessionPool::execute(Func&& func) -> decltype(func(std::declval<Session&>())) {
PooledSession lease = getSession();
try {
return func(*lease);
} catch (const IoTDBConnectionException&) {
lease.markBroken();
throw;
}
}
/*
* Fluent builder for SessionPool, mirroring SessionBuilder / TableSessionBuilder.
*
* auto pool = SessionPoolBuilder()
* .host("127.0.0.1")->rpcPort(6667)
* ->username("root")->password("root")
* ->maxSize(10)->build();
*/
class SessionPoolBuilder : public AbstractSessionBuilder {
public:
SessionPoolBuilder* host(const std::string& v) {
AbstractSessionBuilder::host = v;
return this;
}
SessionPoolBuilder* rpcPort(int v) {
AbstractSessionBuilder::rpcPort = v;
return this;
}
SessionPoolBuilder* nodeUrls(const std::vector<std::string>& v) {
AbstractSessionBuilder::nodeUrls = v;
return this;
}
SessionPoolBuilder* username(const std::string& v) {
AbstractSessionBuilder::username = v;
return this;
}
SessionPoolBuilder* password(const std::string& v) {
AbstractSessionBuilder::password = v;
return this;
}
SessionPoolBuilder* zoneId(const std::string& v) {
AbstractSessionBuilder::zoneId = v;
return this;
}
SessionPoolBuilder* fetchSize(int v) {
AbstractSessionBuilder::fetchSize = v;
return this;
}
SessionPoolBuilder* database(const std::string& v) {
AbstractSessionBuilder::database = v;
return this;
}
SessionPoolBuilder* enableAutoFetch(bool v) {
AbstractSessionBuilder::enableAutoFetch = v;
return this;
}
SessionPoolBuilder* enableRedirections(bool v) {
AbstractSessionBuilder::enableRedirections = v;
return this;
}
SessionPoolBuilder* enableRPCCompression(bool v) {
AbstractSessionBuilder::enableRPCCompression = v;
return this;
}
SessionPoolBuilder* connectTimeoutMs(int v) {
AbstractSessionBuilder::connectTimeoutMs = v;
return this;
}
SessionPoolBuilder* useSSL(bool v) {
AbstractSessionBuilder::useSSL = v;
return this;
}
SessionPoolBuilder* trustCertFilePath(const std::string& v) {
AbstractSessionBuilder::trustCertFilePath = v;
return this;
}
SessionPoolBuilder* maxSize(size_t v) {
maxSize_ = v;
return this;
}
SessionPoolBuilder* waitToGetSessionTimeoutMs(int64_t v) {
waitTimeoutMs_ = v;
return this;
}
SessionPoolBuilder* sqlDialect(const std::string& v) {
AbstractSessionBuilder::sqlDialect = v;
return this;
}
std::shared_ptr<SessionPool> build() {
if (!AbstractSessionBuilder::nodeUrls.empty() &&
(AbstractSessionBuilder::host != DEFAULT_HOST ||
AbstractSessionBuilder::rpcPort != DEFAULT_RPC_PORT)) {
throw IoTDBException(
"SessionPool builder does not support setting node urls and host/rpcPort at the same "
"time.");
}
std::shared_ptr<SessionPool> pool;
if (!AbstractSessionBuilder::nodeUrls.empty()) {
pool = std::make_shared<SessionPool>(AbstractSessionBuilder::nodeUrls,
AbstractSessionBuilder::username,
AbstractSessionBuilder::password, maxSize_);
} else {
pool = std::make_shared<SessionPool>(
AbstractSessionBuilder::host, AbstractSessionBuilder::rpcPort,
AbstractSessionBuilder::username, AbstractSessionBuilder::password, maxSize_);
}
pool->setFetchSize(AbstractSessionBuilder::fetchSize)
.setZoneId(AbstractSessionBuilder::zoneId)
.setSqlDialect(AbstractSessionBuilder::sqlDialect)
.setDatabase(AbstractSessionBuilder::database)
.setEnableRedirection(AbstractSessionBuilder::enableRedirections)
.setEnableAutoFetch(AbstractSessionBuilder::enableAutoFetch)
.setEnableRPCCompression(AbstractSessionBuilder::enableRPCCompression)
.setConnectTimeoutMs(AbstractSessionBuilder::connectTimeoutMs)
.setWaitToGetSessionTimeoutMs(waitTimeoutMs_)
.setUseSSL(AbstractSessionBuilder::useSSL)
.setTrustCertFilePath(AbstractSessionBuilder::trustCertFilePath);
return pool;
}
private:
size_t maxSize_ = SessionPool::DEFAULT_MAX_SIZE;
int64_t waitTimeoutMs_ = SessionPool::DEFAULT_WAIT_TIMEOUT_MS;
};
#endif // IOTDB_SESSIONPOOL_H