Apache IoTDB Node.js Client - 树模型用户指南

版本: 1.0.0
最后更新: 2024

目录

1. 简介

1.1 概述

Apache IoTDB Node.js Client 为树模型(时间序列数据模型)提供了原生支持,使用分层设备路径实现时间序列数据的高效管理。本指南涵盖了树模型操作的 SessionPool API,提供连接池和高并发支持。

1.2 树模型特性

IoTDB 中的树模型采用分层组织数据:

  • 基于路径的组织: root.{storage_group}.{device}.{measurement}
  • 时间序列管理: 创建和管理具有特定数据类型的独立时间序列
  • 高效批量插入: 使用 insertTablet 实现高性能批量写入
  • 灵活的查询: 支持路径模式和通配符查询
  • 连接池: 为高并发场景提供 SessionPool

1.3 核心概念

  • 存储组(Storage Group): 顶层数据组织单元(例如 root.test)
  • 设备(Device): 生成数据的物理或逻辑实体(例如 root.test.device1)
  • 测点(Measurement): 传感器或指标名称(例如 temperaturehumidity)
  • 时间序列(Timeseries): 带有数据类型的完整路径(例如 root.test.device1.temperature FLOAT)

2. 安装

2.1 从 npm 安装

npm install @iotdb/client

系统要求:

  • Node.js >= 14.0.0
  • Apache IoTDB >= 1.0.0

2.2 在项目中导入

TypeScript:

import { SessionPool, PoolConfigBuilder, TreeTablet, TSDataType } from '@iotdb/client';

JavaScript:

const { SessionPool, PoolConfigBuilder, TreeTablet, TSDataType } = require('@iotdb/client');

3. 快速入门

3.1 SessionPool 示例

import { SessionPool, TreeTablet } from '@iotdb/client';

async function quickStart() {
  // 创建并初始化连接池
  const pool = new SessionPool('localhost', 6667, {
    username: 'root',
    password: 'root',
    maxPoolSize: 10,
    minPoolSize: 2,
  });
  
  await pool.init();
  
  try {
    // 创建存储组
    await pool.executeNonQueryStatement('CREATE DATABASE root.test');
    
    // 创建时间序列
    await pool.executeNonQueryStatement(
      'CREATE TIMESERIES root.test.device1.temperature WITH DATATYPE=FLOAT, ENCODING=RLE'
    );
    
    // 使用 TreeTablet 类与 addRow 插入数据
    const tablet = new TreeTablet(
      'root.test.device1',
      ['temperature'],
      [3] // FLOAT
    );
    tablet.addRow(Date.now(), [25.5]);
    
    await pool.insertTablet(tablet);
    
    // 查询数据
    const result = await pool.executeQueryStatement(
      'SELECT temperature FROM root.test.device1'
    );
    
    console.log('Query result:', result);
    console.log('Pool size:', pool.getPoolSize());
    console.log('Available:', pool.getAvailableSize());
  } finally {
    await pool.close();
  }
}

quickStart();

4. SessionPool API

4.1 概述

SessionPool 为高并发场景提供连接池。它自动管理多个 session,在节点间分配负载,并回收空闲连接。

核心特性:

  • 跨多节点的轮询负载均衡
  • 可配置的连接池大小(最小/最大)
  • 自动空闲连接清理
  • 连接池耗尽时的等待队列
  • 线程安全操作

4.2 构造函数

方式 1: 传统 API(相同端口)

const pool = new SessionPool(
  ['node1', 'node2', 'node3'], // 主机列表
  6667,                         // 端口
  {
    username: 'root',
    password: 'root',
    maxPoolSize: 20,
    minPoolSize: 5,
  }
);

方式 2: 使用 nodeUrls(不同端口)

const pool = new SessionPool({
  nodeUrls: [
    'node1:6667',
    'node2:6668',
    'node3:6669',
  ],
  username: 'root',
  password: 'root',
  maxPoolSize: 20,
  minPoolSize: 5,
});

方式 3: 使用构建器模式(推荐)

import { PoolConfigBuilder } from '@iotdb/client';

const pool = new SessionPool(
  new PoolConfigBuilder()
    .nodeUrls(['node1:6667', 'node2:6667', 'node3:6667'])
    .username('root')
    .password('root')
    .maxPoolSize(20)
    .minPoolSize(5)
    .maxIdleTime(60000)
    .waitTimeout(60000)
    .build()
);

4.3 连接池配置选项

选项类型默认值说明
maxPoolSizenumber10连接池中 session 的最大数量
minPoolSizenumber1维护的最小 session 数量
maxIdleTimenumber60000清理前的最大空闲时间(毫秒)
waitTimeoutnumber60000等待可用 session 的最大时间(毫秒)

4.4 方法

4.4.1 连接池管理

async init(): Promise<void>

初始化连接池并创建最小数量的 session。

示例:

await pool.init();
async close(): Promise<void>

关闭连接池中的所有 session 并释放资源。

示例:

await pool.close();

4.4.2 自动 Session 管理

连接池会自动为这些操作获取和释放 session:

async executeQueryStatement(sql: string, timeoutMs?: number): Promise<QueryResult>

使用连接池中的可用 session 执行查询。

示例:

const result = await pool.executeQueryStatement('SELECT * FROM root.test.**');
async executeNonQueryStatement(sql: string): Promise<void>

使用可用 session 执行非查询语句。

示例:

await pool.executeNonQueryStatement('CREATE DATABASE root.test');
async insertTablet(tablet: Tablet): Promise<void>

使用可用 session 插入数据。

示例:

await pool.insertTablet({
  deviceId: 'root.test.device1',
  measurements: ['temperature'],
  dataTypes: [3],
  timestamps: [Date.now()],
  values: [[25.5]],
});

4.4.3 显式 Session 管理

对于同一 session 上的多个操作:

async getSession(): Promise<Session>

从连接池获取一个 session。使用后必须释放。

示例:

const session = await pool.getSession();
try {
  await session.executeNonQueryStatement('CREATE DATABASE root.test');
  await session.insertTablet({ /* data */ });
  const result = await session.executeQueryStatement('SELECT ...');
} finally {
  pool.releaseSession(session);
}
releaseSession(session: Session): void

将 session 释放回连接池。

示例:

pool.releaseSession(session);

4.4.4 连接池统计

getPoolSize(): number

返回连接池中当前 session 的总数。

getAvailableSize(): number

返回可用(空闲) session 的数量。

getInUseSize(): number

返回当前正在使用的 session 数量。

示例:

console.log(`Total: ${pool.getPoolSize()}`);
console.log(`Available: ${pool.getAvailableSize()}`);
console.log(`In Use: ${pool.getInUseSize()}`);

5. 配置构建器

5.1 PoolConfigBuilder

用于构建 SessionPool 配置的流式 API。

可用方法:

  • host(host: string): this
  • port(port: number): this
  • nodeUrls(urls: string[]): this
  • username(username: string): this
  • password(password: string): this
  • database(database: string): this
  • timezone(timezone: string): this
  • fetchSize(size: number): this
  • enableSSL(enable: boolean): this
  • sslOptions(options: SSLOptions): this
  • maxPoolSize(size: number): this
  • minPoolSize(size: number): this
  • maxIdleTime(time: number): this
  • waitTimeout(timeout: number): this
  • build(): PoolConfig

示例:

const poolConfig = new PoolConfigBuilder()
  .nodeUrls(['node1:6667', 'node2:6667'])
  .username('root')
  .password('root')
  .maxPoolSize(20)
  .minPoolSize(5)
  .maxIdleTime(60000)
  .waitTimeout(60000)
  .build();

const pool = new SessionPool(poolConfig);

6. 数据类型

6.1 支持的数据类型

树模型支持所有 IoTDB 数据类型:

代码类型JavaScript 类型说明
0BOOLEANbooleanTrue 或 false
1INT32number32 位有符号整数
2INT64number/string64 位有符号整数(大值使用字符串)
3FLOATnumber32 位浮点数
4DOUBLEnumber64 位浮点数
5TEXTstringUTF-8 字符串
8TIMESTAMPnumber/Date自纪元以来的毫秒数
9DATEnumber/Date自纪元以来的天数
10BLOBBuffer二进制数据
11STRINGstring与 TEXT 相同

6.2 在 insertTablet 中使用数据类型

多类型示例:

await pool.insertTablet({
  deviceId: 'root.test.sensor1',
  measurements: ['temp', 'humidity', 'status', 'description', 'reading_time'],
  dataTypes: [3, 4, 0, 5, 8], // FLOAT, DOUBLE, BOOLEAN, TEXT, TIMESTAMP
  timestamps: [Date.now()],
  values: [[
    25.5,                    // FLOAT
    60.123456,              // DOUBLE
    true,                   // BOOLEAN
    'Normal operation',     // TEXT
    Date.now(),            // TIMESTAMP
  ]],
});

6.3 处理 INT64

对于大于 JavaScript 安全整数范围(2^53 - 1)的 INT64 值,使用字符串:

await pool.insertTablet({
  deviceId: 'root.test.device1',
  measurements: ['largeCounter'],
  dataTypes: [2], // INT64
  timestamps: [Date.now()],
  values: [['9223372036854775807']], // 大值 INT64 使用字符串
});

7. 代码示例

7.1 完整的 CRUD 示例

import { SessionPool } from '@iotdb/client';

async function crudExample() {
  const pool = new SessionPool('localhost', 6667, {
    username: 'root',
    password: 'root',
    maxPoolSize: 10,
    minPoolSize: 2,
  });
  
  await pool.init();
  
  try {
    // 创建(CREATE)
    await pool.executeNonQueryStatement('CREATE DATABASE root.factory');
    await pool.executeNonQueryStatement(
      'CREATE TIMESERIES root.factory.workshop1.temperature WITH DATATYPE=FLOAT'
    );
    
    // 插入(INSERT)
    await pool.insertTablet({
      deviceId: 'root.factory.workshop1',
      measurements: ['temperature'],
      dataTypes: [3],
      timestamps: [Date.now() - 3000, Date.now() - 2000, Date.now() - 1000],
      values: [[25.5], [26.0], [25.8]],
    });
    
    // 读取(READ)
    const result = await pool.executeQueryStatement(
      'SELECT temperature FROM root.factory.workshop1'
    );
    
    console.log('Temperature readings:', result);
    
    // 更新(UPDATE) - 删除并重新插入
    await pool.executeNonQueryStatement(
      `DELETE FROM root.factory.workshop1.temperature WHERE time <= ${Date.now() - 2500}`
    );
    
    // 删除(DELETE)
    await pool.executeNonQueryStatement('DELETE DATABASE root.factory');
    
  } finally {
    await pool.close();
  }
}

crudExample();

7.2 多节点 SessionPool 示例

import { SessionPool, PoolConfigBuilder } from '@iotdb/client';

async function multiNodeExample() {
  const pool = new SessionPool(
    new PoolConfigBuilder()
      .nodeUrls([
        'iotdb-node1:6667',
        'iotdb-node2:6667',
        'iotdb-node3:6667',
      ])
      .username('root')
      .password('root')
      .maxPoolSize(30)
      .minPoolSize(10)
      .build()
  );
  
  await pool.init();
  
  try {
    // 模拟并发操作
    const operations = [];
    
    for (let i = 0; i < 100; i++) {
      operations.push(
        pool.insertTablet({
          deviceId: `root.test.device${i % 10}`,
          measurements: ['value'],
          dataTypes: [3],
          timestamps: [Date.now()],
          values: [[Math.random() * 100]],
        })
      );
    }
    
    await Promise.all(operations);
    console.log('All operations completed');
    
    // 连接池统计
    console.log('Pool Statistics:');
    console.log(`  Total Sessions: ${pool.getPoolSize()}`);
    console.log(`  Available: ${pool.getAvailableSize()}`);
    console.log(`  In Use: ${pool.getInUseSize()}`);
    
  } finally {
    await pool.close();
  }
}

multiNodeExample();

7.3 时间范围查询示例

async function timeRangeQuery(pool: SessionPool) {
  const now = Date.now();
  const hourAgo = now - 3600000;
  
  const result = await pool.executeQueryStatement(
    `SELECT temperature, humidity FROM root.test.** WHERE time >= ${hourAgo} AND time <= ${now}`
  );
  
  console.log('Query result:', result);
  return result;
}

7.4 多设备批量插入

async function batchInsertMultipleDevices(pool: SessionPool) {
  const devices = ['device1', 'device2', 'device3'];
  const timestamps = [];
  const now = Date.now();
  
  // 生成最近 10 分钟的时间戳
  for (let i = 0; i < 600; i++) {
    timestamps.push(now - (600 - i) * 1000);
  }
  
  for (const device of devices) {
    const values = timestamps.map(() => [
      20 + Math.random() * 10,  // temperature
      50 + Math.random() * 30,  // humidity
    ]);
    
    await pool.insertTablet({
      deviceId: `root.test.${device}`,
      measurements: ['temperature', 'humidity'],
      dataTypes: [3, 3],
      timestamps,
      values,
    });
  }
  
  console.log(`Inserted ${timestamps.length} records for ${devices.length} devices`);
}

8. 最佳实践

8.1 资源管理

始终关闭资源:

// SessionPool
try {
  await pool.init();
  // ... 操作
} finally {
  await pool.close();
}

8.2 批量插入

优化批量大小:

  • 使用 insertTablet 而不是单条插入
  • 每个 tablet 批量 100-1000 行
  • 考虑内存与网络的权衡

示例:

// 好的做法: 批量插入
await pool.insertTablet({
  deviceId: 'root.test.device1',
  measurements: ['temperature'],
  dataTypes: [3],
  timestamps: timestamps, // 100-1000 个时间戳
  values: values,         // 100-1000 个值
});

// 不好的做法: 单条插入
for (let i = 0; i < 1000; i++) {
  await pool.executeNonQueryStatement(
    `INSERT INTO root.test.device1(timestamp, temperature) VALUES(${timestamps[i]}, ${values[i]})`
  );
}

8.3 错误处理

try {
  await pool.init();
  await pool.executeNonQueryStatement('CREATE DATABASE root.test');
} catch (error) {
  if (error.message.includes('already exists')) {
    console.log('Database already exists, continuing...');
  } else {
    console.error('Failed to create database:', error);
    throw error;
  }
} finally {
  await pool.close();
}

8.4 连接池大小设置

指导原则:

  • minPoolSize 设置为平均并发负载
  • maxPoolSize 设置为峰值负载 + 缓冲(20-30%)
  • 在生产环境中监控连接池统计
  • 根据服务器容量调整

示例:

const pool = new SessionPool({
  nodeUrls: ['localhost:6667'],
  maxPoolSize: 50,    // 峰值负载: 40 个客户端 + 25% 缓冲
  minPoolSize: 20,    // 平均负载: 20 个客户端
  maxIdleTime: 60000, // 空闲 1 分钟后清理
  waitTimeout: 30000, // 最多等待 30 秒获取可用 session
});

9. 故障排查

9.1 常见问题

连接被拒绝

症状:

Error: connect ECONNREFUSED 127.0.0.1:6667

解决方案:

  1. 验证 IoTDB 正在运行: jps | grep IoTDB
  2. 检查 iotdb-datanode.properties 中的端口配置
  3. 验证防火墙允许连接
  4. 使用 telnet 测试: telnet localhost 6667

连接池超时

症状:

Error: Timeout waiting for available session

解决方案:

  1. 增加连接池配置中的 waitTimeout
  2. 如果服务器能够处理,增加 maxPoolSize
  3. 验证 session 正确释放
  4. 检查连接泄漏(忘记 releaseSession())

内存不足

症状:

FATAL ERROR: Reached heap limit

解决方案:

  1. 减少连接池配置中的 fetchSize
  2. 分批处理查询结果
  3. 增加 Node.js 堆: node --max-old-space-size=4096 app.js

9.2 调试技巧

启用调试日志:

// 设置环境变量
process.env.LOG_LEVEL = 'debug';

// 或直接使用 logger
import { logger } from '@iotdb/client';
logger.setLevel('debug');

检查连接状态:

console.log('Pool size:', pool.getPoolSize());
console.log('Available:', pool.getAvailableSize());
console.log('In Use:', pool.getInUseSize());

测试查询执行时间:

const start = Date.now();
const result = await pool.executeQueryStatement('SELECT ...');
console.log(`Query took ${Date.now() - start}ms`);

9.3 性能优化

  1. 启用连接池: 用于并发操作
  2. 批量插入: 使用 insertTablet,100-1000 行
  3. 多节点设置: 在节点间分配负载
  4. 监控资源: 关注 CPU、内存、网络
  5. 调整连接池大小: 根据工作负载设置最小/最大连接池大小

9.4 获取帮助

附录 A: 完整类型参考

详见 data-types.md 了解全面的数据类型文档。

附录 B: API 快速参考

SessionPool 方法

  • init() - 初始化连接池
  • close() - 关闭所有 session
  • executeQueryStatement(sql, timeout?) - 执行查询
  • executeNonQueryStatement(sql) - 执行 DDL/DML
  • insertTablet(tablet) - 批量插入
  • getSession() - 从连接池获取 session
  • releaseSession(session) - 将 session 返回连接池
  • getPoolSize() - 总 session 数
  • getAvailableSize() - 可用 session 数
  • getInUseSize() - 活动 session 数

版本: 1.0.0
最后更新: 2024年1月
许可证: Apache License 2.0