Apache IoTDB Node.js Client - 表模型用户指南

版本: 1.0.0
最后更新: 2024

目录

1. 简介

1.1 概述

Apache IoTDB Node.js Client 为表模型(关系数据模型)提供了原生支持,使用类 SQL 的表操作实现结构化数据的高效管理。本指南涵盖了表模型操作的 TableSessionPool API。

1.2 表模型特性

IoTDB 中的表模型采用关系格式组织数据:

  • 基于数据库的组织: 创建和管理包含多个表的数据库
  • 表模式: 使用标签(Tag)、属性(Attribute)和字段(Field)定义表
  • SQL 操作: 使用熟悉的 SQL 语法进行查询和数据操作
  • 连接池: 为高并发场景提供内置连接池
  • 自动上下文: 使用 USE DATABASE 进行数据库上下文管理

1.3 核心概念

  • 数据库(Database): 相关表的逻辑分组
  • 表(Table): 包含列和列类别的模式定义
  • 标签(Tag): 时间序列标识符(已索引,用于 WHERE 子句)
  • 属性(Attribute): 时间序列元数据(未索引)
  • 字段(Field): 实际测量值

1.4 表模型 vs 树模型

方面表模型树模型
组织方式关系表分层路径
模式显式表模式时间序列定义
查询语言标准 SQL带路径的 IoTDB SQL
使用场景结构化关系数据分层 IoT 数据
数据模型标签 + 属性 + 字段设备 + 测点

2. 安装

2.1 从 npm 安装

npm install @iotdb/client

系统要求:

  • Node.js >= 14.0.0
  • Apache IoTDB >= 1.0.0 (支持表模型)

2.2 在项目中导入

TypeScript:

import { TableSessionPool, PoolConfigBuilder, TableTablet, ColumnCategory, TSDataType } from '@iotdb/client';

JavaScript:

const { TableSessionPool, PoolConfigBuilder, TableTablet, ColumnCategory, TSDataType } = require('@iotdb/client');

3. 快速入门

3.1 基础 TableSessionPool 示例

import { TableSessionPool, TableTablet, ColumnCategory } from '@iotdb/client';

async function quickStart() {
  // 创建并初始化表 session 连接池
  const pool = new TableSessionPool('localhost', 6667, {
    username: 'root',
    password: 'root',
    database: 'test_db', // 可选: 设置默认数据库
    maxPoolSize: 10,
    minPoolSize: 2,
  });
  
  await pool.open();
  
  try {
    // 创建数据库
    await pool.executeNonQueryStatement('CREATE DATABASE test_db');
    
    // 使用数据库
    await pool.executeNonQueryStatement('USE test_db');
    
    // 创建表
    await pool.executeNonQueryStatement(`
      CREATE TABLE sensor_data (
        region_id STRING TAG,
        device_id STRING TAG,
        model STRING ATTRIBUTE,
        temperature FLOAT FIELD,
        humidity DOUBLE FIELD
      ) WITH (TTL=3600000)
    `);
    
    // 使用 TableTablet 类与 addRow 插入数据
    const tablet = new TableTablet(
      'sensor_data',
      ['region_id', 'device_id', 'model', 'temperature', 'humidity'],
      [5, 5, 5, 3, 4], // STRING, STRING, STRING, FLOAT, DOUBLE
      [ColumnCategory.TAG, ColumnCategory.TAG, ColumnCategory.ATTRIBUTE, ColumnCategory.FIELD, ColumnCategory.FIELD]
    );
    tablet.addRow(Date.now(), ['region1', 'device001', 'ModelA', 25.5, 60.0]);
    
    await pool.insertTablet(tablet);
    
    // 查询数据
    const dataSet = await pool.executeQueryStatement(`
      SELECT * FROM sensor_data 
      WHERE region_id = 'region1' AND device_id = 'device001'
    `);
    
    while (await dataSet.hasNext()) {
      const row = dataSet.next();
      console.log(`Temperature: ${row.getFloat('temperature')}°C, Humidity: ${row.getDouble('humidity')}%`);
    }
    
    await dataSet.close();
  } finally {
    await pool.close();
  }
}

quickStart();

3.2 使用数据库上下文

async function withDatabaseContext() {
  // 创建预配置了数据库的连接池
  const pool = new TableSessionPool('localhost', 6667, {
    username: 'root',
    password: 'root',
    database: 'production_db', // 自动执行 USE DATABASE
    maxPoolSize: 20,
  });
  
  await pool.open();
  
  try {
    // 无需显式 USE DATABASE
    // 已经在 'production_db' 上下文中
    
    const dataSet = await pool.executeQueryStatement('SHOW TABLES');
    
    while (await dataSet.hasNext()) {
      const row = dataSet.next();
      console.log('Table:', row.getFields());
    }
    
    await dataSet.close();
  } finally {
    await pool.close();
  }
}

4. TableSessionPool API

4.1 概述

TableSessionPool 是表模型操作的专用连接池。它扩展了基础 SessionPool 功能,提供表特定功能和自动数据库上下文管理。

核心特性:

  • 与 SessionPool 相同的连接池功能
  • 配置数据库时自动执行 USE DATABASE
  • 带列类别的表特定 insertTablet
  • 基于 SQL 的操作
  • 轮询负载均衡

4.2 构造函数

方式 1: 传统 API(相同端口)

const pool = new TableSessionPool(
  'localhost',                    // 主机
  6667,                           // 端口
  {
    username: 'root',
    password: 'root',
    database: 'my_database',      // 可选
    maxPoolSize: 20,
    minPoolSize: 5,
  }
);

方式 2: 使用 nodeUrls(不同端口)

const pool = new TableSessionPool({
  nodeUrls: [
    'node1:6667',
    'node2:6668',
    'node3:6669',
  ],
  username: 'root',
  password: 'root',
  database: 'my_database',
  maxPoolSize: 20,
  minPoolSize: 5,
});

方式 3: 使用构建器模式(推荐)

import { PoolConfigBuilder } from '@iotdb/client';

const pool = new TableSessionPool(
  new PoolConfigBuilder()
    .nodeUrls(['node1:6667', 'node2:6667'])
    .username('root')
    .password('root')
    .database('my_database')
    .maxPoolSize(20)
    .minPoolSize(5)
    .maxIdleTime(60000)
    .waitTimeout(60000)
    .build()
);

4.3 配置选项

所有 SessionPool 选项加上:

选项类型默认值说明
databasestringundefined表操作的默认数据库

4.4 方法

4.4.1 连接管理

async open(enableRpcCompression?: boolean): Promise<void>

打开连接池。可选择启用 RPC 压缩。

参数:

  • enableRpcCompression: 启用 RPC 压缩(默认: false)

示例:

// 不启用压缩打开
await pool.open();

// 启用压缩打开
await pool.open(true);
async close(): Promise<void>

关闭连接池中的所有 session。

示例:

await pool.close();

4.4.2 查询操作

async executeQueryStatement(sql: string, timeoutMs?: number): Promise<SessionDataSet>

执行 SQL 查询语句。

参数:

  • sql: SQL 查询语句
  • timeoutMs: 查询超时时间(毫秒,默认: 60000)

返回值: 用于遍历结果的 SessionDataSet

示例:

const dataSet = await pool.executeQueryStatement(`
  SELECT temperature, humidity 
  FROM sensor_data 
  WHERE region_id = 'region1'
  LIMIT 100
`);

while (await dataSet.hasNext()) {
  const row = dataSet.next();
  console.log(row.getTimestamp(), row.getFloat('temperature'));
}

await dataSet.close();

4.4.3 非查询操作

async executeNonQueryStatement(sql: string): Promise<void>

执行 DDL 或 DML 语句。

参数:

  • sql: SQL 语句

示例:

// 创建数据库
await pool.executeNonQueryStatement('CREATE DATABASE my_db');

// 使用数据库
await pool.executeNonQueryStatement('USE my_db');

// 创建表
await pool.executeNonQueryStatement(`
  CREATE TABLE devices (
    device_id STRING TAG,
    location STRING ATTRIBUTE,
    value FLOAT FIELD
  )
`);

// 删除表
await pool.executeNonQueryStatement('DROP TABLE devices');

// 删除数据库
await pool.executeNonQueryStatement('DROP DATABASE my_db');

4.4.4 数据插入

async insertTablet(tablet: TableTablet | ITableTablet): Promise<void>

使用 tablet 格式向表中插入数据。

参数:

  • tablet: TableTablet 对象或包含表数据的普通对象

TableTablet 接口 (用于普通对象):

interface ITableTablet {
  tableName: string;                    // 表名
  columnNames: string[];                // 列名
  columnTypes: number[];                // 数据类型代码 (TSDataType)
  columnCategories: ColumnCategory[];   // 列类别
  timestamps: number[];                 // 时间戳(毫秒)
  values: any[][];                     // 二维数组: [行][列]
}

ColumnCategory 枚举:

enum ColumnCategory {
  TAG = 0,        // 标签列 - 用于 WHERE 子句筛选的索引列(如 device_id、region_id)
  FIELD = 2,      // 字段列 - 测量值(如 temperature、humidity)
  ATTRIBUTE = 1,  // 属性列 - 未索引的元数据(如 model、firmware_version)
  TIME = 3,       // 时间列(仅供内部使用)
}

列类别说明:

  • TAG (0) - 用于 WHERE 子句筛选的索引列(例如 device_id、region_id)
  • FIELD (2) - 测量值(例如 temperature、humidity)
  • ATTRIBUTE (1) - 不用于筛选的元数据(例如 device_model、firmware_version)
  • TIME (3) - 仅供内部使用。不要在 columnCategories 数组中使用 - 时间戳通过 timestamps 数组单独处理

TableTablet 类 (带辅助方法 - 推荐):

import { TableTablet, ColumnCategory, TSDataType } from '@iotdb/client';

// 创建 tablet
const tablet = new TableTablet(
  'sensor_data',
  ['region_id', 'device_id', 'model', 'temperature', 'humidity'],
  [TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, TSDataType.FLOAT, TSDataType.DOUBLE],
  [ColumnCategory.TAG, ColumnCategory.TAG, ColumnCategory.ATTRIBUTE, ColumnCategory.FIELD, ColumnCategory.FIELD]
);

// 使用 addRow 方法逐行添加数据
tablet.addRow(Date.now(), ['region1', 'device001', 'ModelA', 25.5, 60.0]);
tablet.addRow(Date.now() + 1000, ['region1', 'device001', 'ModelA', 26.0, 61.5]);
tablet.addRow(Date.now() + 2000, ['region1', 'device002', 'ModelB', 24.8, 58.5]);

// 插入 tablet
await pool.insertTablet(tablet);

替代方案: 普通对象方法 (仍支持):

import { ColumnCategory, TSDataType } from '@iotdb/client';

await pool.insertTablet({
  tableName: 'sensor_data',
  columnNames: ['region_id', 'device_id', 'model', 'temperature', 'humidity'],
  columnTypes: [TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, TSDataType.FLOAT, TSDataType.DOUBLE],
  columnCategories: [
    ColumnCategory.TAG,        // region_id - 索引标签
    ColumnCategory.TAG,        // device_id - 索引标签
    ColumnCategory.ATTRIBUTE,  // model - 元数据
    ColumnCategory.FIELD,      // temperature - 测量值
    ColumnCategory.FIELD,      // humidity - 测量值
  ],
  timestamps: [
    Date.now(),
    Date.now() + 1000,
    Date.now() + 2000,
  ],
  values: [
    ['region1', 'device001', 'ModelA', 25.5, 60.0],
    ['region1', 'device001', 'ModelA', 26.0, 61.5],
    ['region1', 'device002', 'ModelB', 24.8, 58.5],
  ],
});

使用数字值的示例(也支持):

await pool.insertTablet({
  tableName: 'sensor_data',
  columnNames: ['region_id', 'device_id', 'model', 'temperature', 'humidity'],
  columnTypes: [5, 5, 5, 3, 4],           // TEXT, TEXT, TEXT, FLOAT, DOUBLE
  columnCategories: [0, 0, 1, 2, 2],      // TAG, TAG, ATTRIBUTE, FIELD, FIELD
  timestamps: [Date.now()],
  values: [['region1', 'device001', 'ModelA', 25.5, 60.0]],
});

TableTablet 类的优势:

  • 便捷: addRow() 方法简化逐行添加数据
  • 类型安全: 构造函数验证参数长度
  • 已验证: 自动检查值是否与列数量匹配
  • 流式友好: 轻松在数据到达时添加行

5. 配置构建器

5.1 用于表模型的 PoolConfigBuilder

PoolConfigBuilder 用于创建 TableSessionPool 配置。

可用方法:

  • host(host: string): this
  • port(port: number): this
  • nodeUrls(urls: string[]): this
  • username(username: string): this
  • password(password: string): this
  • database(database: string): this - 对表模型很重要
  • timezone(timezone: string): this
  • fetchSize(size: number): this
  • maxPoolSize(size: number): this
  • minPoolSize(size: number): this
  • maxIdleTime(time: number): this
  • waitTimeout(timeout: number): this
  • enableSSL(enable: boolean): this
  • sslOptions(options: SSLOptions): this
  • build(): PoolConfig

示例:

const config = new PoolConfigBuilder()
  .nodeUrls(['iotdb1:6667', 'iotdb2:6667', 'iotdb3:6667'])
  .username('root')
  .password('root')
  .database('production_db')
  .fetchSize(2048)
  .maxPoolSize(30)
  .minPoolSize(10)
  .maxIdleTime(60000)
  .waitTimeout(60000)
  .build();

const pool = new TableSessionPool(config);
await pool.open();

6. 数据类型

6.1 支持的数据类型

表模型支持所有 IoTDB 数据类型:

代码类型JavaScript 类型在表模型中的使用
0BOOLEANboolean标签、属性、字段
1INT32number标签、属性、字段
2INT64number/string标签、属性、字段
3FLOATnumber属性、字段
4DOUBLEnumber属性、字段
5TEXTstring标签、属性、字段
8TIMESTAMPnumber/Date字段
9DATEnumber/Date字段
10BLOBBuffer字段
11STRINGstring标签、属性、字段

6.2 列类别

代码类别用途是否索引使用场景
0TAG标识符在 WHERE 子句中用于过滤
1ATTRIBUTE元数据描述性信息
2FIELD测量值实际传感器/测量值

6.3 在 insertTablet 中使用数据类型

混合类型示例:

await pool.insertTablet({
  tableName: 'equipment_metrics',
  columnNames: [
    'factory_id',    // TAG
    'equipment_id',  // TAG
    'manufacturer',  // ATTRIBUTE
    'model',         // ATTRIBUTE
    'temperature',   // FIELD
    'pressure',      // FIELD
    'is_active',     // FIELD
    'last_check',    // FIELD
  ],
  columnTypes: [5, 5, 5, 5, 3, 4, 0, 8],  // STRING, STRING, STRING, STRING, FLOAT, DOUBLE, BOOLEAN, TIMESTAMP
  columnCategories: [0, 0, 1, 1, 2, 2, 2, 2],  // TAG, TAG, ATTR, ATTR, FIELD, FIELD, FIELD, FIELD
  timestamps: [Date.now()],
  values: [[
    'factory01',
    'equip123',
    'ManufacturerA',
    'ModelX',
    75.5,
    101.325,
    true,
    Date.now(),
  ]],
});

7. 代码示例

7.1 完整的数据库和表设置

import { TableSessionPool, PoolConfigBuilder } from '@iotdb/client';

async function setupDatabase() {
  const pool = new TableSessionPool(
    new PoolConfigBuilder()
      .host('localhost')
      .port(6667)
      .username('root')
      .password('root')
      .maxPoolSize(10)
      .build()
  );
  
  await pool.open();
  
  try {
    // 创建数据库
    await pool.executeNonQueryStatement('CREATE DATABASE iot_platform');
    
    // 使用数据库
    await pool.executeNonQueryStatement('USE iot_platform');
    
    // 创建带 TTL 的表
    await pool.executeNonQueryStatement(`
      CREATE TABLE sensor_readings (
        region_id STRING TAG,
        building_id STRING TAG,
        floor INT32 TAG,
        device_id STRING TAG,
        device_type STRING ATTRIBUTE,
        location STRING ATTRIBUTE,
        temperature FLOAT FIELD,
        humidity FLOAT FIELD,
        co2_level INT32 FIELD,
        timestamp TIMESTAMP FIELD
      ) WITH (TTL=7776000000)
    `);
    
    console.log('Database and table created successfully');
    
    // 显示表
    const dataSet = await pool.executeQueryStatement('SHOW TABLES');
    console.log('Tables in database:');
    while (await dataSet.hasNext()) {
      console.log(dataSet.next().getFields());
    }
    await dataSet.close();
    
  } finally {
    await pool.close();
  }
}

setupDatabase();

7.2 批量插入多条记录

async function batchInsert(pool: TableSessionPool) {
  const regionIds = ['north', 'south', 'east', 'west'];
  const deviceIds = ['dev001', 'dev002', 'dev003'];
  
  const timestamps = [];
  const values = [];
  const now = Date.now();
  
  // 生成 100 条记录
  for (let i = 0; i < 100; i++) {
    timestamps.push(now + i * 1000);
    
    const region = regionIds[i % regionIds.length];
    const device = deviceIds[i % deviceIds.length];
    
    values.push([
      region,                         // region_id (TAG)
      device,                         // device_id (TAG)
      'SensorModelA',                 // model (ATTRIBUTE)
      20 + Math.random() * 10,       // temperature (FIELD)
      50 + Math.random() * 30,       // humidity (FIELD)
    ]);
  }
  
  await pool.insertTablet({
    tableName: 'sensor_readings',
    columnNames: ['region_id', 'device_id', 'model', 'temperature', 'humidity'],
    columnTypes: [5, 5, 5, 3, 3],
    columnCategories: [0, 0, 1, 2, 2],
    timestamps,
    values,
  });
  
  console.log(`Inserted ${timestamps.length} records`);
}

7.3 带过滤的查询

async function queryWithFilters(pool: TableSessionPool) {
  // 按 TAG 查询(已索引,高效)
  const dataSet = await pool.executeQueryStatement(`
    SELECT 
      device_id,
      temperature,
      humidity,
      timestamp
    FROM sensor_readings
    WHERE 
      region_id = 'north' 
      AND device_id IN ('dev001', 'dev002')
      AND temperature > 25.0
    ORDER BY timestamp DESC
    LIMIT 100
  `);
  
  const results = [];
  while (await dataSet.hasNext()) {
    const row = dataSet.next();
    results.push({
      deviceId: row.getString('device_id'),
      temperature: row.getFloat('temperature'),
      humidity: row.getFloat('humidity'),
      timestamp: new Date(row.getTimestamp()),
    });
  }
  
  await dataSet.close();
  
  console.log(`Found ${results.length} matching records`);
  return results;
}

7.4 聚合查询

async function aggregationQuery(pool: TableSessionPool) {
  const dataSet = await pool.executeQueryStatement(`
    SELECT 
      region_id,
      device_id,
      AVG(temperature) as avg_temp,
      MAX(temperature) as max_temp,
      MIN(temperature) as min_temp,
      COUNT(*) as record_count
    FROM sensor_readings
    WHERE timestamp >= ${Date.now() - 3600000}
    GROUP BY region_id, device_id
  `);
  
  console.log('Aggregation Results:');
  while (await dataSet.hasNext()) {
    const row = dataSet.next();
    console.log(`Region: ${row.getString('region_id')}, Device: ${row.getString('device_id')}`);
    console.log(`  Avg Temp: ${row.getFloat('avg_temp').toFixed(2)}°C`);
    console.log(`  Max Temp: ${row.getFloat('max_temp').toFixed(2)}°C`);
    console.log(`  Min Temp: ${row.getFloat('min_temp').toFixed(2)}°C`);
    console.log(`  Records: ${row.getInt('record_count')}`);
  }
  
  await dataSet.close();
}

7.5 多数据库操作

async function multiDatabaseOps(pool: TableSessionPool) {
  await pool.open();
  
  try {
    // 创建多个数据库
    await pool.executeNonQueryStatement('CREATE DATABASE production');
    await pool.executeNonQueryStatement('CREATE DATABASE staging');
    
    // 使用生产数据库
    await pool.executeNonQueryStatement('USE production');
    await pool.executeNonQueryStatement(`
      CREATE TABLE metrics (
        device_id STRING TAG,
        value DOUBLE FIELD
      )
    `);
    
    // 切换到测试数据库
    await pool.executeNonQueryStatement('USE staging');
    await pool.executeNonQueryStatement(`
      CREATE TABLE test_metrics (
        device_id STRING TAG,
        value DOUBLE FIELD
      )
    `);
    
    // 使用完全限定名跨数据库查询
    const prodData = await pool.executeQueryStatement('SELECT * FROM production.metrics LIMIT 10');
    const stagingData = await pool.executeQueryStatement('SELECT * FROM staging.test_metrics LIMIT 10');
    
    await prodData.close();
    await stagingData.close();
    
  } finally {
    await pool.close();
  }
}

8. 最佳实践

8.1 表设计

有效使用 TAG:

  • 将频繁用于 WHERE 子句的列设置为 TAG
  • TAG 已索引,可实现快速查询
  • 保持 TAG 基数合理(避免数百万个唯一值)

使用 ATTRIBUTE 存储元数据:

  • 不需要索引的描述性信息
  • 设备型号、制造商、位置等
  • 不在 WHERE 子句中使用

使用 FIELD 存储测量值:

  • 实际传感器读数和指标
  • 时间序列数据值

示例:

// 良好的表设计
CREATE TABLE sensor_data (
  region_id STRING TAG,        // 已索引,用于 WHERE
  device_id STRING TAG,        // 已索引,用于 WHERE
  manufacturer STRING ATTRIBUTE, // 元数据,未索引
  model STRING ATTRIBUTE,       // 元数据,未索引
  temperature FLOAT FIELD,     // 测量值
  humidity FLOAT FIELD         // 测量值
)

// 不良设计 - 使用 FIELD 作为标识符
CREATE TABLE sensor_data (
  temperature FLOAT FIELD,
  humidity FLOAT FIELD,
  device_id STRING FIELD       // 应该是 TAG!
)

8.2 查询优化

在 WHERE 子句中按 TAG 过滤:

// 好: 使用索引的 TAG
SELECT * FROM sensors 
WHERE region_id = 'north' AND device_id = 'dev001'

// 差: 按未索引的 FIELD 过滤
SELECT * FROM sensors 
WHERE temperature > 25.0  // 没有 TAG 过滤

使用适当的 LIMIT:

// 防止加载过多数据
SELECT * FROM sensors 
WHERE region_id = 'north'
LIMIT 1000

使用时间范围过滤器:

SELECT * FROM sensors 
WHERE region_id = 'north'
  AND timestamp >= ${Date.now() - 3600000}
  AND timestamp <= ${Date.now()}

8.3 连接池管理

合理设置连接池大小:

const pool = new TableSessionPool({
  nodeUrls: ['localhost:6667'],
  maxPoolSize: 50,    // 峰值并发查询数
  minPoolSize: 10,    // 保持热连接
  maxIdleTime: 60000, // 空闲 1 分钟后清理
  waitTimeout: 30000, // 最多等待 30 秒获取连接
});

监控连接池健康状况:

setInterval(() => {
  console.log('Pool Stats:');
  console.log(`  Total: ${pool.getPoolSize()}`);
  console.log(`  Available: ${pool.getAvailableSize()}`);
  console.log(`  In Use: ${pool.getInUseSize()}`);
}, 60000); // 每分钟

8.4 错误处理

async function robustInsert(pool: TableSessionPool, data: any) {
  try {
    await pool.insertTablet(data);
    console.log('Insert successful');
  } catch (error) {
    if (error.message.includes('Table does not exist')) {
      console.log('Creating table...');
      await createTable(pool);
      await pool.insertTablet(data);
    } else if (error.message.includes('Database does not exist')) {
      console.log('Creating database...');
      await createDatabase(pool);
      await createTable(pool);
      await pool.insertTablet(data);
    } else {
      console.error('Insert failed:', error);
      throw error;
    }
  }
}

8.5 资源清理

async function properCleanup() {
  const pool = new TableSessionPool('localhost', 6667, {
    username: 'root',
    password: 'root',
  });
  
  await pool.open();
  
  try {
    const dataSet = await pool.executeQueryStatement('SELECT * FROM table1');
    try {
      while (await dataSet.hasNext()) {
        // 处理结果
      }
    } finally {
      await dataSet.close(); // 始终关闭 DataSet
    }
  } finally {
    await pool.close(); // 始终关闭连接池
  }
}

9. 故障排查

9.1 常见问题

数据库不存在

症状:

Error: Database 'my_db' does not exist

解决方案:

// 先创建数据库
await pool.executeNonQueryStatement('CREATE DATABASE my_db');
await pool.executeNonQueryStatement('USE my_db');

// 或使用现有数据库配置连接池
const pool = new TableSessionPool('localhost', 6667, {
  database: 'my_db',  // 必须存在
});

表不存在

症状:

Error: Table 'my_table' does not exist

解决方案:

// 检查表是否存在
const dataSet = await pool.executeQueryStatement('SHOW TABLES');
// ... 验证表存在

// 如果需要,创建表
await pool.executeNonQueryStatement(`
  CREATE TABLE my_table (...)
`);

列不匹配

症状:

Error: Column count mismatch

解决方案:

  • 确保 columnNamescolumnTypescolumnCategories 长度相同
  • 验证 values 数组与列数匹配
  • 检查表模式是否与数据匹配
// 验证模式
const dataSet = await pool.executeQueryStatement('DESCRIBE my_table');
while (await dataSet.hasNext()) {
  console.log(dataSet.next().getFields());
}

TTL 问题

症状: 一段时间后数据自动删除

解决方案:

// 检查 TTL 设置
const dataSet = await pool.executeQueryStatement('SHOW TABLES');
// 查看表属性中的 TTL

// 修改 TTL
await pool.executeNonQueryStatement(`
  ALTER TABLE my_table SET PROPERTIES TTL=31536000000
`); // 1 年(毫秒)

9.2 性能问题

查询慢:

  1. 通过适当使用 TAG 添加索引
  2. 使用时间范围过滤器
  3. 添加 LIMIT 子句
  4. 考虑表分区

插入慢:

  1. 增加批量大小(100-1000 行)
  2. 使用连接池
  3. 考虑多个并发写入器
  4. 监控服务器资源

9.3 调试

启用调试日志:

process.env.LOG_LEVEL = 'debug';

检查 SQL 语法:

try {
  await pool.executeQueryStatement('EXPLAIN SELECT * FROM my_table');
} catch (error) {
  console.error('Invalid SQL:', error.message);
}

监控查询执行:

const start = Date.now();
const dataSet = await pool.executeQueryStatement('SELECT ...');
console.log(`Query took ${Date.now() - start}ms`);

let rowCount = 0;
while (await dataSet.hasNext()) {
  dataSet.next();
  rowCount++;
}
console.log(`Returned ${rowCount} rows`);

9.4 获取帮助

附录 A: 完整 API 参考

TableSessionPool 方法

  • open(enableRpcCompression?) - 打开连接池
  • close() - 关闭所有 session
  • executeQueryStatement(sql, timeout?) - 执行 SQL 查询
  • executeNonQueryStatement(sql) - 执行 DDL/DML
  • insertTablet(tablet) - 批量插入到表
  • getPoolSize() - 总 session 数
  • getAvailableSize() - 可用 session 数
  • getInUseSize() - 活动 session 数

SQL 语句

  • CREATE DATABASE database_name
  • DROP DATABASE database_name
  • USE database_name
  • SHOW DATABASES
  • SHOW TABLES
  • CREATE TABLE table_name (...)
  • DROP TABLE table_name
  • ALTER TABLE table_name SET PROPERTIES TTL=<ms>
  • SELECT ... FROM table_name WHERE ... LIMIT ...
  • DESCRIBE table_name

附录 B: 数据类型参考

详见 data-types.md 了解全面的数据类型文档。


版本: 1.0.0
最后更新: 2024年1月
许可证: Apache License 2.0