Version: 1.0.0
Last Updated: 2024
The Apache IoTDB Node.js Client provides native support for the tree model (timeseries data model), enabling efficient management of time-series data using hierarchical device paths. This guide covers the SessionPool API for tree model operations with connection pooling and high-concurrency support.
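The tree model in IoTDB organizes data hierarchically:

    root.{storage_group}.{device}.{measurement}

- Storage group (database): the top level of the path (e.g., `root.test`)
- Device: a path under the storage group (e.g., `root.test.device1`)
- Measurement: a named value recorded by a device (e.g., `temperature`, `humidity`)
- Timeseries: a full measurement path with its data type (e.g., `root.test.device1.temperature FLOAT`)

Batch writes go through `insertTablet` for high performance.

Install the client with npm:

    npm install @iotdb/client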
Requirements:
TypeScript:

    import { SessionPool, PoolConfigBuilder, TreeTablet, TSDataType } from '@iotdb/client';

JavaScript:

    const { SessionPool, PoolConfigBuilder, TreeTablet, TSDataType } = require('@iotdb/client');
Quick start:

    import { SessionPool, TreeTablet } from '@iotdb/client';

    async function quickStart() {
      // Create and initialize pool
      const pool = new SessionPool('localhost', 6667, {
        username: 'root',
        password: 'root',
        maxPoolSize: 10,
        minPoolSize: 2,
      });
      await pool.init();

      try {
        // Create storage group
        await pool.executeNonQueryStatement('CREATE DATABASE root.test');

        // Create timeseries
        await pool.executeNonQueryStatement(
          'CREATE TIMESERIES root.test.device1.temperature WITH DATATYPE=FLOAT, ENCODING=RLE'
        );

        // Insert data using TreeTablet class with addRow
        const tablet = new TreeTablet(
          'root.test.device1',
          ['temperature'],
          [3] // FLOAT
        );
        tablet.addRow(Date.now(), [25.5]);
        await pool.insertTablet(tablet);

        // Query data
        const result = await pool.executeQueryStatement(
          'SELECT temperature FROM root.test.device1'
        );
        console.log('Query result:', result);
        console.log('Pool size:', pool.getPoolSize());
        console.log('Available:', pool.getAvailableSize());
      } finally {
        // Always release pool resources, even if an operation above failed
        await pool.close();
      }
    }

    quickStart();
SessionPool provides connection pooling for high-concurrency scenarios. It automatically manages multiple sessions, distributes load across nodes, and recycles idle connections.
Key Features:
Multiple hosts sharing one port:

    const pool = new SessionPool(
      ['node1', 'node2', 'node3'], // Hosts
      6667,                        // Port
      {
        username: 'root',
        password: 'root',
        maxPoolSize: 20,
        minPoolSize: 5,
      }
    );

Per-node URLs:

    const pool = new SessionPool({
      nodeUrls: [
        'node1:6667',
        'node2:6668',
        'node3:6669',
      ],
      username: 'root',
      password: 'root',
      maxPoolSize: 20,
      minPoolSize: 5,
    });

Using PoolConfigBuilder:

    import { SessionPool, PoolConfigBuilder } from '@iotdb/client';

    const pool = new SessionPool(
      new PoolConfigBuilder()
        .nodeUrls(['node1:6667', 'node2:6667', 'node3:6667'])
        .username('root')
        .password('root')
        .maxPoolSize(20)
        .minPoolSize(5)
        .maxIdleTime(60000)
        .waitTimeout(60000)
        .build()
    );
| Option | Type | Default | Description |
|---|---|---|---|
| `maxPoolSize` | number | 10 | Maximum number of sessions in the pool |
| `minPoolSize` | number | 1 | Minimum number of sessions maintained |
| `maxIdleTime` | number | 60000 | Maximum idle time before cleanup (ms) |
| `waitTimeout` | number | 60000 | Maximum wait time for an available session (ms) |
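As an illustration of how the defaults in the table combine with user-supplied values, here is a minimal sketch. The `PoolSizing` type and the `resolvePoolSizing` helper are hypothetical names introduced for this example and are not part of `@iotdb/client`; the range checks are an assumption about sensible values, not documented library behavior.

```typescript
// Hypothetical local type: the real client config may be shaped differently.
interface PoolSizing {
  maxPoolSize: number;
  minPoolSize: number;
  maxIdleTime: number; // milliseconds
  waitTimeout: number; // milliseconds
}

// Defaults copied from the options table above.
const POOL_DEFAULTS: PoolSizing = {
  maxPoolSize: 10,
  minPoolSize: 1,
  maxIdleTime: 60000,
  waitTimeout: 60000,
};

// Merge overrides onto the defaults and reject obviously inconsistent values.
function resolvePoolSizing(overrides: Partial<PoolSizing> = {}): PoolSizing {
  const resolved: PoolSizing = { ...POOL_DEFAULTS, ...overrides };
  if (resolved.maxPoolSize < 1 || resolved.minPoolSize < 0) {
    throw new RangeError('pool sizes must be non-negative and maxPoolSize at least 1');
  }
  if (resolved.minPoolSize > resolved.maxPoolSize) {
    throw new RangeError('minPoolSize must not exceed maxPoolSize');
  }
  if (resolved.maxIdleTime < 0 || resolved.waitTimeout < 0) {
    throw new RangeError('timeouts must be non-negative');
  }
  return resolved;
}

// Only the two sizes are overridden; both timeouts keep their 60000 ms default.
const sizing = resolvePoolSizing({ maxPoolSize: 20, minPoolSize: 5 });
```

The resolved object can then be spread into whatever configuration form the application already uses.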
`async init(): Promise<void>`

Initializes the connection pool and creates the minimum number of sessions.

Example:

    await pool.init();

`async close(): Promise<void>`

Closes all sessions in the pool and releases resources.

Example:

    await pool.close();
The pool automatically acquires and releases sessions for these operations:
`async executeQueryStatement(sql: string, timeoutMs?: number): Promise<QueryResult>`

Executes a query using an available session from the pool.

Example:

    const result = await pool.executeQueryStatement('SELECT * FROM root.test.**');

`async executeNonQueryStatement(sql: string): Promise<void>`

Executes a non-query statement using an available session.

Example:

    await pool.executeNonQueryStatement('CREATE DATABASE root.test');

`async insertTablet(tablet: Tablet): Promise<void>`

Inserts data using an available session.

Example:

    await pool.insertTablet({
      deviceId: 'root.test.device1',
      measurements: ['temperature'],
      dataTypes: [3], // FLOAT
      timestamps: [Date.now()],
      values: [[25.5]],
    });
For multiple operations on the same session:
`async getSession(): Promise<Session>`

Acquires a session from the pool. The session must be released after use.

Example:

    const session = await pool.getSession();
    try {
      await session.executeNonQueryStatement('CREATE DATABASE root.test');
      await session.insertTablet({ /* data */ });
      const result = await session.executeQueryStatement('SELECT ...');
    } finally {
      // Release in finally so the session returns to the pool even on error
      pool.releaseSession(session);
    }

`releaseSession(session: Session): void`

Releases a session back to the pool.

Example:

    pool.releaseSession(session);
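Because a forgotten `releaseSession()` call leaves a session checked out, the acquire/release pattern above can be wrapped once and reused. The sketch below is one way to do that; `withSession` and `SessionSource` are hypothetical names for this example, and the interface only assumes the two methods documented above.

```typescript
// Structural stand-in for the pool: only getSession() and releaseSession().
interface SessionSource<S> {
  getSession(): Promise<S>;
  releaseSession(session: S): void;
}

// Acquire a session, run the callback, and always release the session,
// whether the callback resolves or throws.
async function withSession<S, T>(
  pool: SessionSource<S>,
  work: (session: S) => Promise<T>,
): Promise<T> {
  const session = await pool.getSession();
  try {
    return await work(session);
  } finally {
    pool.releaseSession(session);
  }
}
```

A caller would pass the pool and an async callback, for example `withSession(pool, (session) => session.executeQueryStatement(sql))`, and never touch `releaseSession()` directly.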
`getPoolSize(): number`

Returns the current total number of sessions in the pool.

`getAvailableSize(): number`

Returns the number of available (idle) sessions.

`getInUseSize(): number`

Returns the number of sessions currently in use.

Example:

    console.log(`Total: ${pool.getPoolSize()}`);
    console.log(`Available: ${pool.getAvailableSize()}`);
    console.log(`In Use: ${pool.getInUseSize()}`);
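For monitoring, the three getters can be folded into a single snapshot. This is a small sketch under the assumption that only those three methods are available; `PoolStats`, `PoolSnapshot`, and `snapshotPool` are hypothetical names, and the utilization ratio is a derived value, not something the client reports.

```typescript
// Structural stand-in exposing only the three documented getters.
interface PoolStats {
  getPoolSize(): number;
  getAvailableSize(): number;
  getInUseSize(): number;
}

interface PoolSnapshot {
  total: number;
  available: number;
  inUse: number;
  utilization: number; // inUse / total, or 0 for an empty pool
}

// Read the counters once and derive a utilization ratio from them.
function snapshotPool(pool: PoolStats): PoolSnapshot {
  const total = pool.getPoolSize();
  const inUse = pool.getInUseSize();
  return {
    total,
    available: pool.getAvailableSize(),
    inUse,
    utilization: total === 0 ? 0 : inUse / total,
  };
}
```

A utilization that stays close to 1 suggests callers are queuing for sessions and `maxPoolSize` may be too low.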
Fluent API for building SessionPool configurations.
Available Methods:
- `host(host: string): this`
- `port(port: number): this`
- `nodeUrls(urls: string[]): this`
- `username(username: string): this`
- `password(password: string): this`
- `database(database: string): this`
- `timezone(timezone: string): this`
- `fetchSize(size: number): this`
- `enableSSL(enable: boolean): this`
- `sslOptions(options: SSLOptions): this`
- `maxPoolSize(size: number): this`
- `minPoolSize(size: number): this`
- `maxIdleTime(time: number): this`
- `waitTimeout(timeout: number): this`
- `build(): PoolConfig`

Example:

    const poolConfig = new PoolConfigBuilder()
      .nodeUrls(['node1:6667', 'node2:6667'])
      .username('root')
      .password('root')
      .maxPoolSize(20)
      .minPoolSize(5)
      .maxIdleTime(60000)
      .waitTimeout(60000)
      .build();

    const pool = new SessionPool(poolConfig);
The tree model supports all IoTDB data types:
| Code | Type | JavaScript Type | Description |
|---|---|---|---|
| 0 | BOOLEAN | boolean | True or false |
| 1 | INT32 | number | 32-bit signed integer |
| 2 | INT64 | number/string | 64-bit signed integer (use string for large values) |
| 3 | FLOAT | number | 32-bit floating point |
| 4 | DOUBLE | number | 64-bit floating point |
| 5 | TEXT | string | UTF-8 string |
| 8 | TIMESTAMP | number/Date | Milliseconds since epoch |
| 9 | DATE | number/Date | Days since epoch |
| 10 | BLOB | Buffer | Binary data |
| 11 | STRING | string | Same as TEXT |
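The examples in this guide pass raw codes such as `3` in `dataTypes`. To avoid magic numbers, the table can be mirrored as named constants. The client also exports `TSDataType`, but its member names are not shown in this guide, so the `DataTypeCode` map and `toTypeCodes` helper below are local, hypothetical stand-ins.

```typescript
// Local constants mirroring the type-code table above (not the library's enum).
const DataTypeCode = {
  BOOLEAN: 0,
  INT32: 1,
  INT64: 2,
  FLOAT: 3,
  DOUBLE: 4,
  TEXT: 5,
  TIMESTAMP: 8,
  DATE: 9,
  BLOB: 10,
  STRING: 11,
} as const;

type DataTypeName = keyof typeof DataTypeCode;

// Translate readable type names into the numeric codes expected in dataTypes.
function toTypeCodes(names: DataTypeName[]): number[] {
  return names.map((name) => DataTypeCode[name]);
}

const codes = toTypeCodes(['FLOAT', 'DOUBLE', 'BOOLEAN', 'TEXT', 'TIMESTAMP']);
// codes is [3, 4, 0, 5, 8]
```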
Example with Multiple Types:

    await pool.insertTablet({
      deviceId: 'root.test.sensor1',
      measurements: ['temp', 'humidity', 'status', 'description', 'reading_time'],
      dataTypes: [3, 4, 0, 5, 8], // FLOAT, DOUBLE, BOOLEAN, TEXT, TIMESTAMP
      timestamps: [Date.now()],
      values: [[
        25.5,               // FLOAT
        60.123456,          // DOUBLE
        true,               // BOOLEAN
        'Normal operation', // TEXT
        Date.now(),         // TIMESTAMP
      ]],
    });
For INT64 values larger than JavaScript's safe integer range (2^53 - 1), use strings:

    await pool.insertTablet({
      deviceId: 'root.test.device1',
      measurements: ['largeCounter'],
      dataTypes: [2], // INT64
      timestamps: [Date.now()],
      values: [['9223372036854775807']], // String for large INT64
    });
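When counters are held as `bigint` in application code, the number-or-string choice can be made mechanically. The following is a sketch only: `toInt64Value` is a hypothetical helper, and treating large negative values the same way as large positive ones is an assumption that this guide does not state explicitly.

```typescript
const INT64_MIN = BigInt('-9223372036854775808');
const INT64_MAX = BigInt('9223372036854775807');
const SAFE_MIN = BigInt(Number.MIN_SAFE_INTEGER);
const SAFE_MAX = BigInt(Number.MAX_SAFE_INTEGER);

// Return a plain number when it is exactly representable, otherwise a decimal string.
function toInt64Value(value: bigint): number | string {
  if (value < INT64_MIN || value > INT64_MAX) {
    throw new RangeError(`${value} does not fit in a signed 64-bit integer`);
  }
  return value >= SAFE_MIN && value <= SAFE_MAX ? Number(value) : value.toString();
}

const small = toInt64Value(BigInt(42));                    // number 42
const large = toInt64Value(BigInt('9223372036854775807')); // string '9223372036854775807'
```

Each result can be placed directly in a row of `values` for an INT64 column.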
CRUD example:

    import { SessionPool } from '@iotdb/client';

    async function crudExample() {
      const pool = new SessionPool('localhost', 6667, {
        username: 'root',
        password: 'root',
        maxPoolSize: 10,
        minPoolSize: 2,
      });
      await pool.init();

      try {
        // CREATE
        await pool.executeNonQueryStatement('CREATE DATABASE root.factory');
        await pool.executeNonQueryStatement(
          'CREATE TIMESERIES root.factory.workshop1.temperature WITH DATATYPE=FLOAT'
        );

        // INSERT (read the clock once so the rows and the delete cutoff line up)
        const now = Date.now();
        await pool.insertTablet({
          deviceId: 'root.factory.workshop1',
          measurements: ['temperature'],
          dataTypes: [3], // FLOAT
          timestamps: [now - 3000, now - 2000, now - 1000],
          values: [[25.5], [26.0], [25.8]],
        });

        // READ
        const result = await pool.executeQueryStatement(
          'SELECT temperature FROM root.factory.workshop1'
        );
        console.log('Temperature readings:', result);

        // UPDATE (delete the old points; re-insert corrected values afterwards if needed)
        await pool.executeNonQueryStatement(
          `DELETE FROM root.factory.workshop1.temperature WHERE time <= ${now - 2500}`
        );

        // DELETE
        await pool.executeNonQueryStatement('DELETE DATABASE root.factory');
      } finally {
        await pool.close();
      }
    }

    crudExample();
Multi-node example:

    import { SessionPool, PoolConfigBuilder } from '@iotdb/client';

    async function multiNodeExample() {
      const pool = new SessionPool(
        new PoolConfigBuilder()
          .nodeUrls([
            'iotdb-node1:6667',
            'iotdb-node2:6667',
            'iotdb-node3:6667',
          ])
          .username('root')
          .password('root')
          .maxPoolSize(30)
          .minPoolSize(10)
          .build()
      );
      await pool.init();

      try {
        // Simulate concurrent operations
        const operations = [];
        for (let i = 0; i < 100; i++) {
          operations.push(
            pool.insertTablet({
              deviceId: `root.test.device${i % 10}`,
              measurements: ['value'],
              dataTypes: [3], // FLOAT
              timestamps: [Date.now()],
              values: [[Math.random() * 100]],
            })
          );
        }
        await Promise.all(operations);
        console.log('All operations completed');

        // Pool statistics
        console.log('Pool Statistics:');
        console.log(`  Total Sessions: ${pool.getPoolSize()}`);
        console.log(`  Available: ${pool.getAvailableSize()}`);
        console.log(`  In Use: ${pool.getInUseSize()}`);
      } finally {
        await pool.close();
      }
    }

    multiNodeExample();
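The example above starts 100 inserts at once against a pool capped at 30 sessions, so the surplus calls wait for a free session (bounded by `waitTimeout`). If the application should cap in-flight work itself, a small limiter like the sketch below can help. `mapWithConcurrency` is a hypothetical helper written for this guide, not part of the client.

```typescript
// Run `task` over `items` with at most `limit` tasks in flight at any time.
// Results are returned in input order.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  task: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError('limit must be a positive integer');
  }
  const results: R[] = new Array<R>(items.length);
  let next = 0; // index of the next unclaimed item

  // Each worker repeatedly claims the next index until none are left.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await task(items[index] as T, index);
    }
  }

  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

With the pool above, a call such as `mapWithConcurrency(tablets, 30, (t) => pool.insertTablet(t))` would keep no more than 30 inserts outstanding. If any task rejects, the returned promise rejects with that error.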
Time range query:

    async function timeRangeQuery(pool: SessionPool) {
      const now = Date.now();
      const hourAgo = now - 3600000; // one hour in milliseconds

      const result = await pool.executeQueryStatement(
        `SELECT temperature, humidity FROM root.test.** WHERE time >= ${hourAgo} AND time <= ${now}`
      );
      console.log('Query result:', result);
      return result;
    }
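The query string above can be factored into a small builder so the time bounds are validated before they reach the server. This is a sketch: `buildTimeRangeQuery` is a hypothetical helper, and it does no escaping, so measurement names and the path pattern should come from trusted code rather than user input.

```typescript
// Build a SELECT over a closed time range [startMs, endMs] in epoch milliseconds.
function buildTimeRangeQuery(
  measurements: string[],
  pathPattern: string,
  startMs: number,
  endMs: number,
): string {
  if (measurements.length === 0) {
    throw new Error('at least one measurement is required');
  }
  if (!Number.isInteger(startMs) || !Number.isInteger(endMs)) {
    throw new TypeError('timestamps must be integer milliseconds');
  }
  if (startMs > endMs) {
    throw new RangeError('startMs must not be after endMs');
  }
  return `SELECT ${measurements.join(', ')} FROM ${pathPattern} WHERE time >= ${startMs} AND time <= ${endMs}`;
}

const sql = buildTimeRangeQuery(
  ['temperature', 'humidity'],
  'root.test.**',
  1700000000000,
  1700003600000,
);
// sql is:
// SELECT temperature, humidity FROM root.test.** WHERE time >= 1700000000000 AND time <= 1700003600000
```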
Batch insert for multiple devices:

    async function batchInsertMultipleDevices(pool: SessionPool) {
      const devices = ['device1', 'device2', 'device3'];
      const timestamps: number[] = [];
      const now = Date.now();

      // Generate one timestamp per second for the last 10 minutes
      for (let i = 0; i < 600; i++) {
        timestamps.push(now - (600 - i) * 1000);
      }

      for (const device of devices) {
        const values = timestamps.map(() => [
          20 + Math.random() * 10, // temperature
          50 + Math.random() * 30, // humidity
        ]);

        await pool.insertTablet({
          deviceId: `root.test.${device}`,
          measurements: ['temperature', 'humidity'],
          dataTypes: [3, 3], // FLOAT, FLOAT
          timestamps,
          values,
        });
      }

      console.log(`Inserted ${timestamps.length} records for ${devices.length} devices`);
    }
Always close pool resources:

    // SessionPool
    try {
      await pool.init();
      // ... operations
    } finally {
      await pool.close();
    }
Optimize batch size:
- Use `insertTablet` instead of individual inserts

Example:

    // Good: Batch insert
    await pool.insertTablet({
      deviceId: 'root.test.device1',
      measurements: ['temperature'],
      dataTypes: [3], // FLOAT
      timestamps: timestamps, // 100-1000 timestamps
      values: values,         // 100-1000 values
    });

    // Bad: Individual inserts (one round trip per row)
    for (let i = 0; i < 1000; i++) {
      await pool.executeNonQueryStatement(
        `INSERT INTO root.test.device1(timestamp, temperature) VALUES(${timestamps[i]}, ${values[i][0]})`
      );
    }
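When a data set is larger than the 100-1000 rows suggested in the comments above, it can be split into several tablets. The sketch below shows one way to slice parallel `timestamps` and `values` arrays; `chunkRows` and `RowBatch` are hypothetical names for this example.

```typescript
interface RowBatch<V> {
  timestamps: number[];
  values: V[][];
}

// Split row-aligned timestamps/values into batches of at most `batchSize` rows.
function chunkRows<V>(timestamps: number[], values: V[][], batchSize: number): RowBatch<V>[] {
  if (timestamps.length !== values.length) {
    throw new Error('timestamps and values must have the same length');
  }
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches: RowBatch<V>[] = [];
  for (let start = 0; start < timestamps.length; start += batchSize) {
    batches.push({
      timestamps: timestamps.slice(start, start + batchSize),
      values: values.slice(start, start + batchSize),
    });
  }
  return batches;
}
```

Each batch can then be spread into an `insertTablet` call together with the device ID, measurements, and data types.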
Handle errors explicitly:

    try {
      await pool.init();
      await pool.executeNonQueryStatement('CREATE DATABASE root.test');
    } catch (error) {
      // `error` is `unknown` in TypeScript, so narrow it before reading `.message`
      if (error instanceof Error && error.message.includes('already exists')) {
        console.log('Database already exists, continuing...');
      } else {
        console.error('Failed to create database:', error);
        throw error;
      }
    } finally {
      await pool.close();
    }
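If several setup statements need the same "ignore if it already exists" treatment, the check can be pulled into a helper. This is a sketch only: both function names are hypothetical, and matching on the substring `already exists` simply follows the example above; the exact server message may differ between IoTDB versions.

```typescript
// True when the error looks like an "already exists" failure (substring match, as above).
function isAlreadyExistsError(error: unknown): boolean {
  return error instanceof Error && error.message.includes('already exists');
}

// Run a setup step; resolve to true if it ran, false if the target already existed.
// Any other error is rethrown unchanged.
async function runIgnoringExisting(step: () => Promise<void>): Promise<boolean> {
  try {
    await step();
    return true;
  } catch (error) {
    if (isAlreadyExistsError(error)) {
      return false;
    }
    throw error;
  }
}
```

A call might look like `await runIgnoringExisting(() => pool.executeNonQueryStatement('CREATE DATABASE root.test'))`.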
Guidelines:
- Set `minPoolSize` to the average concurrent load
- Set `maxPoolSize` to the peak load plus a buffer (20-30%)

Example:

    const pool = new SessionPool({
      nodeUrls: ['localhost:6667'],
      maxPoolSize: 50,    // Peak load: 40 clients + 25% buffer
      minPoolSize: 20,    // Average load: 20 clients
      maxIdleTime: 60000, // Clean up idle sessions after 1 minute
      waitTimeout: 30000, // Wait at most 30 s for an available session
    });
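The sizing guideline is simple arithmetic, shown here as a worked sketch. `recommendPoolSize` is a hypothetical helper; the 25% default buffer is just the midpoint of the 20-30% range given above.

```typescript
// minPoolSize = average load; maxPoolSize = peak load plus a buffer, rounded up.
function recommendPoolSize(
  averageLoad: number,
  peakLoad: number,
  bufferRatio: number = 0.25,
): { minPoolSize: number; maxPoolSize: number } {
  if (averageLoad < 0 || peakLoad < averageLoad) {
    throw new RangeError('expected 0 <= averageLoad <= peakLoad');
  }
  if (bufferRatio < 0) {
    throw new RangeError('bufferRatio must be non-negative');
  }
  return {
    minPoolSize: Math.ceil(averageLoad),
    maxPoolSize: Math.max(1, Math.ceil(peakLoad * (1 + bufferRatio))),
  };
}

// 20 average clients, 40 at peak: 40 * 1.25 = 50
const recommended = recommendPoolSize(20, 40);
// recommended is { minPoolSize: 20, maxPoolSize: 50 }
```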
Symptoms:

    Error: connect ECONNREFUSED 127.0.0.1:6667

Solutions:
- Check that IoTDB is running: `jps | grep IoTDB`
- Check the port configured in `iotdb-datanode.properties`
- Test connectivity: `telnet localhost 6667`

Symptoms:

    Error: Timeout waiting for available session

Solutions:
- Increase `waitTimeout` in pool configuration
- Increase `maxPoolSize` if server can handle it
- Make sure every acquired session is released (`releaseSession()`)

Symptoms:

    FATAL ERROR: Reached heap limit

Solutions:
- Reduce `fetchSize` in pool configuration
- Increase the Node.js heap size: `node --max-old-space-size=4096 app.js`

Enable debug logging:

    import { logger } from '@iotdb/client';

    // Option 1: set the environment variable
    process.env.LOG_LEVEL = 'debug';

    // Option 2: use the logger directly
    logger.setLevel('debug');
Check connection status:

    console.log('Pool size:', pool.getPoolSize());
    console.log('Available:', pool.getAvailableSize());
    console.log('In Use:', pool.getInUseSize());
Test query execution time:

    const start = Date.now();
    const result = await pool.executeQueryStatement('SELECT ...');
    console.log(`Query took ${Date.now() - start}ms`);
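The timing pattern above can be wrapped so that any pool call is measured the same way. This is a minimal sketch; `timed` is a hypothetical helper and it uses `Date.now()` like the snippet above, so the resolution is one millisecond.

```typescript
// Await an async operation and report how long it took in milliseconds.
async function timed<T>(run: () => Promise<T>): Promise<{ result: T; elapsedMs: number }> {
  const start = Date.now();
  const result = await run();
  return { result, elapsedMs: Date.now() - start };
}
```

For example, `const { result, elapsedMs } = await timed(() => pool.executeQueryStatement(sql));` returns the query result together with its duration. If the operation throws, the error propagates and no timing is reported.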
See data-types.md for comprehensive data type documentation.
- `init()` - Initialize pool
- `close()` - Close all sessions
- `executeQueryStatement(sql, timeout?)` - Execute query
- `executeNonQueryStatement(sql)` - Execute DDL/DML
- `insertTablet(tablet)` - Batch insert
- `getSession()` - Get session from pool
- `releaseSession(session)` - Return session to pool
- `getPoolSize()` - Total sessions
- `getAvailableSize()` - Available sessions
- `getInUseSize()` - Active sessions

Version: 1.0.0
Last Updated: January 2024
License: Apache License 2.0