blob: 3b67d93559c307a80ab0bc5220b75db8912dd966 [file] [view]
# Apache IoTDB Node.js Client - 树模型用户指南
> **版本**: 1.0.0
> **最后更新**: 2024
## 目录
- [1. 简介](#1-简介)
- [2. 安装](#2-安装)
- [3. 快速入门](#3-快速入门)
- [4. SessionPool API](#4-sessionpool-api)
- [5. 配置构建器](#5-配置构建器)
- [6. 数据类型](#6-数据类型)
- [7. 代码示例](#7-代码示例)
- [8. 最佳实践](#8-最佳实践)
- [9. 故障排查](#9-故障排查)
## 1. 简介
### 1.1 概述
Apache IoTDB Node.js Client 为树模型(时间序列数据模型)提供了原生支持,使用分层设备路径实现时间序列数据的高效管理。本指南涵盖了树模型操作的 SessionPool API,提供连接池和高并发支持。
### 1.2 树模型特性
IoTDB 中的树模型采用分层组织数据:
- **基于路径的组织**: `root.{storage_group}.{device}.{measurement}`
- **时间序列管理**: 创建和管理具有特定数据类型的独立时间序列
- **高效批量插入**: 使用 `insertTablet` 实现高性能批量写入
- **灵活的查询**: 支持路径模式和通配符查询
- **连接池**: 为高并发场景提供 SessionPool
### 1.3 核心概念
- **存储组(Storage Group)**: 顶层数据组织单元(例如 `root.test`)
- **设备(Device)**: 生成数据的物理或逻辑实体(例如 `root.test.device1`)
- **测点(Measurement)**: 传感器或指标名称(例如 `temperature``humidity`)
- **时间序列(Timeseries)**: 带有数据类型的完整路径(例如 `root.test.device1.temperature FLOAT`)
## 2. 安装
### 2.1 从 npm 安装
```bash
npm install @iotdb/client
```
**系统要求:**
- Node.js >= 14.0.0
- Apache IoTDB >= 1.0.0
### 2.2 在项目中导入
**TypeScript:**
```typescript
import { SessionPool, PoolConfigBuilder, TreeTablet, TSDataType } from '@iotdb/client';
```
**JavaScript:**
```javascript
const { SessionPool, PoolConfigBuilder, TreeTablet, TSDataType } = require('@iotdb/client');
```
## 3. 快速入门
### 3.1 SessionPool 示例
```typescript
import { SessionPool, TreeTablet } from '@iotdb/client';
async function quickStart() {
// 创建并初始化连接池
const pool = new SessionPool('localhost', 6667, {
username: 'root',
password: 'root',
maxPoolSize: 10,
minPoolSize: 2,
});
await pool.init();
try {
// 创建存储组
await pool.executeNonQueryStatement('CREATE DATABASE root.test');
// 创建时间序列
await pool.executeNonQueryStatement(
'CREATE TIMESERIES root.test.device1.temperature WITH DATATYPE=FLOAT, ENCODING=RLE'
);
// 使用 TreeTablet 类与 addRow 插入数据
const tablet = new TreeTablet(
'root.test.device1',
['temperature'],
[3] // FLOAT
);
tablet.addRow(Date.now(), [25.5]);
await pool.insertTablet(tablet);
// 查询数据
const result = await pool.executeQueryStatement(
'SELECT temperature FROM root.test.device1'
);
console.log('Query result:', result);
console.log('Pool size:', pool.getPoolSize());
console.log('Available:', pool.getAvailableSize());
} finally {
await pool.close();
}
}
quickStart();
```
## 4. SessionPool API
### 4.1 概述
SessionPool 为高并发场景提供连接池。它自动管理多个 session,在节点间分配负载,并回收空闲连接。
**核心特性:**
- 跨多节点的轮询负载均衡
- 可配置的连接池大小(最小/最大)
- 自动空闲连接清理
- 连接池耗尽时的等待队列
- 线程安全操作
### 4.2 构造函数
#### 方式 1: 传统 API(相同端口)
```typescript
const pool = new SessionPool(
['node1', 'node2', 'node3'], // 主机列表
6667, // 端口
{
username: 'root',
password: 'root',
maxPoolSize: 20,
minPoolSize: 5,
}
);
```
#### 方式 2: 使用 nodeUrls(不同端口)
```typescript
const pool = new SessionPool({
nodeUrls: [
'node1:6667',
'node2:6668',
'node3:6669',
],
username: 'root',
password: 'root',
maxPoolSize: 20,
minPoolSize: 5,
});
```
#### 方式 3: 使用构建器模式(推荐)
```typescript
import { PoolConfigBuilder } from '@iotdb/client';
const pool = new SessionPool(
new PoolConfigBuilder()
.nodeUrls(['node1:6667', 'node2:6667', 'node3:6667'])
.username('root')
.password('root')
.maxPoolSize(20)
.minPoolSize(5)
.maxIdleTime(60000)
.waitTimeout(60000)
.build()
);
```
### 4.3 连接池配置选项
| 选项 | 类型 | 默认值 | 说明 |
|--------|------|---------|-------------|
| `maxPoolSize` | number | `10` | 连接池中 session 的最大数量 |
| `minPoolSize` | number | `1` | 维护的最小 session 数量 |
| `maxIdleTime` | number | `60000` | 清理前的最大空闲时间(毫秒) |
| `waitTimeout` | number | `60000` | 等待可用 session 的最大时间(毫秒) |
### 4.4 方法
#### 4.4.1 连接池管理
##### `async init(): Promise<void>`
初始化连接池并创建最小数量的 session
**示例:**
```typescript
await pool.init();
```
##### `async close(): Promise<void>`
关闭连接池中的所有 session 并释放资源。
**示例:**
```typescript
await pool.close();
```
#### 4.4.2 自动 Session 管理
连接池会自动为这些操作获取和释放 session:
##### `async executeQueryStatement(sql: string, timeoutMs?: number): Promise<QueryResult>`
使用连接池中的可用 session 执行查询。
**示例:**
```typescript
const result = await pool.executeQueryStatement('SELECT * FROM root.test.**');
```
##### `async executeNonQueryStatement(sql: string): Promise<void>`
使用可用 session 执行非查询语句。
**示例:**
```typescript
await pool.executeNonQueryStatement('CREATE DATABASE root.test');
```
##### `async insertTablet(tablet: Tablet): Promise<void>`
使用可用 session 插入数据。
**示例:**
```typescript
await pool.insertTablet({
deviceId: 'root.test.device1',
measurements: ['temperature'],
dataTypes: [3],
timestamps: [Date.now()],
values: [[25.5]],
});
```
#### 4.4.3 显式 Session 管理
对于同一 session 上的多个操作:
##### `async getSession(): Promise<Session>`
从连接池获取一个 session。使用后**必须**释放。
**示例:**
```typescript
const session = await pool.getSession();
try {
await session.executeNonQueryStatement('CREATE DATABASE root.test');
await session.insertTablet({ /* data */ });
const result = await session.executeQueryStatement('SELECT ...');
} finally {
pool.releaseSession(session);
}
```
##### `releaseSession(session: Session): void`
session 释放回连接池。
**示例:**
```typescript
pool.releaseSession(session);
```
#### 4.4.4 连接池统计
##### `getPoolSize(): number`
返回连接池中当前 session 的总数。
##### `getAvailableSize(): number`
返回可用(空闲) session 的数量。
##### `getInUseSize(): number`
返回当前正在使用的 session 数量。
**示例:**
```typescript
console.log(`Total: ${pool.getPoolSize()}`);
console.log(`Available: ${pool.getAvailableSize()}`);
console.log(`In Use: ${pool.getInUseSize()}`);
```
## 5. 配置构建器
### 5.1 PoolConfigBuilder
用于构建 SessionPool 配置的流式 API
**可用方法:**
- `host(host: string): this`
- `port(port: number): this`
- `nodeUrls(urls: string[]): this`
- `username(username: string): this`
- `password(password: string): this`
- `database(database: string): this`
- `timezone(timezone: string): this`
- `fetchSize(size: number): this`
- `enableSSL(enable: boolean): this`
- `sslOptions(options: SSLOptions): this`
- `maxPoolSize(size: number): this`
- `minPoolSize(size: number): this`
- `maxIdleTime(time: number): this`
- `waitTimeout(timeout: number): this`
- `build(): PoolConfig`
**示例:**
```typescript
const poolConfig = new PoolConfigBuilder()
.nodeUrls(['node1:6667', 'node2:6667'])
.username('root')
.password('root')
.maxPoolSize(20)
.minPoolSize(5)
.maxIdleTime(60000)
.waitTimeout(60000)
.build();
const pool = new SessionPool(poolConfig);
```
## 6. 数据类型
### 6.1 支持的数据类型
树模型支持所有 IoTDB 数据类型:
| 代码 | 类型 | JavaScript 类型 | 说明 |
|------|------|-----------------|-------------|
| 0 | BOOLEAN | boolean | True false |
| 1 | INT32 | number | 32 位有符号整数 |
| 2 | INT64 | number/string | 64 位有符号整数(大值使用字符串) |
| 3 | FLOAT | number | 32 位浮点数 |
| 4 | DOUBLE | number | 64 位浮点数 |
| 5 | TEXT | string | UTF-8 字符串 |
| 8 | TIMESTAMP | number/Date | 自纪元以来的毫秒数 |
| 9 | DATE | number/Date | 自纪元以来的天数 |
| 10 | BLOB | Buffer | 二进制数据 |
| 11 | STRING | string | TEXT 相同 |
### 6.2 在 insertTablet 中使用数据类型
**多类型示例:**
```typescript
await pool.insertTablet({
deviceId: 'root.test.sensor1',
measurements: ['temp', 'humidity', 'status', 'description', 'reading_time'],
dataTypes: [3, 4, 0, 5, 8], // FLOAT, DOUBLE, BOOLEAN, TEXT, TIMESTAMP
timestamps: [Date.now()],
values: [[
25.5, // FLOAT
60.123456, // DOUBLE
true, // BOOLEAN
'Normal operation', // TEXT
Date.now(), // TIMESTAMP
]],
});
```
### 6.3 处理 INT64
对于大于 JavaScript 安全整数范围(2^53 - 1)的 INT64 值,使用字符串:
```typescript
await pool.insertTablet({
deviceId: 'root.test.device1',
measurements: ['largeCounter'],
dataTypes: [2], // INT64
timestamps: [Date.now()],
values: [['9223372036854775807']], // 大值 INT64 使用字符串
});
```
## 7. 代码示例
### 7.1 完整的 CRUD 示例
```typescript
import { SessionPool } from '@iotdb/client';
async function crudExample() {
const pool = new SessionPool('localhost', 6667, {
username: 'root',
password: 'root',
maxPoolSize: 10,
minPoolSize: 2,
});
await pool.init();
try {
// 创建(CREATE)
await pool.executeNonQueryStatement('CREATE DATABASE root.factory');
await pool.executeNonQueryStatement(
'CREATE TIMESERIES root.factory.workshop1.temperature WITH DATATYPE=FLOAT'
);
// 插入(INSERT)
await pool.insertTablet({
deviceId: 'root.factory.workshop1',
measurements: ['temperature'],
dataTypes: [3],
timestamps: [Date.now() - 3000, Date.now() - 2000, Date.now() - 1000],
values: [[25.5], [26.0], [25.8]],
});
// 读取(READ)
const result = await pool.executeQueryStatement(
'SELECT temperature FROM root.factory.workshop1'
);
console.log('Temperature readings:', result);
// 更新(UPDATE) - 删除并重新插入
await pool.executeNonQueryStatement(
`DELETE FROM root.factory.workshop1.temperature WHERE time <= ${Date.now() - 2500}`
);
// 删除(DELETE)
await pool.executeNonQueryStatement('DELETE DATABASE root.factory');
} finally {
await pool.close();
}
}
crudExample();
```
### 7.2 多节点 SessionPool 示例
```typescript
import { SessionPool, PoolConfigBuilder } from '@iotdb/client';
async function multiNodeExample() {
const pool = new SessionPool(
new PoolConfigBuilder()
.nodeUrls([
'iotdb-node1:6667',
'iotdb-node2:6667',
'iotdb-node3:6667',
])
.username('root')
.password('root')
.maxPoolSize(30)
.minPoolSize(10)
.build()
);
await pool.init();
try {
// 模拟并发操作
const operations = [];
for (let i = 0; i < 100; i++) {
operations.push(
pool.insertTablet({
deviceId: `root.test.device${i % 10}`,
measurements: ['value'],
dataTypes: [3],
timestamps: [Date.now()],
values: [[Math.random() * 100]],
})
);
}
await Promise.all(operations);
console.log('All operations completed');
// 连接池统计
console.log('Pool Statistics:');
console.log(` Total Sessions: ${pool.getPoolSize()}`);
console.log(` Available: ${pool.getAvailableSize()}`);
console.log(` In Use: ${pool.getInUseSize()}`);
} finally {
await pool.close();
}
}
multiNodeExample();
```
### 7.3 时间范围查询示例
```typescript
async function timeRangeQuery(pool: SessionPool) {
const now = Date.now();
const hourAgo = now - 3600000;
const result = await pool.executeQueryStatement(
`SELECT temperature, humidity FROM root.test.** WHERE time >= ${hourAgo} AND time <= ${now}`
);
console.log('Query result:', result);
return result;
}
```
### 7.4 多设备批量插入
```typescript
async function batchInsertMultipleDevices(pool: SessionPool) {
const devices = ['device1', 'device2', 'device3'];
const timestamps = [];
const now = Date.now();
// 生成最近 10 分钟的时间戳
for (let i = 0; i < 600; i++) {
timestamps.push(now - (600 - i) * 1000);
}
for (const device of devices) {
const values = timestamps.map(() => [
20 + Math.random() * 10, // temperature
50 + Math.random() * 30, // humidity
]);
await pool.insertTablet({
deviceId: `root.test.${device}`,
measurements: ['temperature', 'humidity'],
dataTypes: [3, 3],
timestamps,
values,
});
}
console.log(`Inserted ${timestamps.length} records for ${devices.length} devices`);
}
```
## 8. 最佳实践
### 8.1 资源管理
**始终关闭资源:**
```typescript
// SessionPool
try {
await pool.init();
// ... 操作
} finally {
await pool.close();
}
```
### 8.2 批量插入
**优化批量大小:**
- 使用 `insertTablet` 而不是单条插入
- 每个 tablet 批量 100-1000
- 考虑内存与网络的权衡
**示例:**
```typescript
// 好的做法: 批量插入
await pool.insertTablet({
deviceId: 'root.test.device1',
measurements: ['temperature'],
dataTypes: [3],
timestamps: timestamps, // 100-1000 个时间戳
values: values, // 100-1000 个值
});
// 不好的做法: 单条插入
for (let i = 0; i < 1000; i++) {
await pool.executeNonQueryStatement(
`INSERT INTO root.test.device1(timestamp, temperature) VALUES(${timestamps[i]}, ${values[i]})`
);
}
```
### 8.3 错误处理
```typescript
try {
await pool.init();
await pool.executeNonQueryStatement('CREATE DATABASE root.test');
} catch (error) {
if (error.message.includes('already exists')) {
console.log('Database already exists, continuing...');
} else {
console.error('Failed to create database:', error);
throw error;
}
} finally {
await pool.close();
}
```
### 8.4 连接池大小设置
**指导原则:**
- `minPoolSize` 设置为平均并发负载
- `maxPoolSize` 设置为峰值负载 + 缓冲(20-30%)
- 在生产环境中监控连接池统计
- 根据服务器容量调整
**示例:**
```typescript
const pool = new SessionPool({
nodeUrls: ['localhost:6667'],
maxPoolSize: 50, // 峰值负载: 40 个客户端 + 25% 缓冲
minPoolSize: 20, // 平均负载: 20 个客户端
maxIdleTime: 60000, // 空闲 1 分钟后清理
waitTimeout: 30000, // 最多等待 30 秒获取可用 session
});
```
## 9. 故障排查
### 9.1 常见问题
#### 连接被拒绝
**症状:**
```
Error: connect ECONNREFUSED 127.0.0.1:6667
```
**解决方案:**
1. 验证 IoTDB 正在运行: `jps | grep IoTDB`
2. 检查 `iotdb-datanode.properties` 中的端口配置
3. 验证防火墙允许连接
4. 使用 telnet 测试: `telnet localhost 6667`
#### 连接池超时
**症状:**
```
Error: Timeout waiting for available session
```
**解决方案:**
1. 增加连接池配置中的 `waitTimeout`
2. 如果服务器能够处理,增加 `maxPoolSize`
3. 验证 session 正确释放
4. 检查连接泄漏(忘记 `releaseSession()`)
#### 内存不足
**症状:**
```
FATAL ERROR: Reached heap limit
```
**解决方案:**
1. 减少连接池配置中的 `fetchSize`
2. 分批处理查询结果
3. 增加 Node.js 堆: `node --max-old-space-size=4096 app.js`
### 9.2 调试技巧
**启用调试日志:**
```typescript
// 设置环境变量
process.env.LOG_LEVEL = 'debug';
// 或直接使用 logger
import { logger } from '@iotdb/client';
logger.setLevel('debug');
```
**检查连接状态:**
```typescript
console.log('Pool size:', pool.getPoolSize());
console.log('Available:', pool.getAvailableSize());
console.log('In Use:', pool.getInUseSize());
```
**测试查询执行时间:**
```typescript
const start = Date.now();
const result = await pool.executeQueryStatement('SELECT ...');
console.log(`Query took ${Date.now() - start}ms`);
```
### 9.3 性能优化
1. **启用连接池**: 用于并发操作
2. **批量插入**: 使用 insertTablet,100-1000
3. **多节点设置**: 在节点间分配负载
4. **监控资源**: 关注 CPU、内存、网络
5. **调整连接池大小**: 根据工作负载设置最小/最大连接池大小
### 9.4 获取帮助
- **文档**: [IoTDB Docs](https://iotdb.apache.org/)
- **GitHub Issues**: [报告问题](https://github.com/CritasWang/@iotdb/client/issues)
- **邮件列表**: dev@iotdb.apache.org
## 附录 A: 完整类型参考
详见 [data-types.md](data-types.md) 了解全面的数据类型文档。
## 附录 B: API 快速参考
### SessionPool 方法
- `init()` - 初始化连接池
- `close()` - 关闭所有 session
- `executeQueryStatement(sql, timeout?)` - 执行查询
- `executeNonQueryStatement(sql)` - 执行 DDL/DML
- `insertTablet(tablet)` - 批量插入
- `getSession()` - 从连接池获取 session
- `releaseSession(session)` - session 返回连接池
- `getPoolSize()` - session
- `getAvailableSize()` - 可用 session
- `getInUseSize()` - 活动 session
---
**版本**: 1.0.0
**最后更新**: 20241
**许可证**: Apache License 2.0