版本: 1.0.0
最后更新: 2024
Apache IoTDB Node.js Client 为树模型(时间序列数据模型)提供了原生支持,使用分层设备路径实现时间序列数据的高效管理。本指南涵盖了树模型操作的 SessionPool API,提供连接池和高并发支持。
IoTDB 中的树模型采用分层组织数据:
root.{storage_group}.{device}.{measurement}insertTablet 实现高性能批量写入root.test)root.test.device1)temperature、humidity)root.test.device1.temperature FLOAT)npm install @iotdb/client
系统要求:
TypeScript:
import { SessionPool, PoolConfigBuilder, TreeTablet, TSDataType } from '@iotdb/client';
JavaScript:
const { SessionPool, PoolConfigBuilder, TreeTablet, TSDataType } = require('@iotdb/client');
import { SessionPool, TreeTablet } from '@iotdb/client'; async function quickStart() { // 创建并初始化连接池 const pool = new SessionPool('localhost', 6667, { username: 'root', password: 'root', maxPoolSize: 10, minPoolSize: 2, }); await pool.init(); try { // 创建存储组 await pool.executeNonQueryStatement('CREATE DATABASE root.test'); // 创建时间序列 await pool.executeNonQueryStatement( 'CREATE TIMESERIES root.test.device1.temperature WITH DATATYPE=FLOAT, ENCODING=RLE' ); // 使用 TreeTablet 类与 addRow 插入数据 const tablet = new TreeTablet( 'root.test.device1', ['temperature'], [3] // FLOAT ); tablet.addRow(Date.now(), [25.5]); await pool.insertTablet(tablet); // 查询数据 const result = await pool.executeQueryStatement( 'SELECT temperature FROM root.test.device1' ); console.log('Query result:', result); console.log('Pool size:', pool.getPoolSize()); console.log('Available:', pool.getAvailableSize()); } finally { await pool.close(); } } quickStart();
SessionPool 为高并发场景提供连接池。它自动管理多个 session,在节点间分配负载,并回收空闲连接。
核心特性:
const pool = new SessionPool( ['node1', 'node2', 'node3'], // 主机列表 6667, // 端口 { username: 'root', password: 'root', maxPoolSize: 20, minPoolSize: 5, } );
const pool = new SessionPool({ nodeUrls: [ 'node1:6667', 'node2:6668', 'node3:6669', ], username: 'root', password: 'root', maxPoolSize: 20, minPoolSize: 5, });
import { PoolConfigBuilder } from '@iotdb/client'; const pool = new SessionPool( new PoolConfigBuilder() .nodeUrls(['node1:6667', 'node2:6667', 'node3:6667']) .username('root') .password('root') .maxPoolSize(20) .minPoolSize(5) .maxIdleTime(60000) .waitTimeout(60000) .build() );
| 选项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
maxPoolSize | number | 10 | 连接池中 session 的最大数量 |
minPoolSize | number | 1 | 维护的最小 session 数量 |
maxIdleTime | number | 60000 | 清理前的最大空闲时间(毫秒) |
waitTimeout | number | 60000 | 等待可用 session 的最大时间(毫秒) |
async init(): Promise<void>初始化连接池并创建最小数量的 session。
示例:
await pool.init();
async close(): Promise<void>关闭连接池中的所有 session 并释放资源。
示例:
await pool.close();
连接池会自动为这些操作获取和释放 session:
async executeQueryStatement(sql: string, timeoutMs?: number): Promise<QueryResult>使用连接池中的可用 session 执行查询。
示例:
const result = await pool.executeQueryStatement('SELECT * FROM root.test.**');
async executeNonQueryStatement(sql: string): Promise<void>使用可用 session 执行非查询语句。
示例:
await pool.executeNonQueryStatement('CREATE DATABASE root.test');
async insertTablet(tablet: Tablet): Promise<void>使用可用 session 插入数据。
示例:
await pool.insertTablet({ deviceId: 'root.test.device1', measurements: ['temperature'], dataTypes: [3], timestamps: [Date.now()], values: [[25.5]], });
对于同一 session 上的多个操作:
async getSession(): Promise<Session>从连接池获取一个 session。使用后必须释放。
示例:
const session = await pool.getSession(); try { await session.executeNonQueryStatement('CREATE DATABASE root.test'); await session.insertTablet({ /* data */ }); const result = await session.executeQueryStatement('SELECT ...'); } finally { pool.releaseSession(session); }
releaseSession(session: Session): void将 session 释放回连接池。
示例:
pool.releaseSession(session);
getPoolSize(): number返回连接池中当前 session 的总数。
getAvailableSize(): number返回可用(空闲) session 的数量。
getInUseSize(): number返回当前正在使用的 session 数量。
示例:
console.log(`Total: ${pool.getPoolSize()}`); console.log(`Available: ${pool.getAvailableSize()}`); console.log(`In Use: ${pool.getInUseSize()}`);
用于构建 SessionPool 配置的流式 API。
可用方法:
host(host: string): thisport(port: number): thisnodeUrls(urls: string[]): thisusername(username: string): thispassword(password: string): thisdatabase(database: string): thistimezone(timezone: string): thisfetchSize(size: number): thisenableSSL(enable: boolean): thissslOptions(options: SSLOptions): thismaxPoolSize(size: number): thisminPoolSize(size: number): thismaxIdleTime(time: number): thiswaitTimeout(timeout: number): thisbuild(): PoolConfig示例:
const poolConfig = new PoolConfigBuilder() .nodeUrls(['node1:6667', 'node2:6667']) .username('root') .password('root') .maxPoolSize(20) .minPoolSize(5) .maxIdleTime(60000) .waitTimeout(60000) .build(); const pool = new SessionPool(poolConfig);
树模型支持所有 IoTDB 数据类型:
| 代码 | 类型 | JavaScript 类型 | 说明 |
|---|---|---|---|
| 0 | BOOLEAN | boolean | True 或 false |
| 1 | INT32 | number | 32 位有符号整数 |
| 2 | INT64 | number/string | 64 位有符号整数(大值使用字符串) |
| 3 | FLOAT | number | 32 位浮点数 |
| 4 | DOUBLE | number | 64 位浮点数 |
| 5 | TEXT | string | UTF-8 字符串 |
| 8 | TIMESTAMP | number/Date | 自纪元以来的毫秒数 |
| 9 | DATE | number/Date | 自纪元以来的天数 |
| 10 | BLOB | Buffer | 二进制数据 |
| 11 | STRING | string | 与 TEXT 相同 |
多类型示例:
await pool.insertTablet({ deviceId: 'root.test.sensor1', measurements: ['temp', 'humidity', 'status', 'description', 'reading_time'], dataTypes: [3, 4, 0, 5, 8], // FLOAT, DOUBLE, BOOLEAN, TEXT, TIMESTAMP timestamps: [Date.now()], values: [[ 25.5, // FLOAT 60.123456, // DOUBLE true, // BOOLEAN 'Normal operation', // TEXT Date.now(), // TIMESTAMP ]], });
对于大于 JavaScript 安全整数范围(2^53 - 1)的 INT64 值,使用字符串:
await pool.insertTablet({ deviceId: 'root.test.device1', measurements: ['largeCounter'], dataTypes: [2], // INT64 timestamps: [Date.now()], values: [['9223372036854775807']], // 大值 INT64 使用字符串 });
import { SessionPool } from '@iotdb/client'; async function crudExample() { const pool = new SessionPool('localhost', 6667, { username: 'root', password: 'root', maxPoolSize: 10, minPoolSize: 2, }); await pool.init(); try { // 创建(CREATE) await pool.executeNonQueryStatement('CREATE DATABASE root.factory'); await pool.executeNonQueryStatement( 'CREATE TIMESERIES root.factory.workshop1.temperature WITH DATATYPE=FLOAT' ); // 插入(INSERT) await pool.insertTablet({ deviceId: 'root.factory.workshop1', measurements: ['temperature'], dataTypes: [3], timestamps: [Date.now() - 3000, Date.now() - 2000, Date.now() - 1000], values: [[25.5], [26.0], [25.8]], }); // 读取(READ) const result = await pool.executeQueryStatement( 'SELECT temperature FROM root.factory.workshop1' ); console.log('Temperature readings:', result); // 更新(UPDATE) - 删除并重新插入 await pool.executeNonQueryStatement( `DELETE FROM root.factory.workshop1.temperature WHERE time <= ${Date.now() - 2500}` ); // 删除(DELETE) await pool.executeNonQueryStatement('DELETE DATABASE root.factory'); } finally { await pool.close(); } } crudExample();
import { SessionPool, PoolConfigBuilder } from '@iotdb/client'; async function multiNodeExample() { const pool = new SessionPool( new PoolConfigBuilder() .nodeUrls([ 'iotdb-node1:6667', 'iotdb-node2:6667', 'iotdb-node3:6667', ]) .username('root') .password('root') .maxPoolSize(30) .minPoolSize(10) .build() ); await pool.init(); try { // 模拟并发操作 const operations = []; for (let i = 0; i < 100; i++) { operations.push( pool.insertTablet({ deviceId: `root.test.device${i % 10}`, measurements: ['value'], dataTypes: [3], timestamps: [Date.now()], values: [[Math.random() * 100]], }) ); } await Promise.all(operations); console.log('All operations completed'); // 连接池统计 console.log('Pool Statistics:'); console.log(` Total Sessions: ${pool.getPoolSize()}`); console.log(` Available: ${pool.getAvailableSize()}`); console.log(` In Use: ${pool.getInUseSize()}`); } finally { await pool.close(); } } multiNodeExample();
async function timeRangeQuery(pool: SessionPool) { const now = Date.now(); const hourAgo = now - 3600000; const result = await pool.executeQueryStatement( `SELECT temperature, humidity FROM root.test.** WHERE time >= ${hourAgo} AND time <= ${now}` ); console.log('Query result:', result); return result; }
async function batchInsertMultipleDevices(pool: SessionPool) { const devices = ['device1', 'device2', 'device3']; const timestamps = []; const now = Date.now(); // 生成最近 10 分钟的时间戳 for (let i = 0; i < 600; i++) { timestamps.push(now - (600 - i) * 1000); } for (const device of devices) { const values = timestamps.map(() => [ 20 + Math.random() * 10, // temperature 50 + Math.random() * 30, // humidity ]); await pool.insertTablet({ deviceId: `root.test.${device}`, measurements: ['temperature', 'humidity'], dataTypes: [3, 3], timestamps, values, }); } console.log(`Inserted ${timestamps.length} records for ${devices.length} devices`); }
始终关闭资源:
// SessionPool try { await pool.init(); // ... 操作 } finally { await pool.close(); }
优化批量大小:
insertTablet 而不是单条插入示例:
// 好的做法: 批量插入 await pool.insertTablet({ deviceId: 'root.test.device1', measurements: ['temperature'], dataTypes: [3], timestamps: timestamps, // 100-1000 个时间戳 values: values, // 100-1000 个值 }); // 不好的做法: 单条插入 for (let i = 0; i < 1000; i++) { await pool.executeNonQueryStatement( `INSERT INTO root.test.device1(timestamp, temperature) VALUES(${timestamps[i]}, ${values[i]})` ); }
try { await pool.init(); await pool.executeNonQueryStatement('CREATE DATABASE root.test'); } catch (error) { if (error.message.includes('already exists')) { console.log('Database already exists, continuing...'); } else { console.error('Failed to create database:', error); throw error; } } finally { await pool.close(); }
指导原则:
minPoolSize 设置为平均并发负载maxPoolSize 设置为峰值负载 + 缓冲(20-30%)示例:
const pool = new SessionPool({ nodeUrls: ['localhost:6667'], maxPoolSize: 50, // 峰值负载: 40 个客户端 + 25% 缓冲 minPoolSize: 20, // 平均负载: 20 个客户端 maxIdleTime: 60000, // 空闲 1 分钟后清理 waitTimeout: 30000, // 最多等待 30 秒获取可用 session });
症状:
Error: connect ECONNREFUSED 127.0.0.1:6667
解决方案:
jps | grep IoTDBiotdb-datanode.properties 中的端口配置telnet localhost 6667症状:
Error: Timeout waiting for available session
解决方案:
waitTimeoutmaxPoolSizereleaseSession())症状:
FATAL ERROR: Reached heap limit
解决方案:
fetchSizenode --max-old-space-size=4096 app.js启用调试日志:
// 设置环境变量 process.env.LOG_LEVEL = 'debug'; // 或直接使用 logger import { logger } from '@iotdb/client'; logger.setLevel('debug');
检查连接状态:
console.log('Pool size:', pool.getPoolSize()); console.log('Available:', pool.getAvailableSize()); console.log('In Use:', pool.getInUseSize());
测试查询执行时间:
const start = Date.now(); const result = await pool.executeQueryStatement('SELECT ...'); console.log(`Query took ${Date.now() - start}ms`);
详见 data-types.md 了解全面的数据类型文档。
init() - 初始化连接池close() - 关闭所有 sessionexecuteQueryStatement(sql, timeout?) - 执行查询executeNonQueryStatement(sql) - 执行 DDL/DMLinsertTablet(tablet) - 批量插入getSession() - 从连接池获取 sessionreleaseSession(session) - 将 session 返回连接池getPoolSize() - 总 session 数getAvailableSize() - 可用 session 数getInUseSize() - 活动 session 数版本: 1.0.0
最后更新: 2024年1月
许可证: Apache License 2.0