blob: e4c6b09ff13574ba8be1872405d2f34e44867d68 [file] [log] [blame] [view]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# UDF
## 1. UDF介绍
UDF(User Defined Function)即用户自定义函数,IoTDB 提供多种内置的时序处理函数,也支持扩展自定义函数来满足更多的计算需求。
IoTDB 表模型中支持三种类型的 UDF ,如下表所示。
| UDF 类型 | 函数类型 | 描述 |
|-----------------------------------------|------|--------------------------------|
| `UDSF(User-defined Scalar Function)` | 标量函数 | 输入 k 列 1 行数据,输出1 列 1 行数据(一对一)。 |
| `UDAF(User-defined Aggregate Function)` | 聚合函数 | 输入k 列 m 行数据,输出1 列 1 行数据(多对一)。 |
| `UDTF(User-defined Table Function)` | 表函数 | 输入0或1张表(k 列 m 行),输出1张表(x 行 y 列)。 |
* `UDSF` 可用于标量函数出现的任何子句和表达式中,如select子句、where子句等。
* `select udsf1(s1) from table1 where udsf2(s1)>0`
* `UDAF` 可用于聚合函数出现的任何子句和表达式中,如select子句、having子句等;
* `select udaf1(s1), device_id from table1 group by device_id having udaf2(s1)>0 `
* `UDTF` 可以像关系表一样在from子句中使用;
* `select * from udtf('t1', bid);`
## 2. UDF 管理
### 2.1 UDF 注册
1. 准备 UDF 实现的 JAR 包,其中包含 UDF 实现类,如`org.apache.iotdb.udf.ScalarFunctionExample`。
Jar 包的放置有两种方式:
* 本地:需要将 JAR 包放置到集群**所有节点**的 `ext/udf`目录下。
* 远端:需要将 JAR 包上传到 URI 服务器上并确保 IoTDB 实例能够访问该 URI 服务器(注册成功后IoTDB 会下载 JAR 包并同步到整个集群)。
2. 使用以下 SQL 语句注册 UDF
```SQL
CREATE FUNCTION <UDF-NAME> AS <UDF-CLASS-FULL-PATHNAME> (USING URI URI-STRING)
```
* 示例
```SQL
-- 本地
CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample';
-- 远端
CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample' USING URI 'http://jar/example.jar'
```
**注意:**
1. UDF 在装载过程中无需启停服务器。
2. UDF 名称大小写不敏感,不能与 IoTDB 内置函数重名。
3. 表模型和树模型的 UDF 空间相互独立。
4. 避免在不同的 JAR 包中创建全类名相同但功能逻辑不同的 UDF 类。如果存在,系统在执行 UDF 时会随机加载其中一个,造成执行行为不一致。
### 2.2 UDF 卸载
SQL 语法如下:
```SQL
DROP FUNCTION <UDF-NAME>
```
示例:卸载上述例子的 UDF:
```SQL
DROP FUNCTION contain_null
```
### 2.3 UDF 查看
* 如果 State 为 UNAVAILABLE,可能是在注册或卸载过程中系统发生了错误,请查看系统日志进行排查,重新注册或卸载 UDF 直至成功即可。
```SQL
SHOW FUNCTIONS
```
### 2.4 UDF 配置
* 可以在 `iotdb-system.properties` 中配置 UDF Jar 文件的存储目录:
```Properties
# UDF lib dir
udf_lib_dir=ext/udf
```
## 3. UDF 开发
### 3.1 UDF 依赖
可以从 [Maven 库](http://search.maven.org/) 中搜索下面示例中的依赖,请注意选择和目标 IoTDB 服务器版本相同的依赖版本。
```XML
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>udf-api</artifactId>
<version>2.0.0</version>
<scope>provided</scope>
</dependency>
```
### 3.2 标量函数(UDSF)
编写一个 UDSF 需要实现`org.apache.iotdb.udf.api.relational.ScalarFunction`接口。
```java
public interface ScalarFunction extends SQLFunction {
/**
* In this method, the user need to do the following things:
*
* <ul>
* <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if
* any parameter is not valid.
* <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type.
* <li>Construct and return a {@linkplain ScalarFunctionAnalysis} object.
* </ul>
*
* @param arguments arguments used to validate
* @throws UDFArgumentNotValidException if any parameter is not valid
* @return the analysis result of the scalar function
*/
ScalarFunctionAnalysis analyze(FunctionArguments arguments) throws UDFArgumentNotValidException;
/**
* This method is called after the ScalarFunction is instantiated and before the beginning of the
* transformation process. This method is mainly used to initialize the resources used in
* ScalarFunction.
*
* @param arguments used to parse the input arguments entered by the user
* @throws UDFException the user can throw errors if necessary
*/
default void beforeStart(FunctionArguments arguments) throws UDFException {
// do nothing
}
/**
* This method will be called to process the transformation. In a single UDF query, this method
* may be called multiple times.
*
* @param input original input data row
* @throws UDFException the user can throw errors if necessary
*/
Object evaluate(Record input) throws UDFException;
/** This method is mainly used to release the resources used in the ScalarFunction. */
default void beforeDestroy() {
// do nothing
}
}
```
接口说明:
| 接口定义 | 描述 | 是否必须 |
| --------------------------------------------------------------- |---------------------------------------------------------------------------------------------------------------------------| ---------- |
| `ScalarFunctionAnalysis analyze(FunctionArguments arguments);`| 1. 校验`FunctionArguments`中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。<br> 2. 根据`FunctionArguments`构造`ScalarFunctionAnalysis`,包括输出类型等信息。 | 是 |
| `void beforeStart(FunctionArguments arguments);` | 在 UDSF 处理输入数据前,调用用户自定义的初始化行为 | 否 |
| `Object evaluate(Record input) throws UDFException;` | UDSF 处理逻辑,根据一行输入数据,返回一行输出数据。 | 是 |
| `void beforeDestroy();` | UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
目前 ScalarFunctionAnalysis 中的字段:
| 字段类型 | 字段名称 | 默认值 |
| ---------- | ---------------- | -------- |
| Type | outputDataType | 无 |
示例:[UDSF 的实现示例](https://github.com/apache/iotdb/blob/master/example/udf/src/main/java/org/apache/iotdb/udf/ScalarFunctionExample.java),输入任意数据类型的任意多列,返回一个布尔值,代表该行输入是否包含 NULL 值。
### 3.3 聚合函数(UDAF)
一个完整的 UDAF 定义涉及到 `State` 和 `AggregateFunction` 两个类。
#### 3.3.1 State 类
编写一个 State 类需要实现`org.apache.iotdb.udf.api.State`接口。
```c++
public interface State {
/** Reset your state object to its initial state. */
void reset();
/**
* Serialize your state into byte array. The order of serialization must be consistent with
* deserialization.
*/
byte[] serialize();
/**
* Deserialize byte array into your state. The order of deserialization must be consistent with
* serialization.
*/
void deserialize(byte[] bytes);
/** Destroy state. You may release previously binding resource in this method. */
default void destroyState() {}
;
}
```
接口说明:
| 接口定义 | 描述 | 是否必须 |
| -------------------------------- | ------------------------------------------------------------------------------------------------------------------------------- | ---------- |
| `void reset() ` | 将`State`对象重置为初始的状态,您需要像编写构造函数一样,在该方法内填入`State`类中各个字段的初始值。 | 是 |
| `byte[] serialize()` | 将`State`序列化为二进制数据。该方法用于 IoTDB 内部的`State`对象传递,注意序列化的顺序必须和下面的反序列化方法一致。 | 是 |
| `void deserialize(byte[] bytes)` | 将二进制数据反序列化为`State`。该方法用于 IoTDB 内部的`State`对象传递,注意反序列化的顺序必须和上面的序列化方法一致。 | 是 |
| `void destroyState()` | 进行资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次。 | 否 |
#### 3.3.2 AggregateFunction 类
编写一个 UDAF 需要实现 `org.apache.iotdb.udf.api.relational.AggregateFunction`接口。
```java
public interface AggregateFunction extends SQLFunction {
/**
* In this method, the user need to do the following things:
*
* <ul>
* <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if
* any parameter is not valid.
* <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type.
* <li>Construct and return a {@linkplain AggregateFunctionAnalysis} object.
* </ul>
*
* @param arguments arguments used to validate
* @throws UDFArgumentNotValidException if any parameter is not valid
* @return the analysis result of the scalar function
*/
AggregateFunctionAnalysis analyze(FunctionArguments arguments)
throws UDFArgumentNotValidException;
/**
* This method is called after the AggregateFunction is instantiated and before the beginning of
* the transformation process. This method is mainly used to initialize the resources used in
* AggregateFunction.
*
* @param arguments used to parse the input arguments entered by the user
* @throws UDFException the user can throw errors if necessary
*/
default void beforeStart(FunctionArguments arguments) throws UDFException {
// do nothing
}
/** Create and initialize state. You may bind some resource in this method. */
State createState();
/**
* Update state with data columns.
*
* @param state state to be updated
* @param input original input data row
*/
void addInput(State state, Record input);
/**
* Merge two state in execution engine.
*
* @param state current state
* @param rhs right-hand-side state to be merged
*/
void combineState(State state, State rhs);
/**
* Calculate output value from final state
*
* @param state final state
* @param resultValue used to collect output data points
*/
void outputFinal(State state, ResultValue resultValue);
/**
* Remove input data from state. This method is used to remove the data points that have been
* added to the state. Once it is implemented, {@linkplain
* AggregateFunctionAnalysis.Builder#removable(boolean)} should be set to true.
*
* @param state state to be updated
* @param input row to be removed
*/
default void remove(State state, Record input) {
throw new UnsupportedOperationException();
}
/** This method is mainly used to release the resources used in the SQLFunction. */
default void beforeDestroy() {
// do nothing
}
}
```
接口说明:
| 接口定义 | 描述 | 是否必须 |
| ----------------------------------------------------------------- |-----------------------------------------------------------------------------------------------------------------------------------------|----------------|
| `AggregateFunctionAnalysis analyze(FunctionArguments arguments);` | 1. 校验`FunctionArguments`中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。<br> 2. 根据`FunctionArguments`构造`AggregateFunctionAnalysis`,包括输出类型、removable 等信息。 | 是 |
| `void beforeStart(FunctionArguments arguments); ` | 在 UDAF 处理输入数据前,调用用户自定义的初始化行为 | 否 |
| `State createState();` | 创建`State`对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。 | 是 |
| `void addInput(State state, Record input);` | 更新`State`对象,将输入的一行*​ ​*`Record`数据添加到聚合状态中。 | 是 |
| `void combineState(State state, State rhs); ` | 将`rhs`状态合并至`state`状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 会为每个节点上的部分数据生成一个`State`对象,然后调用该方法合并成完整的`State`。 | 是 |
| `void outputFinal(State state, ResultValue resultValue);` | 根据`State`中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。 | 是 |
| `void remove(State state, Record input);`| 更新`State`对象,将输入的一行*​ ​*`Record`数据从聚合状态中剔除。**实现该方法需要设置 AggregateFunctionAnalysis 中的 removable 字段为 true。** | 否 |
| `void beforeDestroy();`| UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
目前 AggregateFunctionAnalysis 中的字段:
| 字段类型 | 字段名称 | 默认值 |
| ---------- | ---------------- | -------- |
| Type | outputDataType | 无 |
| boolean | removable | false |
示例:[UDAF 的实现示例](https://github.com/apache/iotdb/blob/master/example/udf/src/main/java/org/apache/iotdb/udf/AggregateFunctionExample.java),计算不为 NULL 的行数。
### 3.4 表函数(UDTF)
#### 3.4.1 定义
表函数,也被称为表值函数(Table-Valued Function,TVF),不同于标量函数、聚合函数和窗口函数的返回值是一个“标量值”,表函数的返回值是一个“表”(结果集)。
#### 3.4.2 使用
表函数可以像关系表一样,以`tableFunctionCall`的形式在 SQL 查询的 `FROM` 子句中使用,支持传递参数,并根据参数动态生成结果集。
#### 3.4.3 语法
`tableFunctionCall` 的具体定义如下所示:
```sql
tableFunctionCall
: qualifiedName '(' (tableFunctionArgument (',' tableFunctionArgument)*)?')'
;
tableFunctionArgument
: (identifier '=>')? (tableArgument | scalarArgument)
​ ​;
tableArgument
: tableArgumentRelation
(PARTITION BY ('(' (expression (',' expression)*)? ')' | expression))?
(ORDER BY ('(' sortItem (',' sortItem)* ')' | sortItem))?
;
tableArgumentRelation
: qualifiedName (AS? identifier columnAliases?)? #tableArgumentTable
| '(' query ')' (AS? identifier columnAliases?)? #tableArgumentQuery
;
scalarArgument
: expression
| timeDuration
;
```
例如:
```SQL
// 这是从表函数中进行查询,传入一个字符串“t1”参数。
select * from tvf('t1');
// 这是从表函数中进行查询,传入一个字符串“t1”参数,一个 bid 表参数。
select * from tvf('t1', bid);
```
#### 3.4.4 函数参数
IoTDB 中的表函数为多态表值函数,支持参数类型如下所示:
| 参数类型 | 定义 | 示例 |
| ------------------------------ | ---------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------- |
| 标量参数(Scalar Argument)| 必须是常量表达式,可以是任何的 SQL 数据类型,需要和声明的类型兼容。| `SIZE => 42`,`SIZE => '42'`,`SIZE => 42.2`,`SIZE => 12h`,`SIZE => 60m` |
| 表参数(Table Argument) | 可以是一个表名或一条查询语句。 | `input => orders`,`data => (SELECT * FROM region, nation WHERE region.regionkey = nation.regionkey)` |
表参数具有如下属性:
1. 组语义与行语义
* 被声明为组语义(Set Semantic)的表参数意味着需要根据整个完整的分区才能得到结果集。
* 允许在调用时指定 PARTITION 或 ORDER,执行引擎会 partition-by-partition 地进行处理。
```SQL
input => orders PARTITION BY device_id ORDER BY time
```
* 如果没有指定 PARTITION,则认为所有的数据都在同一个数据组中。
* 被声明为行语义(Row Semantics)的表参数意味着行与行之间没有依赖关系。不允许在调用时指定 PARTITION 或 ORDER,执行引擎会 row-by-row 地进行处理。
2. 列穿透(Pass-through Columns)
* 表参数如果被声明为列穿透,则表函数的结果列会包含该表参数输入的所有列。
* 例如,窗口分析函数,通过为表参数设置列穿透属性,可实现输出结果为“所有输入列(包含原始列和聚合结果)+窗口ID”,即“原始数据+分析结果”。
#### 3.4.5 传递方式
| 传递方式 | 描述 | 示例 |
| ------------ |-----------------------------------------------------------| ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 按名传递 | 1. 可以通过任意的顺序传递参数。<br>2. 被声明有默认值的参数可以被省略。<br>3. 参数名大小写不敏感。 | `SELECT * FROM my_function(row_count => 100, column_count => 1);` `SELECT * FROM my_function(column_count => 1, row_count => 100);` `SELECT * FROM my_function(column_count => 1);` |
| 按位置传递 | 1. 必须按照声明的顺序进行传递参数。<br>2. 如果余下的参数都有默认值,可以只传一部分参数。 | `SELECT * FROM my_function(1, 100);` `SELECT * FROM my_function(1);` |
> 注意:
> 以上两种方式不允许混用,否则在语义解析时候会抛出“​**All arguments must be passed by name or all must be passed positionally**​”异常。
#### 3.4.6 返回结果
表函数的结果集由以下两部分组成。
1. 由表函数创建的生成列(Proper Columns)。
2. 根据表参数自动构建的映射列(Pass-through Columns)。
* 如果指定了表参数的属性为列穿透,则会包括输入关系的所有列;
* 如果没有指定为列穿透但指定了 PartitionBy,则是 PartitionBy 的列;
* 如果均未指定,则不根据表参数自动构建列。
### 3.5 完整Maven项目示例
如果使用 [Maven](http://search.maven.org/),可以参考示例项目[udf-example](https://github.com/apache/iotdb/tree/master/example/udf)。