AINode 是支持时序相关模型注册、管理、调用的 IoTDB 原生节点,内置业界领先的自研时序大模型,如清华自研时序模型 Timer 系列,可通过标准 SQL 语句进行调用,实现时序数据的毫秒级实时推理,可支持时序趋势预测、缺失值填补、异常值检测等应用场景。
系统架构如下图所示:
三种节点的职责如下:
与单独构建机器学习服务相比,具有以下优势:
AINode 的部署可参考文档 AINode 部署 。
TimechoDB-AINode 支持模型推理、模型微调以及模型管理(注册、查看、删除、加载、卸载等)三大功能,下面章节将进行详细说明。
SQL语法如下:
call inference(<model_id>,inputSql,(<parameterName>=<parameterValue>)*)
在完成模型的注册后(内置模型推理无需注册流程),通过call关键字,调用inference函数就可以使用模型的推理功能,其对应的参数介绍如下:
model_id: 对应一个已经注册的模型
sql:sql查询语句,查询的结果作为模型的输入进行模型推理。查询的结果中行列的维度需要与具体模型config中指定的大小相匹配。(这里的sql不建议使用SELECT *子句,因为在IoTDB中,*并不会对列进行排序,因此列的顺序是未定义的,可以使用SELECT ot 的方式确保列的顺序符合模型输入的预期)
parameterName/parameterValue:参数名/参数值,目前支持:
| 参数名称 | 参数类型 | 参数描述 | 默认值 |
|---|---|---|---|
| generateTime | boolean | 返回结果是否包含时间戳列 | false |
| outputLength | int | 指定返回结果的输出长度 | 96 |
说明:
示例
样本数据 ETTh-tree
下面是使用 sundial 模型推理的一个操作示例,输入 96 行, 输出 48 行,我们通过SQL使用其进行推理。
IoTDB> select OT from root.db.** +-----------------------------+---------------+ | Time|root.db.etth.OT| +-----------------------------+---------------+ |2016-07-01T00:00:00.000+08:00| 30.531| |2016-07-01T01:00:00.000+08:00| 27.787| |2016-07-01T02:00:00.000+08:00| 27.787| |2016-07-01T03:00:00.000+08:00| 25.044| |2016-07-01T04:00:00.000+08:00| 21.948| | ...... | ...... | |2016-07-04T19:00:00.000+08:00| 29.546| |2016-07-04T20:00:00.000+08:00| 29.475| |2016-07-04T21:00:00.000+08:00| 29.264| |2016-07-04T22:00:00.000+08:00| 30.953| |2016-07-04T23:00:00.000+08:00| 31.726| +-----------------------------+---------------+ Total line number = 96 IoTDB> call inference(sundial,"select OT from root.db.**", generateTime=True, outputLength=48) +-----------------------------+------------------+ | Time| output| +-----------------------------+------------------+ |2016-07-04T23:00:00.000+08:00|30.537494659423828| |2016-07-04T23:59:22.500+08:00|29.619892120361328| |2016-07-05T00:58:45.000+08:00|28.815832138061523| |2016-07-05T01:58:07.500+08:00| 27.91131019592285| |2016-07-05T02:57:30.000+08:00|26.893848419189453| | ...... | ...... | |2016-07-06T17:33:07.500+08:00| 24.40607261657715| |2016-07-06T18:32:30.000+08:00| 25.00441551208496| |2016-07-06T19:31:52.500+08:00|24.907312393188477| |2016-07-06T20:31:15.000+08:00|25.156436920166016| |2016-07-06T21:30:37.500+08:00|25.335433959960938| +-----------------------------+------------------+ Total line number = 48
AINode 支持通过 SQL 进行模型微调任务。
SQL 语法
createModel | CREATE MODEL modelId=identifier (WITH HYPERPARAMETERS LR_BRACKET hparamPair (COMMA hparamPair)* RR_BRACKET)? FROM MODEL existingModelId=identifier ON DATASET LR_BRACKET trainingData RR_BRACKET ; trainingData : dataElement(COMMA dataElement)* ; dataElement : pathPatternElement (LR_BRACKET timeRange RR_BRACKET)? ; pathPatternElement : PATH path=prefixPath ;
参数说明
| 名称 | 描述 |
|---|---|
| modelId | 微调出的模型的唯一标识 |
| hparamPair | 微调使用的超参数 key-value 对,目前支持如下:train_epochs: int 类型,微调轮数 iter_per_epoch: int 类型,每轮微调的迭代次数 learning_rate: double 类型,学习率 |
| existingModelId | 微调使用的基座模型 |
| trainingData | 微调使用的数据集 |
示例
IoTDB> CREATE MODEL sundialv2 FROM MODEL sundial ON DATASET (PATH root.db.etth.OT([1467302400000, 1467644400000))) Msg: The statement is executed successfully. IoTDB> show models +---------------------+---------+-----------+---------+ | ModelId|ModelType| Category| State| +---------------------+---------+-----------+---------+ | arima| sktime| builtin| active| | holtwinters| sktime| builtin| active| |exponential_smoothing| sktime| builtin| active| | naive_forecaster| sktime| builtin| active| | stl_forecaster| sktime| builtin| active| | gaussian_hmm| sktime| builtin| active| | gmm_hmm| sktime| builtin| active| | stray| sktime| builtin| active| | timer_xl| timer| builtin| active| | sundial| sundial| builtin| active| | chronos2| t5| builtin| active| | sundialv2| sundial| fine_tuned| training| +---------------------+---------+-----------+---------+
IoTDB> show models +---------------------+---------+-----------+---------+ | ModelId|ModelType| Category| State| +---------------------+---------+-----------+---------+ | arima| sktime| builtin| active| | holtwinters| sktime| builtin| active| |exponential_smoothing| sktime| builtin| active| | naive_forecaster| sktime| builtin| active| | stl_forecaster| sktime| builtin| active| | gaussian_hmm| sktime| builtin| active| | gmm_hmm| sktime| builtin| active| | stray| sktime| builtin| active| | timer_xl| timer| builtin| active| | sundial| sundial| builtin| active| | chronos2| t5| builtin| active| | sundialv2| sundial| fine_tuned| active| +---------------------+---------+-----------+---------+
符合以下要求的 Transformers 模型可以注册到 AINode 中:
AINode 目前使用 v4.56.2 版本的 transformers,构建模型时需避免继承低版本(<4.50)接口;
模型需继承一类 AINode 的推理任务流水线(当前支持预测流水线):
V2.0.9.3 之前
class BasicPipeline(ABC): def __init__(self, model_id, **model_kwargs): self.model_info = model_info self.device = model_kwargs.get("device", "cpu") self.model = load_model(model_info, device_map=self.device, **model_kwargs) @abstractmethod def preprocess(self, inputs, **infer_kwargs): """ 在推理任务开始前对输入数据进行前处理,包括形状验证和数值转换。 """ pass @abstractmethod def postprocess(self, output, **infer_kwargs): """ 在推理任务结束后对输出结果进行后处理。 """ pass class ForecastPipeline(BasicPipeline): def __init__(self, model_info, **model_kwargs): super().__init__(model_info, model_kwargs=model_kwargs) def preprocess(self, inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]], **infer_kwargs): """ 在将输入数据传递给模型进行推理之前进行预处理,验证输入数据的形状和类型。 Args: inputs (list[dict]): 输入数据,字典列表,每个字典包含: - 'targets': 形状为 (input_length,) 或 (target_count, input_length) 的张量。 - 'past_covariates': 可选,张量字典,每个张量形状为 (input_length,)。 - 'future_covariates': 可选,张量字典,每个张量形状为 (input_length,)。 infer_kwargs (dict, optional): 推理的额外关键字参数,如: - `output_length`(int): 如果提供'future_covariates',用于验证其有效性。 Raises: ValueError: 如果输入格式不正确(例如,缺少键、张量形状无效)。 Returns: 经过预处理和验证的输入数据,可直接用于模型推理。 """ pass def forecast(self, inputs, **infer_kwargs): """ 对给定输入执行预测。 Parameters: inputs: 用于进行预测的输入数据。类型和结构取决于模型的具体实现。 **infer_kwargs: 额外的推理参数,例如: - `output_length`(int): 模型应该生成的时间点数量。 Returns: 预测输出,具体形式取决于模型的具体实现。 """ pass def postprocess(self, outputs: list[torch.Tensor], **infer_kwargs) -> list[torch.Tensor]: """ 在推理后对模型输出进行后处理,验证输出数据的形状并确保其符合预期维度。 Args: outputs: 模型输出,2D张量列表,每个张量形状为 `[target_count, output_length]`。 Raises: InferenceModelInternalException: 如果输出张量形状无效(例如,维数错误)。 ValueError: 如果输出格式不正确。 Returns: list[torch.Tensor]: 后处理后的输出,将是一个2D张量列表。 """ pass
V2.0.9.3 起
class BasicPipeline(ABC): def __init__(self, model_id, **model_kwargs): self.model_info = model_info self.device = model_kwargs.get("device", "cpu") self.model = load_model(model_info, device_map=self.device, **model_kwargs) @abstractmethod def preprocess(self, inputs, **infer_kwargs): """ 在推理任务开始前对输入数据进行前处理,包括形状验证和数值转换。 """ pass @abstractmethod def postprocess(self, output, **infer_kwargs): """ 在推理任务结束后对输出结果进行后处理。 """ pass class ForecastPipeline(BasicPipeline): def __init__(self, model_info, **model_kwargs): super().__init__(model_info, model_kwargs=model_kwargs) def _preprocess( self, inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]], **infer_kwargs, ): """ 在将输入数据传递给模型进行推理之前进行预处理,验证输入数据的形状和类型。 Args: inputs (list[dict[str, dict[str, torch.Tensor] | torch.Tensor]]): 输入数据,字典列表,每个字典包含: - 'targets': 形状为 (input_length,) 或 (target_count, input_length) 的张量。 - 'past_covariates': 可选,张量字典,每个张量形状为 (input_length,)。 - 'future_covariates': 可选,张量字典,每个张量形状为 (input_length,)。 infer_kwargs (dict, optional): 推理的额外关键字参数,如: - `output_length`(int): 如果提供'future_covariates',用于验证其有效性。 Raises: ValueError: 如果输入格式不正确(例如,缺少键、张量形状无效)。 Returns: 经过预处理和验证的输入数据,可直接用于模型推理。 """ pass def forecast(self, inputs, **infer_kwargs): """ 对给定输入执行预测。 Parameters: inputs: 用于进行预测的输入数据。类型和结构取决于模型的具体实现。 **infer_kwargs: 额外的推理参数,例如: - `output_length`(int): 模型应该生成的时间点数量。 Returns: 预测输出,具体形式取决于模型的具体实现。 """ pass def _postprocess(self, outputs, **infer_kwargs) -> list[torch.Tensor]: """ 在推理后对模型输出进行后处理,验证输出数据的形状并确保其符合预期维度。 Args: outputs: 模型输出,2D张量列表,每个张量形状为 `[target_count, output_length]`。 Raises: InferenceModelInternalException: 如果输出张量形状无效(例如,维数错误)。 ValueError: 如果输出格式不正确。 Returns: list[torch.Tensor]: 后处理后的输出,将是一个2D张量列表。 """ pass
修改模型配置文件 config.json,确保包含以下字段:
V2.0.9.3 之前
{ "auto_map": { "AutoConfig": "config.Chronos2CoreConfig", // 指定模型 Config 类 "AutoModelForCausalLM": "model.Chronos2Model" // 指定模型类 }, "pipeline_cls": "pipeline_chronos2.Chronos2Pipeline", // 指定模型的推理流水线 "model_type": "custom_t5", // 指定模型类型 }
V2.0.9.3 起
参数 model_type 非必填
{ "auto_map": { "AutoConfig": "config.Chronos2CoreConfig", // 指定模型 Config 类 "AutoModelForCausalLM": "model.Chronos2Model" // 指定模型类 }, "pipeline_cls": "pipeline_chronos2.Chronos2Pipeline", // 指定模型的推理流水线 }
确保要注册的模型目录包含以下文件,且模型配置文件名称和权重文件名称不支持自定义:
注册自定义模型的 SQL 语法如下所示:
CREATE MODEL <model_id> USING URI <uri>
参数说明:
注册示例:
从本地路径上传自定义 Transformers 模型,AINode 会将该文件夹拷贝至 user_defined 目录中。
CREATE MODEL chronos2 USING URI 'file:///path/to/chronos2'
SQL执行后会异步进行注册的流程,可以通过模型展示查看模型的注册状态(见查看模型章节)。模型注册完成后,就可以通过使用正常查询的方式调用具体函数,进行模型推理。
注册成功的模型可以通过查看指令查询模型的具体信息。
SHOW MODELS
除了直接展示所有模型的信息外,可以指定model_id来查看某一具体模型的信息。
SHOW MODELS <model_id> -- 只展示特定模型
模型展示的结果中包含如下内容:
| ModelId | ModelType | Category | State |
|---|---|---|---|
| 模型ID | 模型类型 | 模型种类 | 模型状态 |
其中,State 模型状态机流转示意图如下:
状态机流程说明:
show models 命令,仅能查看到**系统内置(BUILTIN)**的模型。查看示例
IoTDB> show models +---------------------+--------------+--------------+-------------+ | ModelId| ModelType| Category| State| +---------------------+--------------+--------------+-------------+ | arima| sktime| builtin| active| | holtwinters| sktime| builtin| active| |exponential_smoothing| sktime| builtin| active| | naive_forecaster| sktime| builtin| active| | stl_forecaster| sktime| builtin| active| | gaussian_hmm| sktime| builtin| active| | gmm_hmm| sktime| builtin| active| | stray| sktime| builtin| active| | custom| | user_defined| active| | timer_xl| timer| builtin| activating| | sundial| sundial| builtin| active| | sundialx_1| sundial| fine_tuned| active| | sundialx_4| sundial| fine_tuned| training| | sundialx_5| sundial| fine_tuned| failed| | chronos2| t5| builtin| inactive| +---------------------+--------------+--------------+-------------+
内置传统时序模型介绍如下:
| 模型名称 | 核心概念 | 适用场景 | 主要特点 |
|---|---|---|---|
| ARIMA(自回归整合移动平均模型) | 结合自回归(AR)、差分(I)和移动平均(MA),用于预测平稳时间序列或可通过差分变为平稳的数据。 | 单变量时间序列预测,如股票价格、销量、经济指标等。 | 1. 适用于线性趋势和季节性较弱的数据。2. 需要选择参数 (p,d,q)。3. 对缺失值敏感。 |
| Holt-Winters(三参数指数平滑) | 基于指数平滑,引入水平、趋势和季节性三个分量,适用于具有趋势和季节性的数据。 | 有明显季节性和趋势的时间序列,如月度销售额、电力需求等。 | 1. 可处理加性或乘性季节性。2. 对近期数据赋予更高权重。3. 简单易实现。 |
| Exponential Smoothing(指数平滑) | 通过加权平均历史数据,权重随时间指数递减,强调近期观测值的重要性。 | 无显著季节性但存在趋势的数据,如短期需求预测。 | 1. 参数少,计算简单。2. 适合平稳或缓慢变化序列。3. 可扩展为双指数或三指数平滑。 |
| Naive Forecaster(朴素预测器) | 使用最近一期的观测值作为下一期的预测值,是最简单的基准模型。 | 作为其他模型的比较基准,或数据无明显模式时的简单预测。 | 1. 无需训练。2. 对突发变化敏感。3. 季节性朴素变体可用前一季节同期值预测。 |
| STL Forecaster(季节趋势分解预测) | 基于STL分解时间序列,分别预测趋势、季节性和残差分量后组合。 | 具有复杂季节性、趋势和非线性模式的数据,如气候数据、交通流量。 | 1. 能处理非固定季节性。2. 对异常值稳健。3. 分解后可结合其他模型预测各分量。 |
| Gaussian HMM(高斯隐马尔可夫模型) | 假设观测数据由隐藏状态生成,每个状态的观测概率服从高斯分布。 | 状态序列预测或分类,如语音识别、金融状态识别。 | 1. 适用于时序数据的状态建模。2. 假设观测值在给定状态下独立。3. 需指定隐藏状态数量。 |
| GMM HMM (高斯混合隐马尔可夫模型) | 扩展Gaussian HMM,每个状态的观测概率由高斯混合模型描述,可捕捉更复杂的观测分布。 | 需要多模态观测分布的场景,如复杂动作识别、生物信号分析。 | 1. 比单一高斯更灵活。2. 参数更多,计算复杂度高。3. 需训练GMM成分数。 |
| STRAY(基于奇异值的异常检测) | 通过奇异值分解(SVD)检测高维数据中的异常点,常用于时间序列异常检测。 | 高维时间序列的异常检测,如传感器网络、IT系统监控。 | 1. 无需分布假设。2. 可处理高维数据。3. 对全局异常敏感,局部异常可能漏检。 |
内置时序大模型介绍如下:
| 模型名称 | 核心概念 | 适用场景 | 主要特点 |
|---|---|---|---|
| Timer-XL | 支持超长上下文的时序大模型,通过大规模工业数据预训练增强泛化能力。 | 需利用极长历史数据的复杂工业预测,如能源、航空航天、交通等领域。 | 1. 超长上下文支持,可处理数万时间点输入。2. 多场景覆盖,支持非平稳、多变量及协变量预测。3. 基于万亿级高质量工业时序数据预训练。 |
| Timer-Sundial | 采用“Transformer + TimeFlow”架构的生成式基础模型,专注于概率预测。 | 需要量化不确定性的零样本预测场景,如金融、供应链、新能源发电预测。 | 1. 强大的零样本泛化能力,支持点预测与概率预测 2. 可灵活分析预测分布的任意统计特性。3. 创新生成架构,实现高效的非确定性样本生成。 |
| Chronos-2 | 基于离散词元化范式的通用时序基础模型,将预测转化为语言建模任务。 | 快速零样本单变量预测,以及可借助协变量(如促销、天气)提升效果的场景。 | 1. 强大的零样本概率预测能力。2. 支持协变量统一建模,但对输入有严格要求:a. 未来协变量的名称组成的集合必须是历史协变量的名称组成的集合的子集;b. 每个历史协变量的长度必须等于目标变量的长度; c. 每个未来协变量的长度必须等于预测长度;3. 采用高效的编码器式结构,兼顾性能与推理速度。 |
对于注册成功的模型,用户可以通过 SQL 进行删除,AINode 会将 user_defined 目录下的对应模型文件夹整个删除。其 SQL 语法如下:
DROP MODEL <model_id>
需要指定已经成功注册的模型 model_id 来删除对应的模型。由于模型删除涉及模型数据清理,操作不会立即完成,此时模型的状态为 DROPPING,该状态的模型不能用于模型推理。请注意,该功能不支持删除内置模型。
为适应不同场景,AINode 提供以下两种模型加载策略:
下文将详细介绍加载/卸载模型的相关内容:
支持通过编辑如下配置项设置常驻加载相关参数。
# AINode 在推理时可使用的设备内存/显存占总量的比例 # Datatype: Float ain_inference_memory_usage_ratio=0.4 # AINode 每个加载的模型实例需要占用的内存比例,即模型占用*该值 # Datatype: Float ain_inference_extra_memory_ratio=1.2
支持通过如下 SQL 命令查看所有可用的设备 ID
SHOW AI_DEVICES
示例
IoTDB> show ai_devices +-------------+ | DeviceId| +-------------+ | cpu| | 0| | 1| +-------------+
支持通过如下 SQL 命令手动加载模型,系统根据硬件资源使用情况自动均衡模型实例数量。
LOAD MODEL <existing_model_id> TO DEVICES <device_id>(, <device_id>)*
参数要求
示例
LOAD MODEL sundial TO DEVICES 'cpu,0,1'
支持通过如下 SQL 命令手动卸载指定模型的所有实例,系统会重分配空闲出的资源给其他模型
UNLOAD MODEL <existing_model_id> FROM DEVICES <device_id>(, <device_id>)*
参数要求
示例
UNLOAD MODEL sundial FROM DEVICES 'cpu,0,1'
支持通过如下 SQL 命令查看已经手动加载的模型实例,可通过 device_id 指定设备。
SHOW LOADED MODELS SHOW LOADED MODELS <device_id>(, <device_id>)* # 展示指定设备中的模型实例
示例:在内存、gpu_0 和 gpu_1 两张显卡加载了sundial 模型
IoTDB> show loaded models +-------------+--------------+------------------+ | DeviceId| ModelId| Count(instances)| +-------------+--------------+------------------+ | cpu| sundial| 4| | 0| sundial| 6| | 1| sundial| 6| +-------------+--------------+------------------+
说明:
AINode 目前支持多种时序大模型,相关介绍及部署使用可参考时序大模型
使用 AINode 相关的功能时,可以使用IoTDB本身的鉴权去做一个权限管理,用户只有在具备 USE_MODEL 权限时,才可以使用模型管理的相关功能。当使用推理功能时,用户需要有访问输入模型的 SQL 对应的源序列的权限。
| 权限名称 | 权限范围 | 管理员用户(默认ROOT) | 普通用户 | 路径相关 |
|---|---|---|---|---|
| USE_MODEL | create model / show models / drop model | √ | √ | x |
| READ_DATA | call inference | √ | √ | √ |