/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import {
  Session,
  TreeTablet,
  TableTablet,
  ITreeTablet,
  ITableTablet,
  SessionDataSet,
} from "./Session";
import {
  PoolConfig,
  DEFAULT_POOL_CONFIG,
  EndPoint,
  parseNodeUrls,
} from "../utils/Config";
import { logger } from "../utils/Logger";
import { registerClosable, unregisterClosable } from "../utils/ProcessCleanup";
import { RedirectCache } from "./RedirectCache";
import Denque from "denque";

interface PooledSession {
  session: Session;
  lastUsed: number;
  inUse: boolean;
}

/**
 * Base class for session pooling with common functionality
 * Provides connection pooling, round-robin load balancing, and automatic cleanup
 */
export abstract class BaseSessionPool {
  protected config: PoolConfig;
  protected endPoints: EndPoint[];
  protected pool: PooledSession[] = [];
  protected waitQueue: Denque<(session: Session) => void> = new Denque();
  protected idleSessions: Denque<PooledSession> = new Denque();
  protected activeSessions: Set<PooledSession> = new Set();
  protected currentEndPointIndex = 0;
  protected cleanupInterval: ReturnType<typeof setInterval> | null = null;
  protected redirectCache: RedirectCache;
  protected endPointToSession: Map<string, PooledSession> = new Map();

  constructor(
    hostsOrConfig: string | string[] | PoolConfig,
    port?: number,
    config?: Partial<PoolConfig>,
  ) {
    // Handle different constructor signatures for backward compatibility
    if (typeof hostsOrConfig === "object" && !Array.isArray(hostsOrConfig)) {
      // New format: constructor(config: PoolConfig)
      const poolConfig = hostsOrConfig as PoolConfig;
      this.config = { ...DEFAULT_POOL_CONFIG, ...poolConfig } as PoolConfig;

      if (poolConfig.nodeUrls) {
        if (poolConfig.nodeUrls.length === 0) {
          throw new Error("nodeUrls array cannot be empty");
        }
        // Parse nodeUrls if in string format
        this.endPoints =
          typeof poolConfig.nodeUrls[0] === "string"
            ? parseNodeUrls(poolConfig.nodeUrls as string[])
            : (poolConfig.nodeUrls as EndPoint[]);
      } else if (poolConfig.host && poolConfig.port) {
        this.endPoints = [{ host: poolConfig.host, port: poolConfig.port }];
      } else {
        throw new Error(
          "Either nodeUrls or host/port must be provided in config",
        );
      }
    } else {
      // Old format: constructor(hosts: string | string[], port: number, config?: Partial<PoolConfig>)
      if (port === undefined) {
        throw new Error(
          "Port must be provided when using host-based constructor",
        );
      }

      this.config = { ...DEFAULT_POOL_CONFIG, ...config, port } as PoolConfig;

      const hostList = Array.isArray(hostsOrConfig)
        ? hostsOrConfig
        : [hostsOrConfig];
      this.endPoints = hostList.map((host) => ({ host, port }));
    }

    // Initialize redirect cache
    this.redirectCache = new RedirectCache(
      this.config.redirectCacheTTL || 300000,
      10000, // max size
    );

    logger.info(
      `${this.getPoolName()} created with ${this.endPoints.length} endpoints, max pool size: ${this.config.maxPoolSize}, redirection: ${this.config.enableRedirection ? "enabled" : "disabled"}`,
    );

    registerClosable(this);
  }

  /**
   * Get the name of the pool for logging purposes
   */
  protected abstract getPoolName(): string;

  /**
   * Create a new session with pool-specific initialization
   */
  protected abstract createPoolSession(): Promise<Session>;

  async init(): Promise<void> {
    // Create minimum pool size connections in parallel
    const minSize = this.config.minPoolSize || 1;

    try {
      await Promise.all(
        Array.from({ length: minSize }, () => this.createSession()),
      );
    } catch (error: any) {
      const errorMsg = `Failed to initialize pool: ${error.message}`;
      logger.error(errorMsg);
      if (error.stack) {
        logger.error(`Stack trace: ${error.stack}`);
      }
      throw new Error(errorMsg);
    }

    // Start cleanup interval with proper async handling
    // Use unref() so it doesn't keep the process alive
    this.cleanupInterval = setInterval(() => {
      this.cleanupIdleSessions().catch((error) => {
        logger.error("Error during scheduled session cleanup:", error);
      });
    }, 30000).unref(); // Check every 30 seconds

    logger.info(`${this.getPoolName()} initialized with ${minSize} sessions`);
  }

  private async createSession(): Promise<Session> {
    const session = await this.createPoolSession();

    const pooledSession: PooledSession = {
      session,
      lastUsed: Date.now(),
      inUse: false,
    };

    this.pool.push(pooledSession);
    this.idleSessions.push(pooledSession);

    return session;
  }

  protected getNextEndPoint(): EndPoint {
    // Round-robin selection
    const endPoint = this.endPoints[this.currentEndPointIndex];
    this.currentEndPointIndex =
      (this.currentEndPointIndex + 1) % this.endPoints.length;
    return endPoint;
  }

  /**
   * Extract device ID from tablet for caching
   */
  protected extractDeviceId(
    tablet: TreeTablet | ITreeTablet | TableTablet | ITableTablet,
  ): string {
    if ("deviceId" in tablet) {
      return tablet.deviceId;
    } else if ("tableName" in tablet) {
      return tablet.tableName;
    }
    throw new Error("Unable to extract device ID from tablet");
  }

  /**
   * Get or create a session for a specific endpoint
   */
  protected async getSessionForEndpoint(endpoint: EndPoint): Promise<Session> {
    const key = `${endpoint.host}:${endpoint.port}`;

    // Check if we already have a session for this endpoint
    let pooledSession = this.endPointToSession.get(key);

    if (pooledSession && pooledSession.session.isOpen()) {
      pooledSession.inUse = true;
      pooledSession.lastUsed = Date.now();
      return pooledSession.session;
    }

    // Create new session for this endpoint
    const session = await this.createPoolSession();

    pooledSession = {
      session,
      lastUsed: Date.now(),
      inUse: true,
    };

    this.endPointToSession.set(key, pooledSession);
    this.pool.push(pooledSession);

    logger.info(
      `Created new session for redirect endpoint: ${endpoint.host}:${endpoint.port}`,
    );

    return session;
  }

  /**
   * Get a session from the pool
   * The session must be released back to the pool using releaseSession() after use
   */
  async getSession(): Promise<Session> {
    const startTime = Date.now();

    // Try to get an idle session (O(1) operation)
    // Use atomic shift() with null check to avoid race condition
    const pooledSession = this.idleSessions.shift();
    if (pooledSession) {
      // Verify session is still open
      if (pooledSession.session.isOpen()) {
        this.activeSessions.add(pooledSession);
        pooledSession.lastUsed = Date.now();
        const duration = Date.now() - startTime;
        logger.debug(
          `[PERF] getSession (reuse): ${duration}ms, pool: ${this.pool.length}, available: ${this.getAvailableSize()}, inUse: ${this.getInUseSize()}`,
        );
        return pooledSession.session;
      } else {
        // Session is closed, remove from pool
        const index = this.pool.indexOf(pooledSession);
        if (index > -1) {
          this.pool.splice(index, 1);
        }
        // Continue to create a new session
      }
    }

    // Create new session if pool is not full
    if (this.pool.length < (this.config.maxPoolSize || 10)) {
      logger.debug(
        `[PERF] getSession creating new session, pool: ${this.pool.length}/${this.config.maxPoolSize || 10}`,
      );
      const session = await this.createSession();
      const pooledSession = this.pool.find((ps) => ps.session === session);
      if (pooledSession) {
        this.idleSessions.shift(); // Remove from idle since we just added it
        this.activeSessions.add(pooledSession);
        pooledSession.inUse = true;
      }
      const duration = Date.now() - startTime;
      logger.debug(`[PERF] getSession (new): ${duration}ms`);
      return session;
    }

    // Wait for a session to become available
    logger.debug(
      `[PERF] getSession waiting, pool full: ${this.pool.length}, waitQueue: ${this.waitQueue.length}`,
    );
    const waitTimeout = this.config.waitTimeout || 60000;
    return new Promise((resolve, reject) => {
      const timeoutId = setTimeout(() => {
        const waiters = this.waitQueue.toArray();
        const index = waiters.indexOf(resolve);
        if (index > -1) {
          this.waitQueue.remove(index, 1);
        }
        reject(new Error("Timeout waiting for available session"));
      }, waitTimeout);

      // Use unref() so timeout doesn't prevent process exit
      if (typeof timeoutId === "object" && "unref" in timeoutId) {
        timeoutId.unref();
      }

      this.waitQueue.push((session: Session) => {
        clearTimeout(timeoutId);
        const duration = Date.now() - startTime;
        logger.debug(`[PERF] getSession (waited): ${duration}ms`);
        resolve(session);
      });
    });
  }

  /**
   * Release a session back to the pool
   * Should be called after getSession() when the session is no longer needed
   */
  releaseSession(session: Session): void {
    const pooledSession = this.pool.find((ps) => ps.session === session);
    if (pooledSession) {
      // Remove from active sessions
      this.activeSessions.delete(pooledSession);

      // Update tracking state
      pooledSession.inUse = false;
      pooledSession.lastUsed = Date.now();

      // Check if there are waiting requests
      if (this.waitQueue.length > 0) {
        const waiter = this.waitQueue.shift();
        if (waiter) {
          // Move to active for the waiter
          this.activeSessions.add(pooledSession);
          pooledSession.inUse = true;
          waiter(session);
        } else {
          // No waiter actually found, add back to idle
          this.idleSessions.push(pooledSession);
        }
      } else {
        // No waiters, add back to idle
        this.idleSessions.push(pooledSession);
      }
    }
  }

  private async cleanupIdleSessions(): Promise<void> {
    const now = Date.now();
    const maxIdleTime = this.config.maxIdleTime || 60000;
    const minSize = this.config.minPoolSize || 1;

    // Only iterate idle sessions (O(k) where k = idle count, not O(n))
    const sessionsToRemove: PooledSession[] = [];
    const idleArray = this.idleSessions.toArray();

    for (const ps of idleArray) {
      if (now - ps.lastUsed > maxIdleTime && this.pool.length > minSize) {
        sessionsToRemove.push(ps);
      }
    }

    await Promise.all(
      sessionsToRemove.map(async (ps) => {
        try {
          await ps.session.close();
          const poolIndex = this.pool.indexOf(ps);
          if (poolIndex > -1) {
            this.pool.splice(poolIndex, 1);
          }
          // Remove from idle sessions deque
          const idleIndex = this.idleSessions.toArray().indexOf(ps);
          if (idleIndex > -1) {
            this.idleSessions.remove(idleIndex, 1);
          }
          logger.debug(`Removed idle session from ${this.getPoolName()}`);
        } catch (error) {
          logger.error("Error closing idle session:", error);
        }
      }),
    );
  }

  /**
   * Execute a query statement and return SessionDataSet
   * Session is held until dataset.close() is called
   */
  async executeQueryStatement(
    sql: string,
    timeoutMs: number = 60000,
  ): Promise<SessionDataSet> {
    const session = await this.getSession();

    try {
      const dataSet = await session.executeQueryStatement(sql, timeoutMs);

      // Set cleanup callback to release session when dataset is closed
      dataSet.setCleanupCallback(() => {
        this.releaseSession(session);
      });

      return dataSet;
    } catch (error) {
      // If query fails, release session immediately
      this.releaseSession(session);
      throw error;
    }
  }

  async executeNonQueryStatement(sql: string): Promise<void> {
    const session = await this.getSession();
    try {
      return await session.executeNonQueryStatement(sql);
    } finally {
      this.releaseSession(session);
    }
  }

  /**
   * Insert tablet (supports both tree and table models)
   * @param tablet TreeTablet for tree model or TableTablet for table model
   */
  async insertTablet(
    tablet: TreeTablet | ITreeTablet | TableTablet | ITableTablet,
  ): Promise<void> {
    const totalStartTime = Date.now();
    const deviceId = this.extractDeviceId(tablet);

    // Check cache for optimal endpoint if redirection is enabled
    const cachedEndpoint = this.config.enableRedirection
      ? this.redirectCache.get(deviceId)
      : null;

    const sessionStartTime = Date.now();
    let session: Session;

    if (cachedEndpoint) {
      // Use cached endpoint for optimal routing
      session = await this.getSessionForEndpoint(cachedEndpoint);
    } else {
      // Use round-robin selection
      session = await this.getSession();
    }
    const sessionDuration = Date.now() - sessionStartTime;

    try {
      // Attempt insert
      const insertStartTime = Date.now();
      await session.insertTablet(tablet);
      const insertDuration = Date.now() - insertStartTime;
      const totalDuration = Date.now() - totalStartTime;

      logger.debug(
        `[PERF] Pool insertTablet total: ${totalDuration}ms (session: ${sessionDuration}ms, insert: ${insertDuration}ms)`,
      );

      // Check if server recommended a redirect for future operations
      if (this.config.enableRedirection) {
        const redirectEndpoint = session.getAndClearLastRedirect();
        if (redirectEndpoint) {
          // Cache the recommended endpoint for future writes
          this.redirectCache.set(deviceId, redirectEndpoint);
          logger.info(
            `Cached redirect recommendation: ${deviceId} -> ${redirectEndpoint.host}:${redirectEndpoint.port}`,
          );
        }
      }
    } finally {
      // Always release session
      this.releaseSession(session);
    }
  }

  async close(): Promise<void> {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }

    // Close all sessions
    await Promise.all(
      this.pool.map(async (ps) => {
        try {
          await ps.session.close();
        } catch (error) {
          logger.error("Error closing session:", error);
        }
      }),
    );

    this.pool = [];
    this.idleSessions.clear();
    this.activeSessions.clear();
    this.waitQueue.clear();
    this.endPointToSession.clear();
    this.redirectCache.clear();
    unregisterClosable(this);
    logger.info(`${this.getPoolName()} closed`);
  }

  getPoolSize(): number {
    return this.pool.length;
  }

  getAvailableSize(): number {
    return this.idleSessions.length;
  }

  getInUseSize(): number {
    return this.activeSessions.size;
  }

  /**
   * Get total number of sessions in the pool
   * @alias getPoolSize
   */
  get totalCount(): number {
    return this.pool.length;
  }

  /**
   * Get number of idle sessions
   * @alias getAvailableSize
   */
  get idleCount(): number {
    return this.idleSessions.length;
  }

  /**
   * Get number of active (in-use) sessions
   * @alias getInUseSize
   */
  get activeCount(): number {
    return this.activeSessions.size;
  }

  /**
   * Get number of requests waiting for a session
   */
  get waitingCount(): number {
    return this.waitQueue.length;
  }

  /**
   * Get comprehensive pool statistics
   */
  getPoolStats() {
    return {
      total: this.totalCount,
      idle: this.idleCount,
      active: this.activeCount,
      waiting: this.waitingCount,
      endpoints: this.endPoints.length,
      redirectCacheSize: this.redirectCache.getStats().size,
    };
  }

  /**
   * Insert multiple tablets concurrently using the pool.
   * This method optimizes for Node.js async patterns by:
   * 1. Pre-acquiring sessions to avoid contention
   * 2. Distributing tablets across sessions for parallel execution
   * 3. Automatically releasing sessions after completion
   * 
   * This is the recommended way to achieve high throughput in Node.js.
   * 
   * @param tablets Array of tablets to insert
   * @param options Configuration options
   * @param options.concurrency Number of concurrent workers (default: pool size or 10)
   * @returns Promise that resolves when all inserts complete
   * 
   * @example
   * ```typescript
   * const tablets = generateTablets(1000);
   * // Insert 1000 tablets using pool's concurrent execution
   * await pool.insertTabletsParallel(tablets, { concurrency: 20 });
   * ```
   */
  async insertTabletsParallel(
    tablets: (TreeTablet | ITreeTablet | TableTablet | ITableTablet)[],
    options?: { concurrency?: number },
  ): Promise<void> {
    if (tablets.length === 0) {
      return;
    }

    const concurrency = Math.min(
      options?.concurrency || this.config.maxPoolSize || 10,
      tablets.length,
    );

    const totalStartTime = Date.now();
    logger.debug(
      `[PERF] Pool insertTabletsParallel START: ${tablets.length} tablets, concurrency: ${concurrency}`,
    );

    // Pre-acquire sessions to enable true concurrent execution
    const sessions: Session[] = [];
    try {
      for (let i = 0; i < concurrency; i++) {
        sessions.push(await this.getSession());
      }
    } catch (error) {
      // Release any acquired sessions on error
      for (const session of sessions) {
        this.releaseSession(session);
      }
      throw error;
    }

    // Track errors with indices for debugging - don't fail fast
    const errors: { index: number; error: Error }[] = [];
    let tabletIndex = 0;

    try {
      // Create worker promises that consume from the tablet queue
      const workers = sessions.map(async (session) => {
        while (tabletIndex < tablets.length) {
          const idx = tabletIndex++;
          if (idx >= tablets.length) break;

          const tablet = tablets[idx];
          try {
            await session.insertTablet(tablet);
          } catch (err) {
            errors.push({
              index: idx,
              error: err instanceof Error ? err : new Error(String(err)),
            });
          }
        }
      });

      // Wait for all workers to complete
      await Promise.all(workers);
    } finally {
      // Always release all sessions
      for (const session of sessions) {
        this.releaseSession(session);
      }
    }

    const totalDuration = Date.now() - totalStartTime;
    const throughput = tablets.length / (totalDuration / 1000);
    logger.debug(
      `[PERF] Pool insertTabletsParallel COMPLETE: ${totalDuration}ms, ${throughput.toFixed(2)} tablets/sec`,
    );

    // If there were errors, throw an aggregate error with details
    if (errors.length > 0) {
      const failedIndices = errors.map((e) => e.index);
      const errorDetails = errors
        .slice(0, 5)
        .map((e) => `[${e.index}]: ${e.error.message}`)
        .join('; ');
      const suffix = errors.length > 5 ? ` ... and ${errors.length - 5} more` : '';
      const aggregateError = new Error(
        `${errors.length} of ${tablets.length} tablet inserts failed. ` +
        `Failed indices: [${failedIndices.join(', ')}]. Errors: ${errorDetails}${suffix}`
      ) as Error & { failedIndices: number[]; errors: Error[] };
      aggregateError.failedIndices = failedIndices;
      aggregateError.errors = errors.map((e) => e.error);
      throw aggregateError;
    }
  }

  /**
   * Execute multiple operations concurrently using the pool.
   * This is a generic utility for running any async operations in parallel
   * while respecting pool size limits.
   * 
   * @param items Array of items to process
   * @param operation Async function to execute for each item
   * @param options Configuration options
   * @param options.concurrency Number of concurrent workers (default: pool size or 10)
   * @param options.stopOnError Whether to stop processing on first error (default: false)
   * @returns Results array (in same order as input items).
   *          Note: When stopOnError=false, failed operations will have `undefined` at their index.
   *          Check array entries for undefined to detect failures.
   * 
   * @example
   * ```typescript
   * const devices = ['d1', 'd2', 'd3'];
   * const results = await pool.executeParallel(
   *   devices,
   *   async (session, deviceId) => {
   *     await session.executeNonQueryStatement(`CREATE TIMESERIES root.sg.${deviceId}...`);
   *     return deviceId;
   *   },
   *   { concurrency: 5 }
   * );
   * // Check for failures
   * const failures = results.filter((r, i) => r === undefined);
   * ```
   */
  async executeParallel<T, R>(
    items: T[],
    operation: (session: Session, item: T, index: number) => Promise<R>,
    options?: { concurrency?: number; stopOnError?: boolean },
  ): Promise<(R | undefined)[]> {
    if (items.length === 0) {
      return [];
    }

    const concurrency = Math.min(
      options?.concurrency || this.config.maxPoolSize || 10,
      items.length,
    );
    const stopOnError = options?.stopOnError ?? false;

    // Pre-acquire sessions
    const sessions: Session[] = [];
    try {
      for (let i = 0; i < concurrency; i++) {
        sessions.push(await this.getSession());
      }
    } catch (error) {
      for (const session of sessions) {
        this.releaseSession(session);
      }
      throw error;
    }

    const results: (R | undefined)[] = new Array(items.length);
    let itemIndex = 0;
    let shouldStop = false;
    const errors: { index: number; error: Error }[] = [];

    try {
      const workers = sessions.map(async (session) => {
        while (!shouldStop && itemIndex < items.length) {
          const idx = itemIndex++;
          if (idx >= items.length) break;

          try {
            results[idx] = await operation(session, items[idx], idx);
          } catch (err) {
            const error = err instanceof Error ? err : new Error(String(err));
            errors.push({ index: idx, error });
            if (stopOnError) {
              shouldStop = true;
              break;
            }
          }
        }
      });

      await Promise.all(workers);
    } finally {
      for (const session of sessions) {
        this.releaseSession(session);
      }
    }

    if (errors.length > 0 && stopOnError) {
      throw errors[0].error;
    }

    return results;
  }
}
