版本: 1.0.0
最后更新: 2024
Apache IoTDB Node.js Client 为表模型(关系数据模型)提供了原生支持,使用类 SQL 的表操作实现结构化数据的高效管理。本指南涵盖了表模型操作的 TableSessionPool API。
IoTDB 中的表模型采用关系格式组织数据:
USE DATABASE 进行数据库上下文管理| 方面 | 表模型 | 树模型 |
|---|---|---|
| 组织方式 | 关系表 | 分层路径 |
| 模式 | 显式表模式 | 时间序列定义 |
| 查询语言 | 标准 SQL | 带路径的 IoTDB SQL |
| 使用场景 | 结构化关系数据 | 分层 IoT 数据 |
| 数据模型 | 标签 + 属性 + 字段 | 设备 + 测点 |
npm install @iotdb/client
系统要求:
TypeScript:
import { TableSessionPool, PoolConfigBuilder, TableTablet, ColumnCategory, TSDataType } from '@iotdb/client';
JavaScript:
const { TableSessionPool, PoolConfigBuilder, TableTablet, ColumnCategory, TSDataType } = require('@iotdb/client');
import { TableSessionPool, TableTablet, ColumnCategory } from '@iotdb/client'; async function quickStart() { // 创建并初始化表 session 连接池 const pool = new TableSessionPool('localhost', 6667, { username: 'root', password: 'root', database: 'test_db', // 可选: 设置默认数据库 maxPoolSize: 10, minPoolSize: 2, }); await pool.open(); try { // 创建数据库 await pool.executeNonQueryStatement('CREATE DATABASE test_db'); // 使用数据库 await pool.executeNonQueryStatement('USE test_db'); // 创建表 await pool.executeNonQueryStatement(` CREATE TABLE sensor_data ( region_id STRING TAG, device_id STRING TAG, model STRING ATTRIBUTE, temperature FLOAT FIELD, humidity DOUBLE FIELD ) WITH (TTL=3600000) `); // 使用 TableTablet 类与 addRow 插入数据 const tablet = new TableTablet( 'sensor_data', ['region_id', 'device_id', 'model', 'temperature', 'humidity'], [5, 5, 5, 3, 4], // STRING, STRING, STRING, FLOAT, DOUBLE [ColumnCategory.TAG, ColumnCategory.TAG, ColumnCategory.ATTRIBUTE, ColumnCategory.FIELD, ColumnCategory.FIELD] ); tablet.addRow(Date.now(), ['region1', 'device001', 'ModelA', 25.5, 60.0]); await pool.insertTablet(tablet); // 查询数据 const dataSet = await pool.executeQueryStatement(` SELECT * FROM sensor_data WHERE region_id = 'region1' AND device_id = 'device001' `); while (await dataSet.hasNext()) { const row = dataSet.next(); console.log(`Temperature: ${row.getFloat('temperature')}°C, Humidity: ${row.getDouble('humidity')}%`); } await dataSet.close(); } finally { await pool.close(); } } quickStart();
async function withDatabaseContext() { // 创建预配置了数据库的连接池 const pool = new TableSessionPool('localhost', 6667, { username: 'root', password: 'root', database: 'production_db', // 自动执行 USE DATABASE maxPoolSize: 20, }); await pool.open(); try { // 无需显式 USE DATABASE // 已经在 'production_db' 上下文中 const dataSet = await pool.executeQueryStatement('SHOW TABLES'); while (await dataSet.hasNext()) { const row = dataSet.next(); console.log('Table:', row.getFields()); } await dataSet.close(); } finally { await pool.close(); } }
TableSessionPool 是表模型操作的专用连接池。它扩展了基础 SessionPool 功能,提供表特定功能和自动数据库上下文管理。
核心特性:
USE DATABASEconst pool = new TableSessionPool( 'localhost', // 主机 6667, // 端口 { username: 'root', password: 'root', database: 'my_database', // 可选 maxPoolSize: 20, minPoolSize: 5, } );
const pool = new TableSessionPool({ nodeUrls: [ 'node1:6667', 'node2:6668', 'node3:6669', ], username: 'root', password: 'root', database: 'my_database', maxPoolSize: 20, minPoolSize: 5, });
import { PoolConfigBuilder } from '@iotdb/client'; const pool = new TableSessionPool( new PoolConfigBuilder() .nodeUrls(['node1:6667', 'node2:6667']) .username('root') .password('root') .database('my_database') .maxPoolSize(20) .minPoolSize(5) .maxIdleTime(60000) .waitTimeout(60000) .build() );
所有 SessionPool 选项加上:
| 选项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
database | string | undefined | 表操作的默认数据库 |
async open(enableRpcCompression?: boolean): Promise<void>打开连接池。可选择启用 RPC 压缩。
参数:
enableRpcCompression: 启用 RPC 压缩(默认: false)示例:
// 不启用压缩打开 await pool.open(); // 启用压缩打开 await pool.open(true);
async close(): Promise<void>关闭连接池中的所有 session。
示例:
await pool.close();
async executeQueryStatement(sql: string, timeoutMs?: number): Promise<SessionDataSet>执行 SQL 查询语句。
参数:
sql: SQL 查询语句timeoutMs: 查询超时时间(毫秒,默认: 60000)返回值: 用于遍历结果的 SessionDataSet
示例:
const dataSet = await pool.executeQueryStatement(` SELECT temperature, humidity FROM sensor_data WHERE region_id = 'region1' LIMIT 100 `); while (await dataSet.hasNext()) { const row = dataSet.next(); console.log(row.getTimestamp(), row.getFloat('temperature')); } await dataSet.close();
async executeNonQueryStatement(sql: string): Promise<void>执行 DDL 或 DML 语句。
参数:
sql: SQL 语句示例:
// 创建数据库 await pool.executeNonQueryStatement('CREATE DATABASE my_db'); // 使用数据库 await pool.executeNonQueryStatement('USE my_db'); // 创建表 await pool.executeNonQueryStatement(` CREATE TABLE devices ( device_id STRING TAG, location STRING ATTRIBUTE, value FLOAT FIELD ) `); // 删除表 await pool.executeNonQueryStatement('DROP TABLE devices'); // 删除数据库 await pool.executeNonQueryStatement('DROP DATABASE my_db');
async insertTablet(tablet: TableTablet | ITableTablet): Promise<void>使用 tablet 格式向表中插入数据。
参数:
tablet: TableTablet 对象或包含表数据的普通对象TableTablet 接口 (用于普通对象):
interface ITableTablet { tableName: string; // 表名 columnNames: string[]; // 列名 columnTypes: number[]; // 数据类型代码 (TSDataType) columnCategories: ColumnCategory[]; // 列类别 timestamps: number[]; // 时间戳(毫秒) values: any[][]; // 二维数组: [行][列] }
ColumnCategory 枚举:
enum ColumnCategory { TAG = 0, // 标签列 - 用于 WHERE 子句筛选的索引列(如 device_id、region_id) FIELD = 2, // 字段列 - 测量值(如 temperature、humidity) ATTRIBUTE = 1, // 属性列 - 未索引的元数据(如 model、firmware_version) TIME = 3, // 时间列(仅供内部使用) }
列类别说明:
TAG (0) - 用于 WHERE 子句筛选的索引列(例如 device_id、region_id)FIELD (2) - 测量值(例如 temperature、humidity)ATTRIBUTE (1) - 不用于筛选的元数据(例如 device_model、firmware_version)TIME (3) - 仅供内部使用。不要在 columnCategories 数组中使用 - 时间戳通过 timestamps 数组单独处理TableTablet 类 (带辅助方法 - 推荐):
import { TableTablet, ColumnCategory, TSDataType } from '@iotdb/client'; // 创建 tablet const tablet = new TableTablet( 'sensor_data', ['region_id', 'device_id', 'model', 'temperature', 'humidity'], [TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, TSDataType.FLOAT, TSDataType.DOUBLE], [ColumnCategory.TAG, ColumnCategory.TAG, ColumnCategory.ATTRIBUTE, ColumnCategory.FIELD, ColumnCategory.FIELD] ); // 使用 addRow 方法逐行添加数据 tablet.addRow(Date.now(), ['region1', 'device001', 'ModelA', 25.5, 60.0]); tablet.addRow(Date.now() + 1000, ['region1', 'device001', 'ModelA', 26.0, 61.5]); tablet.addRow(Date.now() + 2000, ['region1', 'device002', 'ModelB', 24.8, 58.5]); // 插入 tablet await pool.insertTablet(tablet);
替代方案: 普通对象方法 (仍支持):
import { ColumnCategory, TSDataType } from '@iotdb/client'; await pool.insertTablet({ tableName: 'sensor_data', columnNames: ['region_id', 'device_id', 'model', 'temperature', 'humidity'], columnTypes: [TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, TSDataType.FLOAT, TSDataType.DOUBLE], columnCategories: [ ColumnCategory.TAG, // region_id - 索引标签 ColumnCategory.TAG, // device_id - 索引标签 ColumnCategory.ATTRIBUTE, // model - 元数据 ColumnCategory.FIELD, // temperature - 测量值 ColumnCategory.FIELD, // humidity - 测量值 ], timestamps: [ Date.now(), Date.now() + 1000, Date.now() + 2000, ], values: [ ['region1', 'device001', 'ModelA', 25.5, 60.0], ['region1', 'device001', 'ModelA', 26.0, 61.5], ['region1', 'device002', 'ModelB', 24.8, 58.5], ], });
使用数字值的示例(也支持):
await pool.insertTablet({ tableName: 'sensor_data', columnNames: ['region_id', 'device_id', 'model', 'temperature', 'humidity'], columnTypes: [5, 5, 5, 3, 4], // TEXT, TEXT, TEXT, FLOAT, DOUBLE columnCategories: [0, 0, 1, 2, 2], // TAG, TAG, ATTRIBUTE, FIELD, FIELD timestamps: [Date.now()], values: [['region1', 'device001', 'ModelA', 25.5, 60.0]], });
TableTablet 类的优势:
addRow() 方法简化逐行添加数据PoolConfigBuilder 用于创建 TableSessionPool 配置。
可用方法:
host(host: string): thisport(port: number): thisnodeUrls(urls: string[]): thisusername(username: string): thispassword(password: string): thisdatabase(database: string): this - 对表模型很重要timezone(timezone: string): thisfetchSize(size: number): thismaxPoolSize(size: number): thisminPoolSize(size: number): thismaxIdleTime(time: number): thiswaitTimeout(timeout: number): thisenableSSL(enable: boolean): thissslOptions(options: SSLOptions): thisbuild(): PoolConfig示例:
const config = new PoolConfigBuilder() .nodeUrls(['iotdb1:6667', 'iotdb2:6667', 'iotdb3:6667']) .username('root') .password('root') .database('production_db') .fetchSize(2048) .maxPoolSize(30) .minPoolSize(10) .maxIdleTime(60000) .waitTimeout(60000) .build(); const pool = new TableSessionPool(config); await pool.open();
表模型支持所有 IoTDB 数据类型:
| 代码 | 类型 | JavaScript 类型 | 在表模型中的使用 |
|---|---|---|---|
| 0 | BOOLEAN | boolean | 标签、属性、字段 |
| 1 | INT32 | number | 标签、属性、字段 |
| 2 | INT64 | number/string | 标签、属性、字段 |
| 3 | FLOAT | number | 属性、字段 |
| 4 | DOUBLE | number | 属性、字段 |
| 5 | TEXT | string | 标签、属性、字段 |
| 8 | TIMESTAMP | number/Date | 字段 |
| 9 | DATE | number/Date | 字段 |
| 10 | BLOB | Buffer | 字段 |
| 11 | STRING | string | 标签、属性、字段 |
| 代码 | 类别 | 用途 | 是否索引 | 使用场景 |
|---|---|---|---|---|
| 0 | TAG | 标识符 | 是 | 在 WHERE 子句中用于过滤 |
| 1 | ATTRIBUTE | 元数据 | 否 | 描述性信息 |
| 2 | FIELD | 测量值 | 否 | 实际传感器/测量值 |
混合类型示例:
await pool.insertTablet({ tableName: 'equipment_metrics', columnNames: [ 'factory_id', // TAG 'equipment_id', // TAG 'manufacturer', // ATTRIBUTE 'model', // ATTRIBUTE 'temperature', // FIELD 'pressure', // FIELD 'is_active', // FIELD 'last_check', // FIELD ], columnTypes: [5, 5, 5, 5, 3, 4, 0, 8], // STRING, STRING, STRING, STRING, FLOAT, DOUBLE, BOOLEAN, TIMESTAMP columnCategories: [0, 0, 1, 1, 2, 2, 2, 2], // TAG, TAG, ATTR, ATTR, FIELD, FIELD, FIELD, FIELD timestamps: [Date.now()], values: [[ 'factory01', 'equip123', 'ManufacturerA', 'ModelX', 75.5, 101.325, true, Date.now(), ]], });
import { TableSessionPool, PoolConfigBuilder } from '@iotdb/client'; async function setupDatabase() { const pool = new TableSessionPool( new PoolConfigBuilder() .host('localhost') .port(6667) .username('root') .password('root') .maxPoolSize(10) .build() ); await pool.open(); try { // 创建数据库 await pool.executeNonQueryStatement('CREATE DATABASE iot_platform'); // 使用数据库 await pool.executeNonQueryStatement('USE iot_platform'); // 创建带 TTL 的表 await pool.executeNonQueryStatement(` CREATE TABLE sensor_readings ( region_id STRING TAG, building_id STRING TAG, floor INT32 TAG, device_id STRING TAG, device_type STRING ATTRIBUTE, location STRING ATTRIBUTE, temperature FLOAT FIELD, humidity FLOAT FIELD, co2_level INT32 FIELD, timestamp TIMESTAMP FIELD ) WITH (TTL=7776000000) `); console.log('Database and table created successfully'); // 显示表 const dataSet = await pool.executeQueryStatement('SHOW TABLES'); console.log('Tables in database:'); while (await dataSet.hasNext()) { console.log(dataSet.next().getFields()); } await dataSet.close(); } finally { await pool.close(); } } setupDatabase();
async function batchInsert(pool: TableSessionPool) { const regionIds = ['north', 'south', 'east', 'west']; const deviceIds = ['dev001', 'dev002', 'dev003']; const timestamps = []; const values = []; const now = Date.now(); // 生成 100 条记录 for (let i = 0; i < 100; i++) { timestamps.push(now + i * 1000); const region = regionIds[i % regionIds.length]; const device = deviceIds[i % deviceIds.length]; values.push([ region, // region_id (TAG) device, // device_id (TAG) 'SensorModelA', // model (ATTRIBUTE) 20 + Math.random() * 10, // temperature (FIELD) 50 + Math.random() * 30, // humidity (FIELD) ]); } await pool.insertTablet({ tableName: 'sensor_readings', columnNames: ['region_id', 'device_id', 'model', 'temperature', 'humidity'], columnTypes: [5, 5, 5, 3, 3], columnCategories: [0, 0, 1, 2, 2], timestamps, values, }); console.log(`Inserted ${timestamps.length} records`); }
async function queryWithFilters(pool: TableSessionPool) { // 按 TAG 查询(已索引,高效) const dataSet = await pool.executeQueryStatement(` SELECT device_id, temperature, humidity, timestamp FROM sensor_readings WHERE region_id = 'north' AND device_id IN ('dev001', 'dev002') AND temperature > 25.0 ORDER BY timestamp DESC LIMIT 100 `); const results = []; while (await dataSet.hasNext()) { const row = dataSet.next(); results.push({ deviceId: row.getString('device_id'), temperature: row.getFloat('temperature'), humidity: row.getFloat('humidity'), timestamp: new Date(row.getTimestamp()), }); } await dataSet.close(); console.log(`Found ${results.length} matching records`); return results; }
async function aggregationQuery(pool: TableSessionPool) { const dataSet = await pool.executeQueryStatement(` SELECT region_id, device_id, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, MIN(temperature) as min_temp, COUNT(*) as record_count FROM sensor_readings WHERE timestamp >= ${Date.now() - 3600000} GROUP BY region_id, device_id `); console.log('Aggregation Results:'); while (await dataSet.hasNext()) { const row = dataSet.next(); console.log(`Region: ${row.getString('region_id')}, Device: ${row.getString('device_id')}`); console.log(` Avg Temp: ${row.getFloat('avg_temp').toFixed(2)}°C`); console.log(` Max Temp: ${row.getFloat('max_temp').toFixed(2)}°C`); console.log(` Min Temp: ${row.getFloat('min_temp').toFixed(2)}°C`); console.log(` Records: ${row.getInt('record_count')}`); } await dataSet.close(); }
async function multiDatabaseOps(pool: TableSessionPool) { await pool.open(); try { // 创建多个数据库 await pool.executeNonQueryStatement('CREATE DATABASE production'); await pool.executeNonQueryStatement('CREATE DATABASE staging'); // 使用生产数据库 await pool.executeNonQueryStatement('USE production'); await pool.executeNonQueryStatement(` CREATE TABLE metrics ( device_id STRING TAG, value DOUBLE FIELD ) `); // 切换到测试数据库 await pool.executeNonQueryStatement('USE staging'); await pool.executeNonQueryStatement(` CREATE TABLE test_metrics ( device_id STRING TAG, value DOUBLE FIELD ) `); // 使用完全限定名跨数据库查询 const prodData = await pool.executeQueryStatement('SELECT * FROM production.metrics LIMIT 10'); const stagingData = await pool.executeQueryStatement('SELECT * FROM staging.test_metrics LIMIT 10'); await prodData.close(); await stagingData.close(); } finally { await pool.close(); } }
有效使用 TAG:
使用 ATTRIBUTE 存储元数据:
使用 FIELD 存储测量值:
示例:
// 良好的表设计 CREATE TABLE sensor_data ( region_id STRING TAG, // 已索引,用于 WHERE device_id STRING TAG, // 已索引,用于 WHERE manufacturer STRING ATTRIBUTE, // 元数据,未索引 model STRING ATTRIBUTE, // 元数据,未索引 temperature FLOAT FIELD, // 测量值 humidity FLOAT FIELD // 测量值 ) // 不良设计 - 使用 FIELD 作为标识符 CREATE TABLE sensor_data ( temperature FLOAT FIELD, humidity FLOAT FIELD, device_id STRING FIELD // 应该是 TAG! )
在 WHERE 子句中按 TAG 过滤:
// 好: 使用索引的 TAG SELECT * FROM sensors WHERE region_id = 'north' AND device_id = 'dev001' // 差: 按未索引的 FIELD 过滤 SELECT * FROM sensors WHERE temperature > 25.0 // 没有 TAG 过滤
使用适当的 LIMIT:
// 防止加载过多数据 SELECT * FROM sensors WHERE region_id = 'north' LIMIT 1000
使用时间范围过滤器:
SELECT * FROM sensors WHERE region_id = 'north' AND timestamp >= ${Date.now() - 3600000} AND timestamp <= ${Date.now()}
合理设置连接池大小:
const pool = new TableSessionPool({ nodeUrls: ['localhost:6667'], maxPoolSize: 50, // 峰值并发查询数 minPoolSize: 10, // 保持热连接 maxIdleTime: 60000, // 空闲 1 分钟后清理 waitTimeout: 30000, // 最多等待 30 秒获取连接 });
监控连接池健康状况:
setInterval(() => { console.log('Pool Stats:'); console.log(` Total: ${pool.getPoolSize()}`); console.log(` Available: ${pool.getAvailableSize()}`); console.log(` In Use: ${pool.getInUseSize()}`); }, 60000); // 每分钟
async function robustInsert(pool: TableSessionPool, data: any) { try { await pool.insertTablet(data); console.log('Insert successful'); } catch (error) { if (error.message.includes('Table does not exist')) { console.log('Creating table...'); await createTable(pool); await pool.insertTablet(data); } else if (error.message.includes('Database does not exist')) { console.log('Creating database...'); await createDatabase(pool); await createTable(pool); await pool.insertTablet(data); } else { console.error('Insert failed:', error); throw error; } } }
async function properCleanup() { const pool = new TableSessionPool('localhost', 6667, { username: 'root', password: 'root', }); await pool.open(); try { const dataSet = await pool.executeQueryStatement('SELECT * FROM table1'); try { while (await dataSet.hasNext()) { // 处理结果 } } finally { await dataSet.close(); // 始终关闭 DataSet } } finally { await pool.close(); // 始终关闭连接池 } }
症状:
Error: Database 'my_db' does not exist
解决方案:
// 先创建数据库 await pool.executeNonQueryStatement('CREATE DATABASE my_db'); await pool.executeNonQueryStatement('USE my_db'); // 或使用现有数据库配置连接池 const pool = new TableSessionPool('localhost', 6667, { database: 'my_db', // 必须存在 });
症状:
Error: Table 'my_table' does not exist
解决方案:
// 检查表是否存在 const dataSet = await pool.executeQueryStatement('SHOW TABLES'); // ... 验证表存在 // 如果需要,创建表 await pool.executeNonQueryStatement(` CREATE TABLE my_table (...) `);
症状:
Error: Column count mismatch
解决方案:
columnNames、columnTypes 和 columnCategories 长度相同values 数组与列数匹配// 验证模式 const dataSet = await pool.executeQueryStatement('DESCRIBE my_table'); while (await dataSet.hasNext()) { console.log(dataSet.next().getFields()); }
症状: 一段时间后数据自动删除
解决方案:
// 检查 TTL 设置 const dataSet = await pool.executeQueryStatement('SHOW TABLES'); // 查看表属性中的 TTL // 修改 TTL await pool.executeNonQueryStatement(` ALTER TABLE my_table SET PROPERTIES TTL=31536000000 `); // 1 年(毫秒)
查询慢:
插入慢:
启用调试日志:
process.env.LOG_LEVEL = 'debug';
检查 SQL 语法:
try { await pool.executeQueryStatement('EXPLAIN SELECT * FROM my_table'); } catch (error) { console.error('Invalid SQL:', error.message); }
监控查询执行:
const start = Date.now(); const dataSet = await pool.executeQueryStatement('SELECT ...'); console.log(`Query took ${Date.now() - start}ms`); let rowCount = 0; while (await dataSet.hasNext()) { dataSet.next(); rowCount++; } console.log(`Returned ${rowCount} rows`);
open(enableRpcCompression?) - 打开连接池close() - 关闭所有 sessionexecuteQueryStatement(sql, timeout?) - 执行 SQL 查询executeNonQueryStatement(sql) - 执行 DDL/DMLinsertTablet(tablet) - 批量插入到表getPoolSize() - 总 session 数getAvailableSize() - 可用 session 数getInUseSize() - 活动 session 数CREATE DATABASE database_nameDROP DATABASE database_nameUSE database_nameSHOW DATABASESSHOW TABLESCREATE TABLE table_name (...)DROP TABLE table_nameALTER TABLE table_name SET PROPERTIES TTL=<ms>SELECT ... FROM table_name WHERE ... LIMIT ...DESCRIBE table_name详见 data-types.md 了解全面的数据类型文档。
版本: 1.0.0
最后更新: 2024年1月
许可证: Apache License 2.0