| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /** |
| * Benchmark Core Engine |
| * |
| * Core benchmark engine for managing test execution, client pools, |
| * and performance metrics collection. Supports concurrent operations |
| * with detailed latency and throughput tracking. |
| */ |
| |
| const { performance } = require("perf_hooks"); |
| |
| /** |
| * Performance metrics collector |
| */ |
| class MetricsCollector { |
| constructor(config) { |
| this.config = config; |
| this.reset(); |
| } |
| |
| reset() { |
| this.operations = []; |
| this.errors = []; |
| this.startTime = null; |
| this.endTime = null; |
| this.operationCount = 0; |
| this.successCount = 0; |
| this.failureCount = 0; |
| this.totalDataPoints = 0; |
| } |
| |
| start() { |
| this.startTime = performance.now(); |
| } |
| |
| recordOperation(latencyMs, dataPoints, success = true, error = null) { |
| this.operationCount++; |
| this.operations.push(latencyMs); |
| |
| if (success) { |
| this.successCount++; |
| this.totalDataPoints += dataPoints; |
| } else { |
| this.failureCount++; |
| if (error) { |
| this.errors.push(error); |
| } |
| } |
| } |
| |
| end() { |
| this.endTime = performance.now(); |
| } |
| |
| getDuration() { |
| return this.endTime - this.startTime; |
| } |
| |
| /** |
| * Calculate percentile |
| * @param {number} percentile - Percentile (0-100) |
| * @returns {number} Percentile value |
| */ |
| getPercentile(percentile) { |
| if (this.operations.length === 0) return 0; |
| |
| const sorted = [...this.operations].sort((a, b) => a - b); |
| const index = Math.ceil((percentile / 100) * sorted.length) - 1; |
| return sorted[Math.max(0, index)]; |
| } |
| |
| /** |
| * Get comprehensive statistics |
| * @returns {Object} Statistics object |
| */ |
| getStats() { |
| const durationMs = this.getDuration(); |
| const durationSec = durationMs / 1000; |
| |
| const stats = { |
| // Time |
| duration_ms: durationMs.toFixed(2), |
| duration_sec: durationSec.toFixed(2), |
| |
| // Operations |
| total_operations: this.operationCount, |
| successful_operations: this.successCount, |
| failed_operations: this.failureCount, |
| success_rate: |
| ((this.successCount / this.operationCount) * 100).toFixed(2) + "%", |
| |
| // Data points |
| total_data_points: this.totalDataPoints, |
| |
| // Throughput |
| operations_per_sec: (this.operationCount / durationSec).toFixed(2), |
| points_per_sec: (this.totalDataPoints / durationSec).toFixed(2), |
| |
| // Latency (ms) |
| latency: { |
| min: |
| this.operations.length > 0 |
| ? Math.min(...this.operations).toFixed(2) |
| : 0, |
| max: |
| this.operations.length > 0 |
| ? Math.max(...this.operations).toFixed(2) |
| : 0, |
| avg: |
| this.operations.length > 0 |
| ? ( |
| this.operations.reduce((a, b) => a + b, 0) / |
| this.operations.length |
| ).toFixed(2) |
| : 0, |
| }, |
| }; |
| |
| // Add percentiles if enabled |
| if (this.config.ENABLE_DETAILED_METRICS && this.operations.length > 0) { |
| stats.latency.p50 = this.getPercentile(50).toFixed(2); |
| stats.latency.p90 = this.getPercentile(90).toFixed(2); |
| stats.latency.p95 = this.getPercentile(95).toFixed(2); |
| stats.latency.p99 = this.getPercentile(99).toFixed(2); |
| } |
| |
| return stats; |
| } |
| |
| /** |
| * Print statistics summary |
| */ |
| printStats(label = "Benchmark Results") { |
| const stats = this.getStats(); |
| |
| console.log("\n" + "=".repeat(80)); |
| console.log(label.toUpperCase()); |
| console.log("=".repeat(80)); |
| |
| console.log("\n[Execution Time]"); |
| console.log( |
| ` Duration: ${stats.duration_sec}s (${stats.duration_ms}ms)`, |
| ); |
| |
| console.log("\n[Operations]"); |
| console.log(` Total Operations: ${stats.total_operations}`); |
| console.log(` Successful: ${stats.successful_operations}`); |
| console.log(` Failed: ${stats.failed_operations}`); |
| console.log(` Success Rate: ${stats.success_rate}`); |
| |
| console.log("\n[Data Points]"); |
| console.log( |
| ` Total Points Written: ${stats.total_data_points.toLocaleString()}`, |
| ); |
| |
| console.log("\n[Throughput]"); |
| console.log(` Operations/sec: ${stats.operations_per_sec}`); |
| console.log( |
| ` Points/sec: ${parseFloat(stats.points_per_sec).toLocaleString()}`, |
| ); |
| |
| console.log("\n[Latency (ms)]"); |
| console.log(` Min: ${stats.latency.min}ms`); |
| console.log(` Max: ${stats.latency.max}ms`); |
| console.log(` Average: ${stats.latency.avg}ms`); |
| |
| if (this.config.ENABLE_DETAILED_METRICS && stats.latency.p50) { |
| console.log(` P50 (Median): ${stats.latency.p50}ms`); |
| console.log(` P90: ${stats.latency.p90}ms`); |
| console.log(` P95: ${stats.latency.p95}ms`); |
| console.log(` P99: ${stats.latency.p99}ms`); |
| } |
| |
| if (this.failureCount > 0 && this.errors.length > 0) { |
| console.log("\n[Error Samples]"); |
| const sampleErrors = this.errors.slice(0, 5); |
| sampleErrors.forEach((err, idx) => { |
| console.log(` ${idx + 1}. ${err.message || err}`); |
| }); |
| if (this.errors.length > 5) { |
| console.log(` ... and ${this.errors.length - 5} more errors`); |
| } |
| } |
| |
| console.log("\n" + "=".repeat(80)); |
| } |
| } |
| |
| /** |
| * Progress reporter for long-running tests |
| */ |
| class ProgressReporter { |
| constructor(config, metricsCollector) { |
| this.config = config; |
| this.metrics = metricsCollector; |
| this.lastReportTime = performance.now(); |
| this.lastOperationCount = 0; |
| this.intervalId = null; |
| } |
| |
| start() { |
| if (this.config.REPORT_INTERVAL > 0) { |
| this.intervalId = setInterval(() => { |
| this.report(); |
| }, this.config.REPORT_INTERVAL); |
| } |
| } |
| |
| stop() { |
| if (this.intervalId) { |
| clearInterval(this.intervalId); |
| this.intervalId = null; |
| } |
| } |
| |
| report() { |
| const now = performance.now(); |
| const elapsed = now - this.lastReportTime; |
| const operations = this.metrics.operationCount - this.lastOperationCount; |
| const opsPerSec = (operations / elapsed) * 1000; |
| const totalPoints = this.metrics.totalDataPoints; |
| |
| console.log( |
| `[Progress] Operations: ${this.metrics.operationCount}, ` + |
| `Rate: ${opsPerSec.toFixed(2)} ops/s, ` + |
| `Total Points: ${totalPoints.toLocaleString()}`, |
| ); |
| |
| this.lastReportTime = now; |
| this.lastOperationCount = this.metrics.operationCount; |
| } |
| } |
| |
| /** |
| * Execute write operations concurrently with device-session binding |
| * @param {Object} pool - IoTDB session pool |
| * @param {Array} workload - Array of work items (should be ordered by device) |
| * @param {number} poolSize - Session pool size |
| * @param {MetricsCollector} metrics - Metrics collector |
| * @param {Function} executor - Async function to execute each work item (pool, work, session) |
| */ |
| async function executeConcurrentWithBinding( |
| pool, |
| workload, |
| poolSize, |
| metrics, |
| executor, |
| ) { |
| // Get sessions from pool and bind to devices |
| const sessions = []; |
| for (let i = 0; i < poolSize; i++) { |
| sessions.push(await pool.getSession()); |
| } |
| |
| console.log( |
| `[Device-Session Binding] Bound ${sessions.length} sessions to devices`, |
| ); |
| |
| try { |
| const workers = []; |
| |
| // Each worker handles workload items for devices bound to its session |
| // Workload is pre-ordered, so we can partition it evenly |
| const workPerSession = Math.ceil(workload.length / poolSize); |
| |
| const worker = async (sessionIdx) => { |
| const session = sessions[sessionIdx]; |
| const startIdx = sessionIdx * workPerSession; |
| const endIdx = Math.min(startIdx + workPerSession, workload.length); |
| |
| for (let i = startIdx; i < endIdx; i++) { |
| const work = workload[i]; |
| const startTime = performance.now(); |
| |
| try { |
| const dataPoints = await executor(pool, work, session); |
| const latency = performance.now() - startTime; |
| metrics.recordOperation(latency, dataPoints, true); |
| } catch (error) { |
| const latency = performance.now() - startTime; |
| metrics.recordOperation(latency, 0, false, error); |
| } |
| } |
| }; |
| |
| // Start workers, one per session |
| for (let i = 0; i < poolSize; i++) { |
| workers.push(worker(i)); |
| } |
| |
| // Wait for all workers to complete |
| await Promise.all(workers); |
| } finally { |
| // Release all sessions back to pool |
| for (const session of sessions) { |
| pool.releaseSession(session); |
| } |
| console.log( |
| `[Device-Session Binding] Released ${sessions.length} sessions`, |
| ); |
| } |
| } |
| |
| /** |
| * Execute write operations concurrently with pre-acquired sessions |
| * Each worker acquires a dedicated session at the start and uses it for all operations, |
| * which significantly improves performance by avoiding session acquisition overhead per operation. |
| * |
| * @param {Object} pool - IoTDB session pool |
| * @param {Array} workload - Array of work items |
| * @param {number} concurrency - Number of concurrent workers |
| * @param {MetricsCollector} metrics - Metrics collector |
| * @param {Function} executor - Async function to execute each work item: (pool, work, session) => Promise<dataPoints> |
| * The session parameter is the pre-acquired dedicated session for the worker. |
| * Executors should use this session directly when provided (not null) for optimal performance. |
| * If executor doesn't need the session, it can use pool methods which will handle session management. |
| */ |
| async function executeConcurrent( |
| pool, |
| workload, |
| concurrency, |
| metrics, |
| executor, |
| ) { |
| // Pre-acquire sessions for all workers to enable true concurrent execution |
| const actualConcurrency = Math.min(concurrency, workload.length); |
| |
| // Acquire all sessions in parallel |
| const sessions = await Promise.all( |
| Array.from({ length: actualConcurrency }, () => pool.getSession()), |
| ); |
| |
| console.log( |
| `[Concurrent Execution] Pre-acquired ${sessions.length} sessions for ${actualConcurrency} workers`, |
| ); |
| |
| let workIndex = 0; |
| const workers = []; |
| |
| try { |
| // Worker function - uses dedicated session for all its operations |
| const worker = async (workerSession) => { |
| while (workIndex < workload.length) { |
| const index = workIndex++; |
| if (index >= workload.length) break; |
| |
| const work = workload[index]; |
| const startTime = performance.now(); |
| |
| try { |
| // Pass the pre-acquired session to executor for direct use |
| const dataPoints = await executor(pool, work, workerSession); |
| const latency = performance.now() - startTime; |
| metrics.recordOperation(latency, dataPoints, true); |
| } catch (error) { |
| const latency = performance.now() - startTime; |
| metrics.recordOperation(latency, 0, false, error); |
| } |
| } |
| }; |
| |
| // Start workers, each with its own dedicated session |
| for (let i = 0; i < actualConcurrency; i++) { |
| workers.push(worker(sessions[i])); |
| } |
| |
| // Wait for all workers to complete |
| await Promise.all(workers); |
| } finally { |
| // Release all sessions back to pool |
| for (const session of sessions) { |
| pool.releaseSession(session); |
| } |
| console.log(`[Concurrent Execution] Released ${sessions.length} sessions`); |
| } |
| } |
| |
| /** |
| * Run benchmark test |
| * @param {Object} pool - IoTDB session pool |
| * @param {Object} testData - Test data |
| * @param {Object} config - Configuration |
| * @param {Function} executor - Async function to execute work items |
| * @param {Function} workloadGenerator - Function to generate workload |
| * @returns {Object} Test results |
| */ |
| async function runBenchmark( |
| pool, |
| testData, |
| config, |
| executor, |
| workloadGenerator, |
| ) { |
| console.log("\n" + "=".repeat(80)); |
| console.log("STARTING BENCHMARK TEST"); |
| console.log("=".repeat(80)); |
| |
| const metrics = new MetricsCollector(config); |
| const reporter = new ProgressReporter(config, metrics); |
| |
| // Generate workload |
| console.log("\nGenerating workload..."); |
| const workload = workloadGenerator(testData, config); |
| console.log(`Workload generated: ${workload.length} operations`); |
| |
| // Warmup phase |
| if (config.WARMUP_ROUNDS > 0) { |
| console.log( |
| `\n[Warmup Phase] Running ${config.WARMUP_ROUNDS} warmup rounds...`, |
| ); |
| for (let round = 0; round < config.WARMUP_ROUNDS; round++) { |
| const warmupMetrics = new MetricsCollector(config); |
| warmupMetrics.start(); |
| |
| await executeConcurrent( |
| pool, |
| workload.slice(0, Math.min(10, workload.length)), |
| Math.min(config.CLIENT_NUMBER, 2), |
| warmupMetrics, |
| executor, |
| ); |
| |
| warmupMetrics.end(); |
| console.log( |
| ` Warmup round ${round + 1}/${config.WARMUP_ROUNDS} completed`, |
| ); |
| } |
| } |
| |
| // Main test phase |
| if (config.ENABLE_DEVICE_SESSION_BINDING) { |
| console.log( |
| `\n[Test Phase] Running benchmark with device-session binding (${config.POOL_MAX_SIZE} sessions)...`, |
| ); |
| } else { |
| console.log( |
| `\n[Test Phase] Running benchmark with ${config.CLIENT_NUMBER} concurrent clients...`, |
| ); |
| } |
| |
| metrics.start(); |
| reporter.start(); |
| |
| try { |
| if (config.ENABLE_DEVICE_SESSION_BINDING) { |
| await executeConcurrentWithBinding( |
| pool, |
| workload, |
| config.POOL_MAX_SIZE, |
| metrics, |
| executor, |
| ); |
| } else { |
| await executeConcurrent( |
| pool, |
| workload, |
| config.CLIENT_NUMBER, |
| metrics, |
| executor, |
| ); |
| } |
| } finally { |
| reporter.stop(); |
| metrics.end(); |
| } |
| |
| // Print results |
| metrics.printStats("Benchmark Results"); |
| |
| return metrics.getStats(); |
| } |
| |
| /** |
| * Execute batch insert operations using insertTabletsParallel |
| * This is optimized for high throughput by pre-acquiring sessions |
| * and distributing tablets across workers. |
| * |
| * @param {Object} pool - IoTDB session pool |
| * @param {Array} tablets - Array of tablet objects ready for insertion |
| * @param {number} concurrency - Number of concurrent workers |
| * @param {MetricsCollector} metrics - Metrics collector |
| * @returns {Promise<void>} |
| */ |
| async function executeBatchInsert(pool, tablets, concurrency, metrics) { |
| const batchSize = Math.ceil(tablets.length / Math.ceil(tablets.length / 100)); // ~100 tablets per batch |
| const batches = []; |
| |
| for (let i = 0; i < tablets.length; i += batchSize) { |
| batches.push(tablets.slice(i, i + batchSize)); |
| } |
| |
| console.log(`[Batch Insert] ${tablets.length} tablets in ${batches.length} batches, concurrency: ${concurrency}`); |
| |
| const actualConcurrency = Math.min(concurrency, batches.length); |
| |
| // Pre-acquire sessions |
| const sessions = await Promise.all( |
| Array.from({ length: actualConcurrency }, () => pool.getSession()), |
| ); |
| |
| console.log(`[Batch Insert] Pre-acquired ${sessions.length} sessions`); |
| |
| let batchIndex = 0; |
| |
| try { |
| const workers = sessions.map(async (session) => { |
| while (batchIndex < batches.length) { |
| const idx = batchIndex++; |
| if (idx >= batches.length) break; |
| |
| const batch = batches[idx]; |
| const startTime = performance.now(); |
| |
| try { |
| // Insert all tablets in this batch sequentially using the same session |
| for (const tablet of batch) { |
| await session.insertTablet(tablet); |
| } |
| const latency = performance.now() - startTime; |
| // Calculate total data points in this batch |
| const dataPoints = batch.reduce((sum, t) => { |
| const rows = t.timestamps.length; |
| const cols = t.measurements?.length || t.columnNames?.length || 0; |
| return sum + rows * cols; |
| }, 0); |
| metrics.recordOperation(latency, dataPoints, true); |
| } catch (error) { |
| const latency = performance.now() - startTime; |
| metrics.recordOperation(latency, 0, false, error); |
| } |
| } |
| }); |
| |
| await Promise.all(workers); |
| } finally { |
| for (const session of sessions) { |
| pool.releaseSession(session); |
| } |
| console.log(`[Batch Insert] Released ${sessions.length} sessions`); |
| } |
| } |
| |
| /** |
| * Run benchmark using batch insert mode (optimized for high throughput) |
| * @param {Object} pool - IoTDB session pool |
| * @param {Array} tablets - Pre-built tablet array |
| * @param {Object} config - Configuration |
| * @returns {Object} Test results |
| */ |
| async function runBatchBenchmark(pool, tablets, config) { |
| console.log("\n" + "=".repeat(80)); |
| console.log("STARTING BATCH BENCHMARK TEST"); |
| console.log("=".repeat(80)); |
| |
| const metrics = new MetricsCollector(config); |
| const reporter = new ProgressReporter(config, metrics); |
| |
| console.log(`\nTotal tablets to insert: ${tablets.length}`); |
| |
| // Warmup phase |
| if (config.WARMUP_ROUNDS > 0) { |
| console.log( |
| `\n[Warmup Phase] Running ${config.WARMUP_ROUNDS} warmup rounds...`, |
| ); |
| const warmupTablets = tablets.slice(0, Math.min(20, tablets.length)); |
| for (let round = 0; round < config.WARMUP_ROUNDS; round++) { |
| const warmupMetrics = new MetricsCollector(config); |
| warmupMetrics.start(); |
| |
| await executeBatchInsert( |
| pool, |
| warmupTablets, |
| Math.min(config.CLIENT_NUMBER, 2), |
| warmupMetrics, |
| ); |
| |
| warmupMetrics.end(); |
| console.log( |
| ` Warmup round ${round + 1}/${config.WARMUP_ROUNDS} completed`, |
| ); |
| } |
| } |
| |
| // Main test phase |
| console.log( |
| `\n[Test Phase] Running batch benchmark with ${config.CLIENT_NUMBER} concurrent clients...`, |
| ); |
| |
| metrics.start(); |
| reporter.start(); |
| |
| try { |
| await executeBatchInsert(pool, tablets, config.CLIENT_NUMBER, metrics); |
| } finally { |
| reporter.stop(); |
| metrics.end(); |
| } |
| |
| // Print results |
| metrics.printStats("Batch Benchmark Results"); |
| |
| return metrics.getStats(); |
| } |
| |
| module.exports = { |
| MetricsCollector, |
| ProgressReporter, |
| executeConcurrent, |
| executeConcurrentWithBinding, |
| executeBatchInsert, |
| runBenchmark, |
| runBatchBenchmark, |
| }; |