| #!/usr/bin/env node |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /** |
| * Tree Model Benchmark (Optimized Batch Insert) |
| * |
| * Performance benchmark for IoTDB tree model (timeseries model). |
| * Uses optimized batch insert with pre-acquired sessions for maximum throughput. |
| * |
| * Usage: |
| * node benchmark-tree.js [options] |
| * |
| * Environment Variables: |
| * IOTDB_HOST - IoTDB host (default: localhost) |
| * IOTDB_PORT - IoTDB port (default: 6667) |
| * CLIENT_NUMBER - Concurrent clients (default: 10) |
| * DEVICE_NUMBER - Number of devices (default: 100) |
| * SENSOR_NUMBER - Sensors per device (default: 10) |
| * BATCH_SIZE_PER_WRITE - Batch size (default: 100) |
| * LOOP - Number of loops (default: calculated from TOTAL_DATA_POINTS) |
| * REGENERATE_DATA - Force regenerate data (default: false) |
| */ |
| |
const { performance } = require('perf_hooks');

const { SessionPool } = require('../dist');
const { createConfig, printConfig } = require('./config');
const { prepareTestData } = require('./data-generator');
const { createTreeModelSchema, cleanupSchema } = require('./schema-manager');
const { runBatchBenchmark, MetricsCollector, ProgressReporter } = require('./benchmark-core');
| |
/**
 * Construct the SessionPool used by the tree-model benchmark.
 *
 * When `config.NODE_URLS` is truthy the pool is built from a single options
 * object that carries `nodeUrls`; otherwise it is built from
 * `config.IOTDB_HOST` and `config.IOTDB_PORT` followed by the options object.
 * Credentials and pool-sizing options are identical in both cases.
 * This function only constructs the pool; it does not call `init()`.
 *
 * @param {Object} config - Configuration object
 * @returns {SessionPool} Session pool instance
 */
function createSessionPool(config) {
  // Options shared by the single-node and multi-node constructors.
  const sharedOptions = {
    username: config.IOTDB_USER,
    password: config.IOTDB_PASSWORD,
    maxPoolSize: config.POOL_MAX_SIZE,
    minPoolSize: config.POOL_MIN_SIZE,
    maxIdleTime: config.POOL_MAX_IDLE_TIME,
    waitTimeout: config.POOL_WAIT_TIMEOUT,
  };

  // Single node configuration
  if (!config.NODE_URLS) {
    return new SessionPool(config.IOTDB_HOST, config.IOTDB_PORT, sharedOptions);
  }

  // Multi-node configuration: nodeUrls goes first, followed by the shared options.
  const multiNodeOptions = Object.assign({ nodeUrls: config.NODE_URLS }, sharedOptions);
  return new SessionPool(multiNodeOptions);
}
| |
/**
 * Build one tablet per device for a single loop iteration.
 *
 * Only the first shared batch (`testData.sharedBatches[0]`) is used. Its
 * `timestamps` entries are treated as offsets and shifted by a base of
 * `Date.now() + loopIdx * BATCH_SIZE_PER_WRITE * POINT_STEP`. Every tablet
 * receives its own freshly mapped timestamps array, while `values` is the
 * batch's array passed through by reference (shared across all tablets).
 *
 * @param {Object} testData - Test data structure (`devices`, `sharedBatches`)
 * @param {Object} config - Configuration object
 * @param {number} loopIdx - Current loop index
 * @returns {Array} Array of tablet objects for this loop
 */
function buildTabletsForLoop(testData, config, loopIdx) {
  const loopOffset = loopIdx * config.BATCH_SIZE_PER_WRITE * config.POINT_STEP;
  const loopStart = Date.now() + loopOffset;
  const sharedBatch = testData.sharedBatches[0];

  // Shift the batch's relative offsets onto this loop's time base.
  const shiftTimestamps = () => sharedBatch.timestamps.map((offset) => loopStart + offset);

  return Array.from(testData.devices, (device) => {
    const timestamps = shiftTimestamps();
    return {
      deviceId: device.deviceId,
      measurements: device.measurements,
      dataTypes: device.dataTypes,
      timestamps,
      values: sharedBatch.values,
    };
  });
}
| |
/**
 * Main benchmark function.
 *
 * Runs the benchmark end to end: prepares test data, creates and initializes
 * the session pool, creates the schema, runs the streaming benchmark and
 * prints a summary of the returned statistics.
 *
 * On failure the error message and stack are printed and the process exits
 * with code 1. The exit is deferred until after the `finally` block so that
 * a pool that was already created is closed first; calling `process.exit()`
 * directly inside `catch` would end the process before `finally` could run.
 *
 * @returns {Promise<void>} Resolves after the pool (if any) has been closed.
 */
async function main() {
  console.log('╔════════════════════════════════════════════════════════════════════════════╗');
  console.log('║ IoTDB Tree Model Benchmark (Optimized Batch Insert) ║');
  console.log('╚════════════════════════════════════════════════════════════════════════════╝');
  console.log();

  // Create configuration
  const config = createConfig();
  printConfig(config);

  let pool = null;
  // Set in `catch`; consulted only after cleanup has finished.
  let failed = false;

  try {
    // Step 1: Prepare test data
    console.log('Step 1: Preparing test data...');
    const testData = await prepareTestData(config, 'tree');
    console.log(`✓ Test data ready: ${testData.devices.length} devices with ${config.SENSOR_NUMBER} sensors each`);

    // Step 2: Create session pool
    console.log('\nStep 2: Initializing session pool...');
    pool = createSessionPool(config);
    console.log(' Created pool object, calling init()...');
    await pool.init();
    console.log(`✓ Session pool initialized: ${pool.getPoolSize()} connections`);

    // Step 3: Create schema
    console.log('\nStep 3: Creating schema...');
    await createTreeModelSchema(pool, testData, config);
    console.log('✓ Schema creation completed');

    // Step 4: Run streaming batch benchmark
    console.log('\nStep 4: Running streaming batch benchmark...');
    const results = await runStreamingBenchmark(pool, testData, config);

    // Step 5: Summary
    console.log('\n' + '='.repeat(80));
    console.log('BENCHMARK COMPLETED SUCCESSFULLY');
    console.log('='.repeat(80));
    console.log(`\nKey Metrics:`);
    console.log(` • Total Operations: ${results.total_operations}`);
    console.log(` • Success Rate: ${results.success_rate}`);
    console.log(` • Throughput: ${parseFloat(results.points_per_sec).toLocaleString()} points/sec`);
    console.log(` • Average Latency: ${results.latency.avg}ms`);
    console.log(` • Test Duration: ${results.duration_sec}s`);
  } catch (error) {
    console.error('\n' + '!'.repeat(80));
    console.error('BENCHMARK FAILED');
    console.error('!'.repeat(80));
    console.error('\nError:', error.message);
    console.error('\nStack trace:', error.stack);
    failed = true;
  } finally {
    // Close pool (runs on both the success and the failure path)
    if (pool) {
      console.log('\nClosing session pool...');
      await pool.close();
      console.log('✓ Session pool closed');
    }
  }

  if (failed) {
    process.exit(1);
  }
}
| |
/**
 * Acquire `count` sessions from the pool, one after another.
 * If any acquisition fails, the sessions obtained so far are released before
 * the error is rethrown, so a partial failure does not leak sessions.
 * @param {Object} pool - Session pool
 * @param {number} count - Number of sessions to acquire
 * @returns {Promise<Array>} The acquired sessions
 */
async function acquireSessions(pool, count) {
  const sessions = [];
  try {
    for (let i = 0; i < count; i++) {
      sessions.push(await pool.getSession());
    }
  } catch (error) {
    releaseSessions(pool, sessions);
    throw error;
  }
  return sessions;
}

/**
 * Hand every session in `sessions` back to the pool.
 * @param {Object} pool - Session pool
 * @param {Array} sessions - Sessions previously obtained from `pool`
 */
function releaseSessions(pool, sessions) {
  for (const session of sessions) {
    pool.releaseSession(session);
  }
}

/**
 * Run one warmup round: insert at most 10 tablets built for loop index 0,
 * using at most 2 sessions. Insert errors are counted, never thrown.
 * @param {Object} pool - Session pool
 * @param {Object} testData - Test data
 * @param {Object} config - Configuration
 * @returns {Promise<number>} Number of warmup inserts that failed
 */
async function runWarmupRound(pool, testData, config) {
  const warmupTablets = buildTabletsForLoop(testData, config, 0);
  const warmupLimit = Math.min(10, warmupTablets.length);

  // Never ask for more sessions than the pool can hand out; the test phase
  // applies the same cap. A missing/non-numeric POOL_MAX_SIZE imposes no cap.
  const poolCap = Number.isFinite(config.POOL_MAX_SIZE) ? config.POOL_MAX_SIZE : Infinity;
  const warmupConcurrency = Math.min(config.CLIENT_NUMBER, poolCap, 2);

  const warmupSessions = await acquireSessions(pool, warmupConcurrency);
  let failures = 0;

  try {
    // Shared cursor: there is no await between the bound check and the
    // increment, so each tablet index is claimed by exactly one worker.
    let next = 0;
    const workers = warmupSessions.map(async (session) => {
      while (next < warmupLimit) {
        const tablet = warmupTablets[next++];
        try {
          await session.insertTablet(tablet);
        } catch (error) {
          // Warmup is best-effort: count the failure and keep going.
          failures++;
        }
      }
    });
    await Promise.all(workers);
  } finally {
    releaseSessions(pool, warmupSessions);
  }

  return failures;
}

/**
 * Run streaming benchmark - process tablets loop by loop to avoid memory issues
 *
 * Optionally runs `config.WARMUP_ROUNDS` warmup rounds, then pre-acquires
 * `min(CLIENT_NUMBER, POOL_MAX_SIZE)` sessions and, for each loop, builds
 * that loop's tablets and lets the sessions drain them concurrently. Each
 * insert is timed and recorded in the metrics collector; a failed insert is
 * recorded as a failed operation with zero data points instead of aborting
 * the run. Sessions are always released, including on error.
 *
 * @param {Object} pool - Session pool
 * @param {Object} testData - Test data
 * @param {Object} config - Configuration
 * @returns {Promise<Object>} Benchmark results (`metrics.getStats()`)
 */
async function runStreamingBenchmark(pool, testData, config) {
  console.log("\n" + "=".repeat(80));
  console.log("STARTING STREAMING BATCH BENCHMARK");
  console.log("=".repeat(80));

  // `!= null` treats both null and undefined as "not set" -> a single loop.
  const loopCount = config.LOOP != null ? config.LOOP : 1;
  const totalTablets = loopCount * testData.devices.length;
  console.log(`\nTotal: ${loopCount} loops × ${testData.devices.length} devices = ${totalTablets} tablets`);
  console.log(`Data points per tablet: ${config.BATCH_SIZE_PER_WRITE} rows × ${config.SENSOR_NUMBER} sensors = ${config.BATCH_SIZE_PER_WRITE * config.SENSOR_NUMBER}`);

  const metrics = new MetricsCollector(config);
  const reporter = new ProgressReporter(config, metrics);

  // Warmup phase
  if (config.WARMUP_ROUNDS > 0) {
    console.log(`\n[Warmup Phase] Running ${config.WARMUP_ROUNDS} warmup rounds...`);
    for (let round = 0; round < config.WARMUP_ROUNDS; round++) {
      const failures = await runWarmupRound(pool, testData, config);
      if (failures > 0) {
        console.warn(` Warmup round ${round + 1}: ${failures} insert(s) failed (ignored)`);
      }
      console.log(` Warmup round ${round + 1}/${config.WARMUP_ROUNDS} completed`);
    }
  }

  // Pre-acquire sessions for main test
  const concurrency = Math.min(config.CLIENT_NUMBER, config.POOL_MAX_SIZE);
  const sessions = await acquireSessions(pool, concurrency);
  console.log(`\n[Test Phase] Pre-acquired ${sessions.length} sessions for ${concurrency} concurrent workers`);

  metrics.start();
  reporter.start();

  try {
    // Process loop by loop to avoid memory issues
    for (let loopIdx = 0; loopIdx < loopCount; loopIdx++) {
      // Build tablets for this loop only; the array becomes unreachable
      // once the iteration finishes.
      const tablets = buildTabletsForLoop(testData, config, loopIdx);

      // Insert tablets using pre-acquired sessions. The cursor is shared by
      // all workers and is re-created for every loop.
      let tabletIndex = 0;
      const workers = sessions.map(async (session) => {
        while (tabletIndex < tablets.length) {
          const tablet = tablets[tabletIndex++];
          const startTime = performance.now();

          try {
            await session.insertTablet(tablet);
            const latency = performance.now() - startTime;
            const dataPoints = tablet.timestamps.length * tablet.measurements.length;
            metrics.recordOperation(latency, dataPoints, true);
          } catch (error) {
            const latency = performance.now() - startTime;
            metrics.recordOperation(latency, 0, false, error);
          }
        }
      });

      await Promise.all(workers);
    }
  } finally {
    try {
      reporter.stop();
      metrics.end();
    } finally {
      // Release all sessions even if stopping the reporter/metrics throws
      releaseSessions(pool, sessions);
      console.log(`[Test Phase] Released ${sessions.length} sessions`);
    }
  }

  // Print results
  metrics.printStats("Streaming Batch Benchmark Results");

  return metrics.getStats();
}
| |
// Start the benchmark only when this file is executed directly; when it is
// loaded through require(), the caller decides when to invoke main().
const isDirectRun = require.main === module;

if (isDirectRun) {
  // Last-resort handler for a rejection escaping main().
  const reportFatal = (error) => {
    console.error('Fatal error:', error);
    process.exit(1);
  };

  main().catch(reportFatal);
}

module.exports = { main };