用于 Apache IoTDB 的 Node.js 客户端,支持 SessionPool 和 TableSessionPool,提供高效的连接管理和全面的查询功能。
Apache IoTDB Node.js 客户端是一个高性能、功能丰富的客户端库,用于与 Apache IoTDB 交互。Apache IoTDB 是一个专为 IoT 数据管理设计的时序数据库。该客户端提供了树模型(时间序列)和表模型(关系型)API,实现灵活的数据管理策略。
npm install @iotdb/client
import { Session } from '@iotdb/client'; const session = new Session({ host: 'localhost', port: 6667, username: 'root', password: 'root', }); await session.open(); // 执行非查询语句 await session.executeNonQueryStatement('CREATE DATABASE root.test'); // 使用 SessionDataSet 执行查询(迭代器模式 - 内存高效) const dataSet = await session.executeQueryStatement('SELECT * FROM root.test.**'); while (await dataSet.hasNext()) { const row = dataSet.next(); console.log(row.getTimestamp(), row.getFields()); } await dataSet.close(); // 或使用 toArray() 辅助方法处理小型结果集(将所有数据加载到内存) const dataSet2 = await session.executeQueryStatement('SHOW DATABASES'); const allRows = await dataSet2.toArray(); // 返回 [[timestamp, ...fields], ...] console.log('所有行:', allRows); // 插入 Tablet 数据 await session.insertTablet({ deviceId: 'root.test.device1', measurements: ['temperature', 'humidity'], dataTypes: [3, 3], // FLOAT timestamps: [Date.now(), Date.now() + 1000], values: [ [25.5, 60.0], [26.0, 61.5], ], }); await session.close();
构建器模式提供更优雅和流畅的配置 API:
import { Session, ConfigBuilder } from '@iotdb/client'; // 构建会话配置 const session = new Session( new ConfigBuilder() .host('localhost') .port(6667) .username('root') .password('root') .fetchSize(1024) .timezone('UTC+8') .build() ); await session.open(); // ... 使用会话 await session.close();
import { SessionPool } from '@iotdb/client'; const pool = new SessionPool('localhost', 6667, { username: 'root', password: 'root', maxPoolSize: 10, minPoolSize: 2, maxIdleTime: 60000, waitTimeout: 60000, }); await pool.init(); // 使用连接池执行查询 const result = await pool.executeQueryStatement('SELECT * FROM root.test.**'); // 执行非查询语句 await pool.executeNonQueryStatement( 'CREATE TIMESERIES root.test.device1.temperature WITH DATATYPE=FLOAT' ); // 插入数据 await pool.insertTablet({ deviceId: 'root.test.device1', measurements: ['temperature'], dataTypes: [3], // FLOAT timestamps: [Date.now()], values: [[25.5]], }); // 获取连接池统计信息 console.log('连接池大小:', pool.getPoolSize()); console.log('可用连接:', pool.getAvailableSize()); await pool.close();
当节点具有不同的 host:port 组合时,使用字符串数组格式的 nodeUrls 配置:
import { SessionPool, PoolConfigBuilder } from '@iotdb/client'; // 使用字符串数组的配置对象(推荐) const pool1 = new SessionPool({ nodeUrls: [ 'node1.example.com:6667', 'node2.example.com:6668', 'node3.example.com:6669', ], username: 'root', password: 'root', maxPoolSize: 15, minPoolSize: 3, }); // 或使用字符串数组的构建器模式 const pool2 = new SessionPool( new PoolConfigBuilder() .nodeUrls([ 'node1.example.com:6667', 'node2.example.com:6668', 'node3.example.com:6669', ]) .username('root') .password('root') .maxPoolSize(15) .minPoolSize(3) .build() ); await pool1.init(); // 连接将使用轮询方式分布在所有节点上
import { Session } from '@iotdb/client'; import * as fs from 'fs'; const session = new Session({ host: 'localhost', port: 6667, username: 'root', password: 'root', enableSSL: true, sslOptions: { ca: fs.readFileSync('/path/to/ca.crt'), cert: fs.readFileSync('/path/to/client.crt'), key: fs.readFileSync('/path/to/client.key'), rejectUnauthorized: true, }, }); await session.open();
import { TableSessionPool } from '@iotdb/client'; const tablePool = new TableSessionPool('localhost', 6667, { username: 'root', password: 'root', database: 'my_database', // 为表模型设置默认数据库 maxPoolSize: 10, minPoolSize: 2, }); await tablePool.init(); // 在表模式下执行查询 const result = await tablePool.executeQueryStatement('SHOW DATABASES'); await tablePool.close();
IoTDB Node.js 客户端采用三层架构设计,针对单会话和高并发场景进行了优化:
┌─────────────────────────────────────────────────────┐
│ 应用层(您的代码) │
└─────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ 连接池层 │
│ ┌──────────────────┐ ┌──────────────────────┐ │
│ │ SessionPool │ │ TableSessionPool │ │
│ │ - 负载均衡 │ │ - 数据库上下文 │ │
│ │ - 连接池管理 │ │ - 连接池管理 │ │
│ └──────────────────┘ └──────────────────────┘ │
└─────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ 会话层 │
│ ┌──────────────────────────────────────────────┐ │
│ │ Session │ │
│ │ - 查询 / 非查询 │ │
│ │ - InsertTablet │ │
│ │ - 结果解析 │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ 连接层 │
│ ┌──────────────────────────────────────────────┐ │
│ │ Connection (Thrift) │ │
│ │ - TCP/SSL 传输 │ │
│ │ - 会话生命周期 │ │
│ │ - 低级协议 │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
↓
Apache IoTDB
src/connection/Connection.ts)src/client/Session.ts)executeQueryStatement()、executeNonQueryStatement()、insertTablet()src/client/BaseSessionPool.ts、SessionPool.ts、TableSessionPool.ts)src/utils/Config.ts)1. 应用调用 pool.executeQueryStatement() 2. 池获取可用的 Session(轮询) 3. Session 通过 Connection 向 IoTDB 发送查询 4. IoTDB 返回带有 queryId 的 SessionDataSet 5. SessionDataSet 批量获取数据(fetchSize) 6. 应用使用 hasNext()/next() 迭代结果 7. Session 释放回池 8. SessionDataSet.close() 释放服务器资源
1. 应用调用 pool.insertTablet() 2. 池获取可用的 Session(轮询) 3. Session 按列序列化 Tablet 数据 4. 通过 Connection 将数据发送到 IoTDB 5. IoTDB 确认写入 6. Session 释放回池
客户端使用 Apache Thrift 进行 RPC 通信:
src/thrift/generated/ 来自 IoTDB 的 .thrift 文件close() 以释放服务器资源状态:✅ 已完全实现
客户端现在支持多节点 IoTDB 集群的自动写入重定向。当写入操作发送到不拥有设备数据的节点时,服务器会响应重定向建议(状态码 400)。客户端会自动:
优势:
配置:
import { SessionPool, TableSessionPool } from '@iotdb/client'; // 带重定向的树模型连接池 const treePool = new SessionPool({ nodeUrls: ['192.168.1.100:6667', '192.168.1.101:6667', '192.168.1.102:6667'], username: 'root', password: 'root', maxPoolSize: 10, enableRedirection: true, // 启用重定向(默认:true) redirectCacheTTL: 300000, // 缓存 TTL(毫秒)(默认:5 分钟) }); // 带重定向的表模型连接池 const tablePool = new TableSessionPool({ nodeUrls: ['192.168.1.100:6667', '192.168.1.101:6667', '192.168.1.102:6667'], enableRedirection: true, });
工作原理:
// 第一次写入设备 - 服务器返回重定向建议 const tablet = { deviceId: 'root.sg.device1', measurements: ['temperature'], dataTypes: [TSDataType.FLOAT], timestamps: [Date.now()], values: [[25.5]], }; await pool.insertTablet(tablet); // → 写入到节点 A(通过轮询) // → 写入成功! // → 服务器响应代码 400:"建议未来使用节点 B 处理此设备" // → 客户端缓存:device1 → 节点 B(为下次写入做准备) // 第二次写入同一设备 - 使用缓存的端点 await pool.insertTablet({ deviceId: 'root.sg.device1', measurements: ['temperature'], dataTypes: [TSDataType.FLOAT], timestamps: [Date.now() + 1000], values: [[26.0]], }); // → 客户端检查缓存:device1 → 节点 B // → 直接写入节点 B // → 无需重定向!
测试:
重定向支持已通过 1C3D(1 个 ConfigNode,3 个 DataNode)集群配置进行测试。运行 E2E 测试:
# 启动 1C3D 集群 docker-compose -f docker-compose-1c3d.yml up -d # 运行重定向测试 MULTI_NODE=true npm run test:e2e
实现细节:
详细设计文档请参见 docs/redirection-design.md。
用于构建 Session 配置的流式 API:
import { ConfigBuilder } from '@iotdb/client'; const config = new ConfigBuilder() .host('localhost') .port(6667) .username('root') .password('root') .database('mydb') .timezone('UTC+8') .fetchSize(2048) .enableSSL(false) .build();
方法:
host(host: string): this - 设置主机port(port: number): this - 设置端口nodeUrls(nodeUrls: EndPoint[]): this - 设置多个节点 URLusername(username: string): this - 设置用户名password(password: string): this - 设置密码database(database: string): this - 设置数据库timezone(timezone: string): this - 设置时区fetchSize(fetchSize: number): this - 设置获取大小enableSSL(enable: boolean): this - 启用或禁用 SSLsslOptions(sslOptions: SSLOptions): this - 设置 SSL 选项build(): Config - 构建并返回配置用于构建 SessionPool 配置的流式 API(扩展 ConfigBuilder):
import { PoolConfigBuilder } from '@iotdb/client'; const config = new PoolConfigBuilder() .host('localhost') .port(6667) .username('root') .password('root') .maxPoolSize(20) .minPoolSize(5) .maxIdleTime(30000) .waitTimeout(45000) .build();
附加方法:
maxPoolSize(size: number): this - 设置最大连接池大小minPoolSize(size: number): this - 设置最小连接池大小maxIdleTime(time: number): this - 设置最大空闲时间(毫秒)waitTimeout(timeout: number): this - 设置等待超时(毫秒)build(): PoolConfig - 构建并返回连接池配置插入 Tablet 时,使用以下常量指定数据类型:
0 - BOOLEAN1 - INT322 - INT643 - FLOAT4 - DOUBLE5 - TEXT8 - TIMESTAMP9 - DATE10 - BLOB11 - STRING完整的数据类型参考,请参见 DATA_TYPES.md。
选项 1:使用配置对象
new Session(config: Config)
选项 2:使用构建器模式(推荐)
new Session(new ConfigBuilder()...build())
配置必须包括:
host 和 port 用于单节点连接nodeUrls 用于多节点连接(使用第一个节点)async open(): Promise<void> - 打开会话async close(): Promise<void> - 关闭会话async executeQueryStatement(sql: string, timeoutMs?: number): Promise<QueryResult> - 执行带可选超时的查询(默认:60000毫秒)async executeNonQueryStatement(sql: string): Promise<void> - 执行非查询语句async insertTablet(tablet: Tablet): Promise<void> - 插入 Tablet 数据isOpen(): boolean - 检查会话是否打开选项 1:传统 API(向后兼容)
new SessionPool(hosts: string | string[], port: number, config?: Partial<PoolConfig>)
选项 2:使用带 nodeUrls 的配置对象
new SessionPool(config: PoolConfig)
选项 3:使用构建器模式(推荐)
new SessionPool(new PoolConfigBuilder()...build())
连接管理:
async init(): Promise<void> - 初始化连接池async close(): Promise<void> - 关闭所有连接自动会话管理:
async executeQueryStatement(sql: string, timeoutMs?: number): Promise<QueryResult> - 执行带可选超时的查询(默认:60000毫秒)async executeNonQueryStatement(sql: string): Promise<void> - 执行非查询语句async insertTablet(tablet: Tablet): Promise<void> - 插入 Tablet 数据显式会话管理:
async getSession(): Promise<Session> - 从池中获取会话(必须释放)releaseSession(session: Session): void - 将会话释放回池连接池统计:
getPoolSize(): number - 获取当前连接池大小getAvailableSize(): number - 获取可用连接getInUseSize(): number - 获取当前正在使用的会话数与 SessionPool 相同,但针对表模型操作进行了优化。配置数据库时自动执行 USE DATABASE。所有查询方法都支持相同的超时参数(默认:60000毫秒)。
git clone https://github.com/CritasWang/@iotdb/client.git cd @iotdb/client
npm install
npm run build
项目使用两步构建过程:
esbuild:快速 TypeScript 编译
esbuild.config.js 中配置src/ 编译到 dist/ 目录tsc:类型声明生成
.d.ts 文件--emitDeclarationOnly 标志运行copy:thrift:复制生成的 Thrift 文件
.js 文件从 src/thrift/generated/ 复制到 dist/thrift/generated/require() 语句构建命令:
npm run build # 完整构建(esbuild + tsc + copy) npm run build:esbuild # 仅 esbuild 编译 npm run build:types # 仅类型声明
src/ 目录中进行更改npm run build 构建npm test 测试npm run lint 检查代码npm run format 格式化any如果需要更新到 IoTDB 的 Thrift 定义的新版本:
git clone --depth 1 https://github.com/apache/iotdb.git /tmp/iotdb
cp /tmp/iotdb/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift thrift/ cp /tmp/iotdb/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift thrift/
npm run generate:thrift
tests/
├── unit/ # 单元测试(快速,无外部依赖)
│ ├── Config.test.ts
│ ├── Logger.test.ts
│ └── ...
└── e2e/ # 端到端测试(需要 IoTDB)
├── Session.test.ts
├── SessionPool.test.ts
├── TableSessionPool.test.ts
└── ...
运行所有测试:
npm test
仅运行单元测试:
npm run test:unit
仅运行端到端测试(需要 IoTDB 实例):
export IOTDB_HOST=localhost export IOTDB_PORT=6667 export IOTDB_USER=root export IOTDB_PASSWORD=root npm run test:e2e
端到端测试需要运行中的 IoTDB 实例。您可以使用 Docker Compose:
单节点(1c1d):
docker-compose -f docker-compose-1c1d.yml up -d
3节点集群(3c3d):
docker-compose -f docker-compose-3c3d.yml up -d
停止容器:
docker-compose -f docker-compose-1c1d.yml down
describe('ConfigBuilder', () => { test('应使用所有选项构建配置', () => { const config = new ConfigBuilder() .host('localhost') .port(6667) .username('root') .password('root') .build(); expect(config.host).toBe('localhost'); expect(config.port).toBe(6667); }); });
describe('Session 端到端测试', () => { let session: Session; beforeAll(async () => { session = new Session({ host: process.env.IOTDB_HOST || 'localhost', port: parseInt(process.env.IOTDB_PORT || '6667'), username: process.env.IOTDB_USER || 'root', password: process.env.IOTDB_PASSWORD || 'root', }); await session.open(); }, 60000); // 连接60秒超时 afterAll(async () => { if (session?.isOpen()) { await session.close(); } }); test('应执行查询', async () => { if (!session.isOpen()) return; // 无连接时跳过 const dataSet = await session.executeQueryStatement('SHOW DATABASES'); const rows = await dataSet.toArray(); expect(Array.isArray(rows)).toBe(true); await dataSet.close(); }); });
当前测试覆盖:
调试单个测试:
npm run test:debug
调试端到端测试:
npm run test:e2e:debug
检查未关闭的句柄:
npm run test:e2e:check-handles
benchmark/ 目录中提供了全面的基准测试工具,用于性能测试和优化。
insertTablet API 测试时间序列数据模型insertTablet API 测试关系数据模型测试基准测试基础设施(不需要 IoTDB):
node benchmark/test-benchmark.js
运行树模型基准测试:
CLIENT_NUMBER=10 DEVICE_NUMBER=100 node benchmark/benchmark-tree.js
运行表模型基准测试:
CLIENT_NUMBER=10 DEVICE_NUMBER=100 node benchmark/benchmark-table.js
| 参数 | 默认值 | 描述 |
|---|---|---|
CLIENT_NUMBER | 10 | 并发客户端数 |
DEVICE_NUMBER | 100 | 要模拟的设备数 |
SENSOR_NUMBER | 10 | 每个设备的传感器数 |
BATCH_SIZE_PER_WRITE | 100 | 每次写入操作的数据行数 |
TOTAL_DATA_POINTS | 100000 | 要生成的总数据点 |
POOL_MAX_SIZE | 20 | 连接池中的最大连接数 |
CLIENT_NUMBER=50 \ DEVICE_NUMBER=1000 \ SENSOR_NUMBER=10 \ BATCH_SIZE_PER_WRITE=1000 \ TOTAL_DATA_POINTS=1000000 \ node benchmark/benchmark-tree.js
基准测试报告:
================================================================================ 基准测试结果 ================================================================================ [执行时间] 时长: 45.23秒 (45234毫秒) [操作] 总操作数: 1000 成功: 998 失败: 2 成功率: 99.80% [数据点] 写入的总数据点: 100,000 [吞吐量] 操作数/秒: 22.11 数据点数/秒: 2,210 [延迟 (毫秒)] 最小: 15.23毫秒 最大: 1250.45毫秒 平均: 45.23毫秒 P50 (中位数): 42.15毫秒 P90: 78.45毫秒 P95: 95.23毫秒 P99: 125.67毫秒 ================================================================================
POOL_MIN_SIZE 和 POOL_MAX_SIZE完整文档请参见 benchmark/README.md。
有关更多使用示例,请参见 examples/ 目录:
examples/basic-session.ts - 基本会话使用examples/session-pool.ts - SessionPool 使用examples/table-session-pool.ts - TableSessionPool 使用examples/multi-node.ts - 多节点配置examples/ssl-connection.ts - SSL/TLS 连接docs/ 目录中提供了全面的文档:
我们欢迎社区贡献!无论您是修复错误、添加功能、改进文档还是报告问题,我们都非常感谢您的帮助。
main 创建功能分支所有提交在合并前都需要审查:
报告错误时,请包括:
有关详细的贡献指南,请参见 CONTRIBUTING.md。
本项目遵循语义化版本控制(SemVer)并维护定期发布周期。
给定版本号 MAJOR.MINOR.PATCH:
更新版本和更新日志:
# 更新 package.json 中的版本 npm version [major|minor|patch] --no-git-tag-version # 使用发布说明更新 CHANGELOG.md # - 新功能 # - 错误修复 # - 破坏性更改 # - 弃用
运行全面测试:
# 单元测试 npm run test:unit # 端到端测试(需要 IoTDB) export IOTDB_HOST=localhost export IOTDB_PORT=6667 npm run test:e2e # 代码检查 npm run lint # 构建验证 npm run build
创建并推送版本标签:
# 提交版本更新 git add package.json CHANGELOG.md git commit -m "chore: bump version to X.Y.Z" # 创建标签 git tag -a vX.Y.Z -m "Release vX.Y.Z" # 推送到远程 git push origin main git push origin vX.Y.Z
构建和发布:
# 构建生产资源 npm run build # 发布到 npm(需要 npm 帐户) npm publish # 对于 beta/RC 版本 npm publish --tag beta
创建 GitHub 发布:
v X.Y.Z - 发布名称稳定版本发布前的测试:
# 创建 beta 版本 npm version 1.2.0-beta.1 --no-git-tag-version # 使用 beta 标签发布 npm publish --tag beta # 安装 beta 版本 npm install @iotdb/client@beta
对于关键生产问题:
Apache License 2.0
版权所有 © 2024 Apache IoTDB
根据 Apache 许可证 2.0 版(“许可证”)许可; 除非遵守许可证,否则您不得使用此文件。 您可以在以下地址获取许可证副本:
http://www.apache.org/licenses/LICENSE-2.0
除非适用法律要求或书面同意,否则根据许可证分发的软件 按“原样”分发,不提供任何明示或暗示的保证或条件。 有关许可证下的特定权限和限制,请参见许可证。