| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import { TableSession } from "./TableSession"; |
| import { Session, TableTablet, TreeTablet } from "./Session"; |
| import { PoolConfig, SQL_DIALECT_TABLE, InternalConfig } from "../utils/Config"; |
| import { BaseSessionPool } from "./BaseSessionPool"; |
| import { logger } from "../utils/Logger"; |
| |
| /** |
| * TableSessionPool provides connection pooling optimized for table model operations |
| * Automatically configures sessions for table mode by setting sql_dialect to 'table' |
| * Uses TableSession instances which extend Session |
| * Tracks database context across all sessions in the pool for table model operations |
| */ |
| export class TableSessionPool extends BaseSessionPool { |
| private currentDatabase?: string; // Track database context across pool (table model only) |
| |
| constructor( |
| hostsOrConfig: string | string[] | PoolConfig, |
| port?: number, |
| config?: Partial<PoolConfig>, |
| ) { |
| super(hostsOrConfig, port, config); |
| } |
| |
| protected getPoolName(): string { |
| return "TableSessionPool"; |
| } |
| |
| protected async createPoolSession(): Promise<Session> { |
| const endPoint = this.getNextEndPoint(); |
| |
| // Create internal config with sql_dialect set to 'table' |
| const internalConfig: InternalConfig = { |
| ...this.config, |
| host: endPoint.host, |
| port: endPoint.port, |
| sqlDialect: SQL_DIALECT_TABLE, |
| }; |
| |
| try { |
| // Create TableSession instance instead of Session |
| const session = new TableSession(internalConfig); |
| await session.open(); |
| |
| // Apply current database context to new session if set |
| if (this.currentDatabase) { |
| try { |
| await session.executeNonQueryStatement(`USE ${this.currentDatabase}`); |
| logger.debug( |
| `Applied database context ${this.currentDatabase} to new session`, |
| ); |
| } catch (error: any) { |
| logger.warn( |
| `Failed to apply database context to new session: ${error.message}`, |
| ); |
| } |
| } |
| |
| // If database is configured in config, try to USE it (ignore errors) |
| if (!this.currentDatabase && this.config.database) { |
| try { |
| logger.debug(`Attempting to USE database: ${this.config.database}`); |
| await session.executeNonQueryStatement(`USE ${this.config.database}`); |
| this.currentDatabase = this.config.database; |
| logger.debug( |
| `Successfully set database context to: ${this.config.database}`, |
| ); |
| } catch (error: any) { |
| // Ignore errors - database might not exist yet or other issues |
| // This is acceptable as operations can still work without pre-set database |
| logger.debug( |
| `Failed to USE database ${this.config.database}, ignoring error: ${error.message}`, |
| ); |
| } |
| } |
| |
| return session; |
| } catch (error: any) { |
| const errorMsg = `Failed to create pool session for ${endPoint.host}:${endPoint.port} - ${error.message}`; |
| throw new Error(errorMsg); |
| } |
| } |
| |
| /** |
| * Override executeNonQueryStatement to track USE statements and sync database context |
| * This is specific to table model - tree model doesn't need this |
| */ |
| async executeNonQueryStatement(sql: string): Promise<void> { |
| const session = await this.getSession(); |
| try { |
| await session.executeNonQueryStatement(sql); |
| |
| // Track database context changes from USE statements (table model only) |
| // IoTDB database names can include letters, digits, underscores, dots, and hyphens |
| const usePattern = /^\s*USE\s+([a-zA-Z0-9_.-]+)\s*;?\s*$/i; |
| const match = sql.match(usePattern); |
| if (match) { |
| const newDatabase = match[1]; |
| const previousDatabase = this.currentDatabase; |
| this.currentDatabase = newDatabase; |
| logger.debug( |
| `Pool database context changed from ${previousDatabase || "none"} to ${this.currentDatabase}`, |
| ); |
| |
| // Synchronize database context to all sessions in pool |
| await this.syncDatabaseContextToPool(newDatabase); |
| } |
| } finally { |
| this.releaseSession(session); |
| } |
| } |
| |
| /** |
| * Synchronize database context to all idle sessions in the pool |
| * This ensures all sessions use the same database after a USE statement |
| * Only applies to table model - tree model doesn't need this |
| * @param database - Database name to switch to |
| */ |
| private async syncDatabaseContextToPool(database: string): Promise<void> { |
| const syncPromises = this.pool |
| .filter((ps) => !ps.inUse) // Only sync idle sessions to avoid interfering |
| .map(async (ps) => { |
| try { |
| await ps.session.executeNonQueryStatement(`USE ${database}`); |
| logger.debug(`Synced database context to idle session`); |
| } catch (error) { |
| logger.warn(`Failed to sync database context to session: ${error}`); |
| } |
| }); |
| |
| await Promise.all(syncPromises); |
| } |
| |
| /** |
| * Override getSession to return session from pool |
| * |
| * Database context is managed through a two-step process: |
| * 1. During session creation (createPoolSession), the session gets the current database context |
| * 2. When executeNonQueryStatement detects a USE statement, syncDatabaseContextToPool() is called |
| * to synchronize the new database context to all idle sessions in the pool |
| * |
| * This ensures all sessions maintain correct database context without needing to execute USE |
| * on every getSession() call, which significantly improves performance in high-concurrency scenarios. |
| */ |
| async getSession(): Promise<Session> { |
| return super.getSession(); |
| } |
| |
| /** |
| * Get current database context |
| * @returns Current database name or undefined |
| */ |
| getCurrentDatabase(): string | undefined { |
| return this.currentDatabase; |
| } |
| |
| // insertTablet is inherited from BaseSessionPool. |
| // Previously this class had a simpler override that: |
| // 1. Did not include [PERF] debug logging for performance analysis |
| // 2. Did not support redirection cache for optimal write routing |
| // 3. Had less detailed timing metrics for session acquisition vs insert operation |
| // The inherited method from BaseSessionPool provides all these features. |
| } |
| |
| // Re-export types for backward compatibility and new types |
| export type { |
| QueryResult, |
| Tablet, |
| TreeTablet, |
| TableTablet, |
| ColumnCategory, |
| } from "./Session"; |
| export { TableSession } from "./TableSession"; |