blob: 8580daffc24c2a59dcd98ca81c351ae761acd968 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import {
Session,
TreeTablet,
TableTablet,
ITreeTablet,
ITableTablet,
SessionDataSet,
} from "./Session";
import {
PoolConfig,
DEFAULT_POOL_CONFIG,
EndPoint,
parseNodeUrls,
} from "../utils/Config";
import { logger } from "../utils/Logger";
import { registerClosable, unregisterClosable } from "../utils/ProcessCleanup";
import { RedirectCache } from "./RedirectCache";
import Denque from "denque";
/**
 * Bookkeeping record the pool keeps for every session it owns.
 */
interface PooledSession {
  /** The wrapped session handed out to callers. */
  session: Session;
  /** Set to true while the session is handed out, false once released. */
  inUse: boolean;
  /** `Date.now()` timestamp of the last hand-out or release. */
  lastUsed: number;
}
/**
 * Base class for session pooling with common functionality.
 *
 * Provides connection pooling, round-robin endpoint selection, a bounded wait
 * queue for callers when the pool is exhausted, and periodic cleanup of idle
 * sessions. Subclasses supply the pool name and the session factory.
 *
 * Invariant maintained by this class: a pooled session is in exactly one of
 * `idleSessions` or `activeSessions` (or in neither for the brief moment
 * between creation and hand-out), and `pool.length` plus the number of
 * in-flight creations never exceeds `maxPoolSize`.
 */
export abstract class BaseSessionPool {
  /** Effective configuration: `DEFAULT_POOL_CONFIG` overlaid with caller values. */
  protected config: PoolConfig;
  /** Endpoints cycled through by `getNextEndPoint()`. */
  protected endPoints: EndPoint[];
  /** Every session owned by the pool, idle or active. */
  protected pool: PooledSession[] = [];
  /** Callbacks of callers waiting for a session, in arrival order. */
  protected waitQueue: Denque<(session: Session) => void> = new Denque();
  /** Sessions available for hand-out. */
  protected idleSessions: Denque<PooledSession> = new Denque();
  /** Sessions currently handed out. */
  protected activeSessions: Set<PooledSession> = new Set();
  /** Index of the endpoint the next `getNextEndPoint()` call returns. */
  protected currentEndPointIndex = 0;
  /** Handle of the periodic idle-cleanup timer, or null when not running. */
  protected cleanupInterval: ReturnType<typeof setInterval> | null = null;
  /** Cache of device id -> recommended endpoint. */
  protected redirectCache: RedirectCache;
  /** Session last created for a given `host:port` redirect key. */
  protected endPointToSession: Map<string, PooledSession> = new Map();
  /** Sessions currently being created; counted against `maxPoolSize`. */
  private pendingCreations = 0;

  /**
   * Supports two call shapes:
   *  - `new Pool(config)` where config carries `nodeUrls` or `host`/`port`;
   *  - `new Pool(hosts, port, config?)` (older host-based form).
   *
   * @throws Error when no endpoint can be derived from the arguments.
   */
  constructor(
    hostsOrConfig: string | string[] | PoolConfig,
    port?: number,
    config?: Partial<PoolConfig>,
  ) {
    if (typeof hostsOrConfig === "object" && !Array.isArray(hostsOrConfig)) {
      // New format: constructor(config: PoolConfig)
      const poolConfig = hostsOrConfig as PoolConfig;
      this.config = { ...DEFAULT_POOL_CONFIG, ...poolConfig } as PoolConfig;
      if (poolConfig.nodeUrls) {
        if (poolConfig.nodeUrls.length === 0) {
          throw new Error("nodeUrls array cannot be empty");
        }
        // nodeUrls may be given as strings (parsed here) or as EndPoint objects
        this.endPoints =
          typeof poolConfig.nodeUrls[0] === "string"
            ? parseNodeUrls(poolConfig.nodeUrls as string[])
            : (poolConfig.nodeUrls as EndPoint[]);
      } else if (poolConfig.host && poolConfig.port) {
        this.endPoints = [{ host: poolConfig.host, port: poolConfig.port }];
      } else {
        throw new Error(
          "Either nodeUrls or host/port must be provided in config",
        );
      }
    } else {
      // Old format: constructor(hosts: string | string[], port: number, config?: Partial<PoolConfig>)
      if (port === undefined) {
        throw new Error(
          "Port must be provided when using host-based constructor",
        );
      }
      this.config = { ...DEFAULT_POOL_CONFIG, ...config, port } as PoolConfig;
      const hostList = Array.isArray(hostsOrConfig)
        ? hostsOrConfig
        : [hostsOrConfig];
      // An empty host list would leave round-robin selection with nothing to return.
      if (hostList.length === 0) {
        throw new Error("At least one host must be provided");
      }
      this.endPoints = hostList.map((host) => ({ host, port }));
    }

    this.redirectCache = new RedirectCache(
      this.config.redirectCacheTTL || 300000,
      10000, // max size
    );
    logger.info(
      `${this.getPoolName()} created with ${this.endPoints.length} endpoints, max pool size: ${this.config.maxPoolSize}, redirection: ${this.config.enableRedirection ? "enabled" : "disabled"}`,
    );
    registerClosable(this);
  }

  /**
   * Get the name of the pool for logging purposes
   */
  protected abstract getPoolName(): string;

  /**
   * Create a new session with pool-specific initialization
   */
  protected abstract createPoolSession(): Promise<Session>;

  /**
   * Opens `minPoolSize` (default 1) sessions in parallel and starts the
   * idle-session cleanup timer, which fires every 30 seconds.
   *
   * @throws Error wrapping the first session-creation failure.
   */
  async init(): Promise<void> {
    const minSize = this.config.minPoolSize || 1;
    try {
      await Promise.all(
        Array.from({ length: minSize }, () => this.createSession()),
      );
    } catch (error: unknown) {
      const errorMsg = `Failed to initialize pool: ${BaseSessionPool.describeError(error)}`;
      logger.error(errorMsg);
      const stack = (error as { stack?: unknown } | null | undefined)?.stack;
      if (stack) {
        logger.error(`Stack trace: ${stack}`);
      }
      throw new Error(errorMsg);
    }

    // A repeated init() must not leave an orphaned timer behind.
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
    }
    // unref() so the timer does not keep the process alive.
    this.cleanupInterval = setInterval(() => {
      this.cleanupIdleSessions().catch((error) => {
        logger.error("Error during scheduled session cleanup:", error);
      });
    }, 30000).unref(); // Check every 30 seconds
    logger.info(`${this.getPoolName()} initialized with ${minSize} sessions`);
  }

  /** Best-effort extraction of a human-readable message from a thrown value. */
  private static describeError(error: unknown): string {
    if (typeof error === "object" && error !== null && "message" in error) {
      return String((error as { message: unknown }).message);
    }
    return String(error);
  }

  /** True when the session is currently handed out to a caller. */
  private isHeld(pooledSession: PooledSession): boolean {
    return pooledSession.inUse || this.activeSessions.has(pooledSession);
  }

  /** Flags the session as handed out and records the hand-out time. */
  private markActive(pooledSession: PooledSession): void {
    pooledSession.inUse = true;
    pooledSession.lastUsed = Date.now();
    this.activeSessions.add(pooledSession);
  }

  /** Removes the session from the idle queue; returns whether it was there. */
  private removeFromIdle(pooledSession: PooledSession): boolean {
    const index = this.idleSessions.toArray().indexOf(pooledSession);
    if (index < 0) {
      return false;
    }
    this.idleSessions.remove(index, 1);
    return true;
  }

  /** Drops every reference the pool holds to the session (does not close it). */
  private forgetSession(pooledSession: PooledSession): void {
    const poolIndex = this.pool.indexOf(pooledSession);
    if (poolIndex > -1) {
      this.pool.splice(poolIndex, 1);
    }
    this.removeFromIdle(pooledSession);
    this.activeSessions.delete(pooledSession);
    for (const [key, mapped] of this.endPointToSession) {
      if (mapped === pooledSession) {
        this.endPointToSession.delete(key);
      }
    }
  }

  /**
   * Creates a session and registers it in `pool` without placing it in the
   * idle queue, so the caller decides whether it starts idle or active.
   * The in-flight creation is counted so concurrent callers cannot overshoot
   * `maxPoolSize` while the factory is awaiting.
   */
  private async openPooledSession(): Promise<PooledSession> {
    this.pendingCreations++;
    try {
      const session = await this.createPoolSession();
      const pooledSession: PooledSession = {
        session,
        lastUsed: Date.now(),
        inUse: false,
      };
      this.pool.push(pooledSession);
      return pooledSession;
    } finally {
      this.pendingCreations--;
    }
  }

  /** Creates a session and parks it in the idle queue. */
  private async createSession(): Promise<Session> {
    const pooledSession = await this.openPooledSession();
    this.idleSessions.push(pooledSession);
    return pooledSession.session;
  }

  /** Returns the next endpoint in round-robin order. */
  protected getNextEndPoint(): EndPoint {
    const endPoint = this.endPoints[this.currentEndPointIndex];
    this.currentEndPointIndex =
      (this.currentEndPointIndex + 1) % this.endPoints.length;
    return endPoint;
  }

  /**
   * Extract the redirect-cache key from a tablet: `deviceId` when present,
   * otherwise `tableName`.
   *
   * @throws Error when the tablet has neither property.
   */
  protected extractDeviceId(
    tablet: TreeTablet | ITreeTablet | TableTablet | ITableTablet,
  ): string {
    if ("deviceId" in tablet) {
      return tablet.deviceId;
    } else if ("tableName" in tablet) {
      return tablet.tableName;
    }
    throw new Error("Unable to extract device ID from tablet");
  }

  /**
   * Get a session associated with a specific endpoint, creating one if needed.
   *
   * The returned session is exclusively held by the caller and must be given
   * back through `releaseSession()`. When the endpoint's session is already
   * held by another caller, or the pool is at capacity, this falls back to
   * `getSession()` rather than sharing a session or exceeding `maxPoolSize`.
   *
   * NOTE(review): `createPoolSession()` takes no endpoint argument, so nothing
   * visible here connects the new session to `endpoint` — confirm in subclasses.
   */
  protected async getSessionForEndpoint(endpoint: EndPoint): Promise<Session> {
    const key = `${endpoint.host}:${endpoint.port}`;
    const existing = this.endPointToSession.get(key);
    if (existing) {
      if (!existing.session.isOpen()) {
        // Stale entry: drop it everywhere and fall through to create a new one.
        this.forgetSession(existing);
      } else if (this.isHeld(existing)) {
        return this.getSession();
      } else {
        this.removeFromIdle(existing);
        this.markActive(existing);
        return existing.session;
      }
    }

    const maxPoolSize = this.config.maxPoolSize || 10;
    if (this.pool.length + this.pendingCreations >= maxPoolSize) {
      return this.getSession();
    }

    const pooledSession = await this.openPooledSession();
    this.markActive(pooledSession);
    this.endPointToSession.set(key, pooledSession);
    logger.info(
      `Created new session for redirect endpoint: ${endpoint.host}:${endpoint.port}`,
    );
    return pooledSession.session;
  }

  /**
   * Get a session from the pool.
   * The session must be released back to the pool using releaseSession() after use.
   *
   * Order of preference: reuse an open idle session, create a new one while
   * below `maxPoolSize` (default 10), otherwise wait up to `waitTimeout`
   * (default 60000 ms) for a release.
   *
   * @throws Error "Timeout waiting for available session" when the wait expires.
   */
  async getSession(): Promise<Session> {
    const startTime = Date.now();
    const maxPoolSize = this.config.maxPoolSize || 10;

    // Reuse an idle session, discarding any that were closed while idle.
    let candidate = this.idleSessions.shift();
    while (candidate) {
      if (candidate.session.isOpen()) {
        this.markActive(candidate);
        const duration = Date.now() - startTime;
        logger.debug(
          `[PERF] getSession (reuse): ${duration}ms, pool: ${this.pool.length}, available: ${this.getAvailableSize()}, inUse: ${this.getInUseSize()}`,
        );
        return candidate.session;
      }
      this.forgetSession(candidate);
      candidate = this.idleSessions.shift();
    }

    // Create a new session if there is room, counting in-flight creations.
    if (this.pool.length + this.pendingCreations < maxPoolSize) {
      logger.debug(
        `[PERF] getSession creating new session, pool: ${this.pool.length}/${maxPoolSize}`,
      );
      const pooledSession = await this.openPooledSession();
      // The new session never entered the idle queue, so nobody else can hold it.
      this.markActive(pooledSession);
      const duration = Date.now() - startTime;
      logger.debug(`[PERF] getSession (new): ${duration}ms`);
      return pooledSession.session;
    }

    // Pool exhausted: queue up until a session is released or the wait expires.
    logger.debug(
      `[PERF] getSession waiting, pool full: ${this.pool.length}, waitQueue: ${this.waitQueue.length}`,
    );
    const waitTimeout = this.config.waitTimeout || 60000;
    return new Promise<Session>((resolve, reject) => {
      let timeoutId: ReturnType<typeof setTimeout> | undefined;
      const waiter = (session: Session): void => {
        clearTimeout(timeoutId);
        const duration = Date.now() - startTime;
        logger.debug(`[PERF] getSession (waited): ${duration}ms`);
        resolve(session);
      };
      timeoutId = setTimeout(() => {
        // Remove the queued callback itself; otherwise a later release would
        // hand a session to this already-rejected waiter and leak it.
        const index = this.waitQueue.toArray().indexOf(waiter);
        if (index > -1) {
          this.waitQueue.remove(index, 1);
        }
        reject(new Error("Timeout waiting for available session"));
      }, waitTimeout);
      // Use unref() so timeout doesn't prevent process exit
      if (typeof timeoutId === "object" && "unref" in timeoutId) {
        timeoutId.unref();
      }
      this.waitQueue.push(waiter);
    });
  }

  /**
   * Release a session back to the pool.
   * Should be called after getSession() when the session is no longer needed.
   *
   * The session goes straight to the oldest waiter if one is queued, otherwise
   * to the idle queue. Sessions unknown to the pool and sessions that are not
   * currently handed out (double release) are ignored.
   */
  releaseSession(session: Session): void {
    const pooledSession = this.pool.find((ps) => ps.session === session);
    if (!pooledSession || !this.isHeld(pooledSession)) {
      return;
    }

    this.activeSessions.delete(pooledSession);
    pooledSession.inUse = false;
    pooledSession.lastUsed = Date.now();

    const waiter = this.waitQueue.shift();
    if (waiter) {
      this.markActive(pooledSession);
      waiter(session);
    } else {
      this.idleSessions.push(pooledSession);
    }
  }

  /**
   * Closes idle sessions unused for longer than `maxIdleTime` (default
   * 60000 ms) while keeping at least `minPoolSize` (default 1) sessions.
   */
  private async cleanupIdleSessions(): Promise<void> {
    const now = Date.now();
    const maxIdleTime = this.config.maxIdleTime || 60000;
    const minSize = this.config.minPoolSize || 1;

    // Track the projected pool size so one pass cannot evict below minSize.
    const expired: PooledSession[] = [];
    let remaining = this.pool.length;
    for (const ps of this.idleSessions.toArray()) {
      if (remaining <= minSize) {
        break;
      }
      if (now - ps.lastUsed > maxIdleTime) {
        expired.push(ps);
        remaining--;
      }
    }

    // Detach synchronously so no caller can be handed a session that is closing.
    for (const ps of expired) {
      this.forgetSession(ps);
    }

    await Promise.all(
      expired.map(async (ps) => {
        try {
          await ps.session.close();
          logger.debug(`Removed idle session from ${this.getPoolName()}`);
        } catch (error) {
          logger.error("Error closing idle session:", error);
        }
      }),
    );
  }

  /**
   * Execute a query statement and return SessionDataSet.
   * The session is held until the dataset's cleanup callback runs; if the
   * query itself fails the session is released immediately.
   */
  async executeQueryStatement(
    sql: string,
    timeoutMs: number = 60000,
  ): Promise<SessionDataSet> {
    const session = await this.getSession();
    try {
      const dataSet = await session.executeQueryStatement(sql, timeoutMs);
      dataSet.setCleanupCallback(() => {
        this.releaseSession(session);
      });
      return dataSet;
    } catch (error) {
      this.releaseSession(session);
      throw error;
    }
  }

  /** Execute a non-query statement on a pooled session and release it afterwards. */
  async executeNonQueryStatement(sql: string): Promise<void> {
    const session = await this.getSession();
    try {
      return await session.executeNonQueryStatement(sql);
    } finally {
      this.releaseSession(session);
    }
  }

  /**
   * Insert tablet (supports both tree and table models)
   *
   * When redirection is enabled, a cached endpoint for the tablet's device is
   * used to pick the session, and any redirect reported by the session after
   * the insert is cached for later calls.
   *
   * @param tablet TreeTablet for tree model or TableTablet for table model
   */
  async insertTablet(
    tablet: TreeTablet | ITreeTablet | TableTablet | ITableTablet,
  ): Promise<void> {
    const totalStartTime = Date.now();
    const deviceId = this.extractDeviceId(tablet);
    const cachedEndpoint = this.config.enableRedirection
      ? this.redirectCache.get(deviceId)
      : null;

    const sessionStartTime = Date.now();
    const session: Session = cachedEndpoint
      ? await this.getSessionForEndpoint(cachedEndpoint)
      : await this.getSession();
    const sessionDuration = Date.now() - sessionStartTime;

    try {
      const insertStartTime = Date.now();
      await session.insertTablet(tablet);
      const insertDuration = Date.now() - insertStartTime;
      const totalDuration = Date.now() - totalStartTime;
      logger.debug(
        `[PERF] Pool insertTablet total: ${totalDuration}ms (session: ${sessionDuration}ms, insert: ${insertDuration}ms)`,
      );

      if (this.config.enableRedirection) {
        const redirectEndpoint = session.getAndClearLastRedirect();
        if (redirectEndpoint) {
          this.redirectCache.set(deviceId, redirectEndpoint);
          logger.info(
            `Cached redirect recommendation: ${deviceId} -> ${redirectEndpoint.host}:${redirectEndpoint.port}`,
          );
        }
      }
    } finally {
      this.releaseSession(session);
    }
  }

  /**
   * Stops the cleanup timer, closes every pooled session and resets all pool
   * state. Pool state is detached before the sessions are closed so nothing
   * can be handed out mid-close. Callers still queued in `waitQueue` are
   * dropped from the queue; their promises settle when their own wait timeout
   * fires.
   */
  async close(): Promise<void> {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }

    const sessionsToClose = this.pool;
    this.pool = [];
    this.idleSessions.clear();
    this.activeSessions.clear();
    this.waitQueue.clear();
    this.endPointToSession.clear();
    this.redirectCache.clear();

    await Promise.all(
      sessionsToClose.map(async (ps) => {
        try {
          await ps.session.close();
        } catch (error) {
          logger.error("Error closing session:", error);
        }
      }),
    );

    unregisterClosable(this);
    logger.info(`${this.getPoolName()} closed`);
  }

  /** Total number of sessions owned by the pool. */
  getPoolSize(): number {
    return this.pool.length;
  }

  /** Number of idle sessions. */
  getAvailableSize(): number {
    return this.idleSessions.length;
  }

  /** Number of sessions currently handed out. */
  getInUseSize(): number {
    return this.activeSessions.size;
  }

  /**
   * Get total number of sessions in the pool
   * @alias getPoolSize
   */
  get totalCount(): number {
    return this.pool.length;
  }

  /**
   * Get number of idle sessions
   * @alias getAvailableSize
   */
  get idleCount(): number {
    return this.idleSessions.length;
  }

  /**
   * Get number of active (in-use) sessions
   * @alias getInUseSize
   */
  get activeCount(): number {
    return this.activeSessions.size;
  }

  /**
   * Get number of requests waiting for a session
   */
  get waitingCount(): number {
    return this.waitQueue.length;
  }

  /**
   * Get comprehensive pool statistics
   */
  getPoolStats() {
    return {
      total: this.totalCount,
      idle: this.idleCount,
      active: this.activeCount,
      waiting: this.waitingCount,
      endpoints: this.endPoints.length,
      redirectCacheSize: this.redirectCache.getStats().size,
    };
  }

  /**
   * Number of workers to run: the requested value (or `maxPoolSize` when it is
   * missing or not positive), capped by `maxPoolSize` and the item count, and
   * never below 1. Capping at `maxPoolSize` matters because all sessions are
   * acquired up front; asking for more than the pool can ever hold would make
   * the caller wait on itself until the wait timeout.
   */
  private resolveConcurrency(
    requested: number | undefined,
    itemCount: number,
  ): number {
    const maxPoolSize = this.config.maxPoolSize || 10;
    const wanted =
      requested !== undefined && requested > 0
        ? Math.floor(requested)
        : maxPoolSize;
    return Math.max(1, Math.min(wanted, maxPoolSize, itemCount));
  }

  /** Acquires `count` sessions; on failure releases the ones already acquired. */
  private async acquireSessions(count: number): Promise<Session[]> {
    const sessions: Session[] = [];
    try {
      for (let i = 0; i < count; i++) {
        sessions.push(await this.getSession());
      }
    } catch (error) {
      for (const session of sessions) {
        this.releaseSession(session);
      }
      throw error;
    }
    return sessions;
  }

  /**
   * Insert multiple tablets concurrently using the pool.
   *
   * Sessions are acquired up front, each session runs a worker that pulls
   * tablets from a shared cursor, and all sessions are released afterwards.
   * Individual failures do not stop the run; they are collected and reported
   * together once every tablet has been attempted.
   *
   * @param tablets Array of tablets to insert
   * @param options Configuration options
   * @param options.concurrency Number of concurrent workers (default: max pool
   *   size or 10). Values above the max pool size are capped to it.
   * @returns Promise that resolves when all inserts complete
   * @throws Error carrying `failedIndices: number[]` and `errors: Error[]`
   *   when one or more inserts failed.
   *
   * @example
   * ```typescript
   * const tablets = generateTablets(1000);
   * // Insert 1000 tablets using pool's concurrent execution
   * await pool.insertTabletsParallel(tablets, { concurrency: 20 });
   * ```
   */
  async insertTabletsParallel(
    tablets: (TreeTablet | ITreeTablet | TableTablet | ITableTablet)[],
    options?: { concurrency?: number },
  ): Promise<void> {
    if (tablets.length === 0) {
      return;
    }
    const concurrency = this.resolveConcurrency(
      options?.concurrency,
      tablets.length,
    );
    const totalStartTime = Date.now();
    logger.debug(
      `[PERF] Pool insertTabletsParallel START: ${tablets.length} tablets, concurrency: ${concurrency}`,
    );

    const sessions = await this.acquireSessions(concurrency);
    const errors: { index: number; error: Error }[] = [];
    let nextIndex = 0;
    try {
      await Promise.all(
        sessions.map(async (session) => {
          // Claiming the index is synchronous, so no two workers share a tablet.
          while (nextIndex < tablets.length) {
            const idx = nextIndex++;
            try {
              await session.insertTablet(tablets[idx]);
            } catch (err) {
              errors.push({
                index: idx,
                error: err instanceof Error ? err : new Error(String(err)),
              });
            }
          }
        }),
      );
    } finally {
      for (const session of sessions) {
        this.releaseSession(session);
      }
    }

    const totalDuration = Date.now() - totalStartTime;
    const throughput = tablets.length / (totalDuration / 1000);
    logger.debug(
      `[PERF] Pool insertTabletsParallel COMPLETE: ${totalDuration}ms, ${throughput.toFixed(2)} tablets/sec`,
    );

    if (errors.length > 0) {
      // Report in input order rather than completion order.
      errors.sort((a, b) => a.index - b.index);
      const failedIndices = errors.map((e) => e.index);
      const errorDetails = errors
        .slice(0, 5)
        .map((e) => `[${e.index}]: ${e.error.message}`)
        .join('; ');
      const suffix = errors.length > 5 ? ` ... and ${errors.length - 5} more` : '';
      const aggregateError = new Error(
        `${errors.length} of ${tablets.length} tablet inserts failed. ` +
          `Failed indices: [${failedIndices.join(', ')}]. Errors: ${errorDetails}${suffix}`
      ) as Error & { failedIndices: number[]; errors: Error[] };
      aggregateError.failedIndices = failedIndices;
      aggregateError.errors = errors.map((e) => e.error);
      throw aggregateError;
    }
  }

  /**
   * Execute multiple operations concurrently using the pool.
   * This is a generic utility for running any async operations in parallel
   * while respecting pool size limits.
   *
   * @param items Array of items to process
   * @param operation Async function to execute for each item
   * @param options Configuration options
   * @param options.concurrency Number of concurrent workers (default: max pool
   *   size or 10). Values above the max pool size are capped to it.
   * @param options.stopOnError Whether to stop processing on first error (default: false)
   * @returns Results array (in same order as input items).
   *   Note: When stopOnError=false, failed operations have an explicit
   *   `undefined` at their index, so they can be found with `filter`/`findIndex`.
   * @throws The first recorded error when `stopOnError` is true.
   *
   * @example
   * ```typescript
   * const devices = ['d1', 'd2', 'd3'];
   * const results = await pool.executeParallel(
   *   devices,
   *   async (session, deviceId) => {
   *     await session.executeNonQueryStatement(`CREATE TIMESERIES root.sg.${deviceId}...`);
   *     return deviceId;
   *   },
   *   { concurrency: 5 }
   * );
   * // Check for failures
   * const failures = results.filter((r, i) => r === undefined);
   * ```
   */
  async executeParallel<T, R>(
    items: T[],
    operation: (session: Session, item: T, index: number) => Promise<R>,
    options?: { concurrency?: number; stopOnError?: boolean },
  ): Promise<(R | undefined)[]> {
    if (items.length === 0) {
      return [];
    }
    const concurrency = this.resolveConcurrency(
      options?.concurrency,
      items.length,
    );
    const stopOnError = options?.stopOnError ?? false;

    const sessions = await this.acquireSessions(concurrency);
    // Filled (not sparse) so failed slots are real `undefined` values that
    // array iteration methods such as filter() will visit.
    const results: (R | undefined)[] = new Array<R | undefined>(
      items.length,
    ).fill(undefined);
    const errors: { index: number; error: Error }[] = [];
    let nextIndex = 0;
    let shouldStop = false;
    try {
      await Promise.all(
        sessions.map(async (session) => {
          while (!shouldStop && nextIndex < items.length) {
            const idx = nextIndex++;
            try {
              results[idx] = await operation(session, items[idx], idx);
            } catch (err) {
              const error = err instanceof Error ? err : new Error(String(err));
              errors.push({ index: idx, error });
              if (stopOnError) {
                shouldStop = true;
                break;
              }
            }
          }
        }),
      );
    } finally {
      for (const session of sessions) {
        this.releaseSession(session);
      }
    }

    if (errors.length > 0 && stopOnError) {
      throw errors[0].error;
    }
    return results;
  }
}