UDF

1. UDF介绍

UDF(User Defined Function)即用户自定义函数,IoTDB 提供多种内置的时序处理函数,也支持扩展自定义函数来满足更多的计算需求。

IoTDB 表模型中支持三种类型的 UDF ,如下表所示。

UDF 类型函数类型描述
UDSF(User-defined Scalar Function)标量函数输入 k 列 1 行数据,输出1 列 1 行数据(一对一)。
UDAF(User-defined Aggregate Function)聚合函数输入k 列 m 行数据,输出1 列 1 行数据(多对一)。
UDTF(User-defined Table Function)表函数输入0或1张表(k 列 m 行),输出1张表(x 行 y 列)。
  • UDSF 可用于标量函数出现的任何子句和表达式中,如select子句、where子句等。
    • select udsf1(s1) from table1 where udsf2(s1)>0
  • UDAF 可用于聚合函数出现的任何子句和表达式中,如select子句、having子句等;
    • select udaf1(s1), device_id from table1 group by device_id having udaf2(s1)>0
  • UDTF 可以像关系表一样在from子句中使用;
    • select * from udtf('t1', bid);

2. UDF 管理

2.1 UDF 注册

  1. 准备 UDF 实现的 JAR 包,其中包含 UDF 实现类,如org.apache.iotdb.udf.ScalarFunctionExample

    Jar 包的放置有两种方式:

  • 本地:需要将 JAR 包放置到集群所有节点ext/udf目录下。
  • 远端:需要将 JAR 包上传到 URI 服务器上并确保 IoTDB 实例能够访问该 URI 服务器(注册成功后IoTDB 会下载 JAR 包并同步到整个集群)。
  1. 使用以下 SQL 语句注册 UDF
CREATE FUNCTION <UDF-NAME> AS <UDF-CLASS-FULL-PATHNAME> (USING URI URI-STRING)
  • 示例
-- 本地
CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample';

-- 远端
CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample' USING URI 'http://jar/example.jar'

注意:

  1. UDF 在装载过程中无需启停服务器。
  2. UDF 名称大小写不敏感,不能与 IoTDB 内置函数重名。
  3. 表模型和树模型的 UDF 空间相互独立。
  4. 避免在不同的 JAR 包中创建全类名相同但功能逻辑不同的 UDF 类。如果存在,系统在执行 UDF 时会随机加载其中一个,造成执行行为不一致。

2.2 UDF 卸载

SQL 语法如下:

DROP FUNCTION <UDF-NAME>

示例:卸载上述例子的 UDF:

DROP FUNCTION contain_null

2.3 UDF 查看

  • 如果 State 为 UNAVAILABLE,可能是在注册或卸载过程中系统发生了错误,请查看系统日志进行排查,重新注册或卸载 UDF 直至成功即可。
SHOW FUNCTIONS

2.4 UDF 配置

  • 可以在 iotdb-system.properties 中配置 UDF Jar 文件的存储目录:
# UDF lib dir
udf_lib_dir=ext/udf

3. UDF 开发

3.1 UDF 依赖

可以从 Maven 库 中搜索下面示例中的依赖,请注意选择和目标 IoTDB 服务器版本相同的依赖版本。

<dependency>
  <groupId>org.apache.iotdb</groupId>
  <artifactId>udf-api</artifactId>
  <version>2.0.0</version>
  <scope>provided</scope>
</dependency>

3.2 标量函数(UDSF)

编写一个 UDSF 需要实现org.apache.iotdb.udf.api.relational.ScalarFunction接口。

public interface ScalarFunction extends SQLFunction {
  /**
   * In this method, the user need to do the following things:
   *
   * <ul>
   *   <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if
   *       any parameter is not valid.
   *   <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type.
   *   <li>Construct and return a {@linkplain ScalarFunctionAnalysis} object.
   * </ul>
   *
   * @param arguments arguments used to validate
   * @throws UDFArgumentNotValidException if any parameter is not valid
   * @return the analysis result of the scalar function
   */
  ScalarFunctionAnalysis analyze(FunctionArguments arguments) throws UDFArgumentNotValidException;

  /**
   * This method is called after the ScalarFunction is instantiated and before the beginning of the
   * transformation process. This method is mainly used to initialize the resources used in
   * ScalarFunction.
   *
   * @param arguments used to parse the input arguments entered by the user
   * @throws UDFException the user can throw errors if necessary
   */
  default void beforeStart(FunctionArguments arguments) throws UDFException {
    // do nothing
  }

  /**
   * This method will be called to process the transformation. In a single UDF query, this method
   * may be called multiple times.
   *
   * @param input original input data row
   * @throws UDFException the user can throw errors if necessary
   */
  Object evaluate(Record input) throws UDFException;

  /** This method is mainly used to release the resources used in the ScalarFunction. */
  default void beforeDestroy() {
    // do nothing
  }
}

接口说明:

接口定义描述是否必须
ScalarFunctionAnalysis analyze(FunctionArguments arguments);1. 校验FunctionArguments中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。
2. 根据FunctionArguments构造ScalarFunctionAnalysis,包括输出类型等信息。
void beforeStart(FunctionArguments arguments);在 UDSF 处理输入数据前,调用用户自定义的初始化行为
Object evaluate(Record input) throws UDFException;UDSF 处理逻辑,根据一行输入数据,返回一行输出数据。
void beforeDestroy();UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。

目前 ScalarFunctionAnalysis 中的字段:

字段类型字段名称默认值
TypeoutputDataType

示例:UDSF 的实现示例,输入任意数据类型的任意多列,返回一个布尔值,代表该行输入是否包含 NULL 值。

3.3 聚合函数(UDAF)

一个完整的 UDAF 定义涉及到 StateAggregateFunction 两个类。

3.3.1 State 类

编写一个 State 类需要实现org.apache.iotdb.udf.api.State接口。

public interface State {
  /** Reset your state object to its initial state. */
  void reset();

  /**
   * Serialize your state into byte array. The order of serialization must be consistent with
   * deserialization.
   */
  byte[] serialize();

  /**
   * Deserialize byte array into your state. The order of deserialization must be consistent with
   * serialization.
   */
  void deserialize(byte[] bytes);

  /** Destroy state. You may release previously binding resource in this method. */
  default void destroyState() {}
  ;
}

接口说明:

接口定义描述是否必须
void reset() State对象重置为初始的状态,您需要像编写构造函数一样,在该方法内填入State类中各个字段的初始值。
byte[] serialize()State序列化为二进制数据。该方法用于 IoTDB 内部的State对象传递,注意序列化的顺序必须和下面的反序列化方法一致。
void deserialize(byte[] bytes)将二进制数据反序列化为State。该方法用于 IoTDB 内部的State对象传递,注意反序列化的顺序必须和上面的序列化方法一致。
void destroyState()进行资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次。

3.3.2 AggregateFunction 类

编写一个 UDAF 需要实现 org.apache.iotdb.udf.api.relational.AggregateFunction接口。

public interface AggregateFunction extends SQLFunction {

  /**
   * In this method, the user need to do the following things:
   *
   * <ul>
   *   <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if
   *       any parameter is not valid.
   *   <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type.
   *   <li>Construct and return a {@linkplain AggregateFunctionAnalysis} object.
   * </ul>
   *
   * @param arguments arguments used to validate
   * @throws UDFArgumentNotValidException if any parameter is not valid
   * @return the analysis result of the scalar function
   */
  AggregateFunctionAnalysis analyze(FunctionArguments arguments)
      throws UDFArgumentNotValidException;

  /**
   * This method is called after the AggregateFunction is instantiated and before the beginning of
   * the transformation process. This method is mainly used to initialize the resources used in
   * AggregateFunction.
   *
   * @param arguments used to parse the input arguments entered by the user
   * @throws UDFException the user can throw errors if necessary
   */
  default void beforeStart(FunctionArguments arguments) throws UDFException {
    // do nothing
  }

  /** Create and initialize state. You may bind some resource in this method. */
  State createState();

  /**
   * Update state with data columns.
   *
   * @param state state to be updated
   * @param input original input data row
   */
  void addInput(State state, Record input);

  /**
   * Merge two state in execution engine.
   *
   * @param state current state
   * @param rhs right-hand-side state to be merged
   */
  void combineState(State state, State rhs);

  /**
   * Calculate output value from final state
   *
   * @param state final state
   * @param resultValue used to collect output data points
   */
  void outputFinal(State state, ResultValue resultValue);

  /**
   * Remove input data from state. This method is used to remove the data points that have been
   * added to the state. Once it is implemented, {@linkplain
   * AggregateFunctionAnalysis.Builder#removable(boolean)} should be set to true.
   *
   * @param state state to be updated
   * @param input row to be removed
   */
  default void remove(State state, Record input) {
    throw new UnsupportedOperationException();
  }

  /** This method is mainly used to release the resources used in the SQLFunction. */
  default void beforeDestroy() {
    // do nothing
  }
}

接口说明:

接口定义描述是否必须
AggregateFunctionAnalysis analyze(FunctionArguments arguments);1. 校验FunctionArguments中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。
2. 根据FunctionArguments构造AggregateFunctionAnalysis,包括输出类型、removable 等信息。
void beforeStart(FunctionArguments arguments); 在 UDAF 处理输入数据前,调用用户自定义的初始化行为
State createState();创建State对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。
void addInput(State state, Record input);更新State对象,将输入的一行​ ​Record数据添加到聚合状态中。
void combineState(State state, State rhs); rhs状态合并至state状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 会为每个节点上的部分数据生成一个State对象,然后调用该方法合并成完整的State
void outputFinal(State state, ResultValue resultValue);根据State中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。
void remove(State state, Record input);更新State对象,将输入的一行​ ​Record数据从聚合状态中剔除。实现该方法需要设置 AggregateFunctionAnalysis 中的 removable 字段为 true。
void beforeDestroy();UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。

目前 AggregateFunctionAnalysis 中的字段:

字段类型字段名称默认值
TypeoutputDataType
booleanremovablefalse

示例:UDAF 的实现示例,计算不为 NULL 的行数。

3.4 表函数(UDTF)

3.4.1 定义

表函数,也被称为表值函数(Table-Valued Function,TVF),不同于标量函数、聚合函数和窗口函数的返回值是一个“标量值”,表函数的返回值是一个“表”(结果集)。

3.4.2 使用

表函数可以像关系表一样,以tableFunctionCall的形式在 SQL 查询的 FROM 子句中使用,支持传递参数,并根据参数动态生成结果集。

3.4.3 语法

tableFunctionCall 的具体定义如下所示:

tableFunctionCall
    : qualifiedName '(' (tableFunctionArgument (',' tableFunctionArgument)*)?')'
    ;

tableFunctionArgument
    : (identifier '=>')? (tableArgument | scalarArgument)
    ​;

tableArgument
    : tableArgumentRelation
        (PARTITION BY ('(' (expression (',' expression)*)? ')' | expression))?
        (ORDER BY ('(' sortItem (',' sortItem)* ')' | sortItem))?
    ;

tableArgumentRelation
    : qualifiedName  (AS? identifier columnAliases?)?         #tableArgumentTable
    | '(' query ')' (AS? identifier columnAliases?)?          #tableArgumentQuery
    ;

scalarArgument
    : expression
    | timeDuration
    ;

例如:

// 这是从表函数中进行查询,传入一个字符串“t1”参数。
select * from tvf('t1'); 

// 这是从表函数中进行查询,传入一个字符串“t1”参数,一个 bid 表参数。
select * from tvf('t1', bid);

3.4.4 函数参数

IoTDB 中的表函数为多态表值函数,支持参数类型如下所示:

参数类型定义示例
标量参数(Scalar Argument)必须是常量表达式,可以是任何的 SQL 数据类型,需要和声明的类型兼容。SIZE => 42SIZE => '42'SIZE => 42.2SIZE => 12hSIZE => 60m
表参数(Table Argument)可以是一个表名或一条查询语句。input => ordersdata => (SELECT * FROM region, nation WHERE region.regionkey = nation.regionkey)

表参数具有如下属性:

  1. 组语义与行语义

    • 被声明为组语义(Set Semantic)的表参数意味着需要根据整个完整的分区才能得到结果集。

      • 允许在调用时指定 PARTITION 或 ORDER,执行引擎会 partition-by-partition 地进行处理。
      input => orders PARTITION BY device_id ORDER BY time
      
      • 如果没有指定 PARTITION,则认为所有的数据都在同一个数据组中。
    • 被声明为行语义(Row Semantics)的表参数意味着行与行之间没有依赖关系。不允许在调用时指定 PARTITION 或 ORDER,执行引擎会 row-by-row 地进行处理。

  2. 列穿透(Pass-through Columns)

    • 表参数如果被声明为列穿透,则表函数的结果列会包含该表参数输入的所有列。
    • 例如,窗口分析函数,通过为表参数设置列穿透属性,可实现输出结果为“所有输入列(包含原始列和聚合结果)+窗口ID”,即“原始数据+分析结果”。

3.4.5 传递方式

传递方式描述示例
按名传递1. 可以通过任意的顺序传递参数。
2. 被声明有默认值的参数可以被省略。
3. 参数名大小写不敏感。
SELECT * FROM my_function(row_count => 100, column_count => 1); SELECT * FROM my_function(column_count => 1, row_count => 100); SELECT * FROM my_function(column_count => 1);
按位置传递1. 必须按照声明的顺序进行传递参数。
2. 如果余下的参数都有默认值,可以只传一部分参数。
SELECT * FROM my_function(1, 100); SELECT * FROM my_function(1);

注意: 以上两种方式不允许混用,否则在语义解析时候会抛出“​All arguments must be passed by name or all must be passed positionally​”异常。

3.4.6 返回结果

表函数的结果集由以下两部分组成。

  1. 由表函数创建的生成列(Proper Columns)。
  2. 根据表参数自动构建的映射列(Pass-through Columns)。
    • 如果指定了表参数的属性为列穿透,则会包括输入关系的所有列;
    • 如果没有指定为列穿透但指定了 PartitionBy,则是 PartitionBy 的列;
    • 如果均未指定,则不根据表参数自动构建列。

3.5 完整Maven项目示例

如果使用 Maven,可以参考示例项目udf-example