UDF(User Defined Function)即用户自定义函数,IoTDB 提供多种内置的时序处理函数,也支持扩展自定义函数来满足更多的计算需求。
IoTDB 表模型中支持三种类型的 UDF ,如下表所示。
| UDF 类型 | 函数类型 | 描述 |
|---|---|---|
UDSF(User-defined Scalar Function) | 标量函数 | 输入 k 列 1 行数据,输出1 列 1 行数据(一对一)。 |
UDAF(User-defined Aggregate Function) | 聚合函数 | 输入k 列 m 行数据,输出1 列 1 行数据(多对一)。 |
UDTF(User-defined Table Function) | 表函数 | 输入0或1张表(k 列 m 行),输出1张表(x 行 y 列)。 |
UDSF 可用于标量函数出现的任何子句和表达式中,如select子句、where子句等。select udsf1(s1) from table1 where udsf2(s1)>0UDAF 可用于聚合函数出现的任何子句和表达式中,如select子句、having子句等;select udaf1(s1), device_id from table1 group by device_id having udaf2(s1)>0 UDTF 可以像关系表一样在from子句中使用;select * from udtf('t1', bid);准备 UDF 实现的 JAR 包,其中包含 UDF 实现类,如org.apache.iotdb.udf.ScalarFunctionExample。
Jar 包的放置有两种方式:
ext/udf目录下。CREATE FUNCTION <UDF-NAME> AS <UDF-CLASS-FULL-PATHNAME> (USING URI URI-STRING)
-- 本地 CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample'; -- 远端 CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample' USING URI 'http://jar/example.jar'
注意:
SQL 语法如下:
DROP FUNCTION <UDF-NAME>
示例:卸载上述例子的 UDF:
DROP FUNCTION contain_null
SHOW FUNCTIONS
iotdb-system.properties 中配置 UDF Jar 文件的存储目录:# UDF lib dir udf_lib_dir=ext/udf
可以从 Maven 库 中搜索下面示例中的依赖,请注意选择和目标 IoTDB 服务器版本相同的依赖版本。
<dependency> <groupId>org.apache.iotdb</groupId> <artifactId>udf-api</artifactId> <version>2.0.0</version> <scope>provided</scope> </dependency>
编写一个 UDSF 需要实现org.apache.iotdb.udf.api.relational.ScalarFunction接口。
public interface ScalarFunction extends SQLFunction { /** * In this method, the user need to do the following things: * * <ul> * <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if * any parameter is not valid. * <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type. * <li>Construct and return a {@linkplain ScalarFunctionAnalysis} object. * </ul> * * @param arguments arguments used to validate * @throws UDFArgumentNotValidException if any parameter is not valid * @return the analysis result of the scalar function */ ScalarFunctionAnalysis analyze(FunctionArguments arguments) throws UDFArgumentNotValidException; /** * This method is called after the ScalarFunction is instantiated and before the beginning of the * transformation process. This method is mainly used to initialize the resources used in * ScalarFunction. * * @param arguments used to parse the input arguments entered by the user * @throws UDFException the user can throw errors if necessary */ default void beforeStart(FunctionArguments arguments) throws UDFException { // do nothing } /** * This method will be called to process the transformation. In a single UDF query, this method * may be called multiple times. * * @param input original input data row * @throws UDFException the user can throw errors if necessary */ Object evaluate(Record input) throws UDFException; /** This method is mainly used to release the resources used in the ScalarFunction. */ default void beforeDestroy() { // do nothing } }
接口说明:
| 接口定义 | 描述 | 是否必须 |
|---|---|---|
ScalarFunctionAnalysis analyze(FunctionArguments arguments); | 1. 校验FunctionArguments中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。2. 根据 FunctionArguments构造ScalarFunctionAnalysis,包括输出类型等信息。 | 是 |
void beforeStart(FunctionArguments arguments); | 在 UDSF 处理输入数据前,调用用户自定义的初始化行为 | 否 |
Object evaluate(Record input) throws UDFException; | UDSF 处理逻辑,根据一行输入数据,返回一行输出数据。 | 是 |
void beforeDestroy(); | UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
目前 ScalarFunctionAnalysis 中的字段:
| 字段类型 | 字段名称 | 默认值 |
|---|---|---|
| Type | outputDataType | 无 |
示例:UDSF 的实现示例,输入任意数据类型的任意多列,返回一个布尔值,代表该行输入是否包含 NULL 值。
一个完整的 UDAF 定义涉及到 State 和 AggregateFunction 两个类。
编写一个 State 类需要实现org.apache.iotdb.udf.api.State接口。
public interface State { /** Reset your state object to its initial state. */ void reset(); /** * Serialize your state into byte array. The order of serialization must be consistent with * deserialization. */ byte[] serialize(); /** * Deserialize byte array into your state. The order of deserialization must be consistent with * serialization. */ void deserialize(byte[] bytes); /** Destroy state. You may release previously binding resource in this method. */ default void destroyState() {} ; }
接口说明:
| 接口定义 | 描述 | 是否必须 |
|---|---|---|
void reset() | 将State对象重置为初始的状态,您需要像编写构造函数一样,在该方法内填入State类中各个字段的初始值。 | 是 |
byte[] serialize() | 将State序列化为二进制数据。该方法用于 IoTDB 内部的State对象传递,注意序列化的顺序必须和下面的反序列化方法一致。 | 是 |
void deserialize(byte[] bytes) | 将二进制数据反序列化为State。该方法用于 IoTDB 内部的State对象传递,注意反序列化的顺序必须和上面的序列化方法一致。 | 是 |
void destroyState() | 进行资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次。 | 否 |
编写一个 UDAF 需要实现 org.apache.iotdb.udf.api.relational.AggregateFunction接口。
public interface AggregateFunction extends SQLFunction { /** * In this method, the user need to do the following things: * * <ul> * <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if * any parameter is not valid. * <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type. * <li>Construct and return a {@linkplain AggregateFunctionAnalysis} object. * </ul> * * @param arguments arguments used to validate * @throws UDFArgumentNotValidException if any parameter is not valid * @return the analysis result of the scalar function */ AggregateFunctionAnalysis analyze(FunctionArguments arguments) throws UDFArgumentNotValidException; /** * This method is called after the AggregateFunction is instantiated and before the beginning of * the transformation process. This method is mainly used to initialize the resources used in * AggregateFunction. * * @param arguments used to parse the input arguments entered by the user * @throws UDFException the user can throw errors if necessary */ default void beforeStart(FunctionArguments arguments) throws UDFException { // do nothing } /** Create and initialize state. You may bind some resource in this method. */ State createState(); /** * Update state with data columns. * * @param state state to be updated * @param input original input data row */ void addInput(State state, Record input); /** * Merge two state in execution engine. * * @param state current state * @param rhs right-hand-side state to be merged */ void combineState(State state, State rhs); /** * Calculate output value from final state * * @param state final state * @param resultValue used to collect output data points */ void outputFinal(State state, ResultValue resultValue); /** * Remove input data from state. This method is used to remove the data points that have been * added to the state. Once it is implemented, {@linkplain * AggregateFunctionAnalysis.Builder#removable(boolean)} should be set to true. * * @param state state to be updated * @param input row to be removed */ default void remove(State state, Record input) { throw new UnsupportedOperationException(); } /** This method is mainly used to release the resources used in the SQLFunction. */ default void beforeDestroy() { // do nothing } }
接口说明:
| 接口定义 | 描述 | 是否必须 |
|---|---|---|
AggregateFunctionAnalysis analyze(FunctionArguments arguments); | 1. 校验FunctionArguments中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。2. 根据 FunctionArguments构造AggregateFunctionAnalysis,包括输出类型、removable 等信息。 | 是 |
void beforeStart(FunctionArguments arguments); | 在 UDAF 处理输入数据前,调用用户自定义的初始化行为 | 否 |
State createState(); | 创建State对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。 | 是 |
void addInput(State state, Record input); | 更新State对象,将输入的一行 Record数据添加到聚合状态中。 | 是 |
void combineState(State state, State rhs); | 将rhs状态合并至state状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 会为每个节点上的部分数据生成一个State对象,然后调用该方法合并成完整的State。 | 是 |
void outputFinal(State state, ResultValue resultValue); | 根据State中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。 | 是 |
void remove(State state, Record input); | 更新State对象,将输入的一行 Record数据从聚合状态中剔除。实现该方法需要设置 AggregateFunctionAnalysis 中的 removable 字段为 true。 | 否 |
void beforeDestroy(); | UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
目前 AggregateFunctionAnalysis 中的字段:
| 字段类型 | 字段名称 | 默认值 |
|---|---|---|
| Type | outputDataType | 无 |
| boolean | removable | false |
示例:UDAF 的实现示例,计算不为 NULL 的行数。
表函数,也被称为表值函数(Table-Valued Function,TVF),不同于标量函数、聚合函数和窗口函数的返回值是一个“标量值”,表函数的返回值是一个“表”(结果集)。
表函数可以像关系表一样,以tableFunctionCall的形式在 SQL 查询的 FROM 子句中使用,支持传递参数,并根据参数动态生成结果集。
tableFunctionCall 的具体定义如下所示:
tableFunctionCall : qualifiedName '(' (tableFunctionArgument (',' tableFunctionArgument)*)?')' ; tableFunctionArgument : (identifier '=>')? (tableArgument | scalarArgument) ; tableArgument : tableArgumentRelation (PARTITION BY ('(' (expression (',' expression)*)? ')' | expression))? (ORDER BY ('(' sortItem (',' sortItem)* ')' | sortItem))? ; tableArgumentRelation : qualifiedName (AS? identifier columnAliases?)? #tableArgumentTable | '(' query ')' (AS? identifier columnAliases?)? #tableArgumentQuery ; scalarArgument : expression | timeDuration ;
例如:
// 这是从表函数中进行查询,传入一个字符串“t1”参数。 select * from tvf('t1'); // 这是从表函数中进行查询,传入一个字符串“t1”参数,一个 bid 表参数。 select * from tvf('t1', bid);
IoTDB 中的表函数为多态表值函数,支持参数类型如下所示:
| 参数类型 | 定义 | 示例 |
|---|---|---|
| 标量参数(Scalar Argument) | 必须是常量表达式,可以是任何的 SQL 数据类型,需要和声明的类型兼容。 | SIZE => 42,SIZE => '42',SIZE => 42.2,SIZE => 12h,SIZE => 60m |
| 表参数(Table Argument) | 可以是一个表名或一条查询语句。 | input => orders,data => (SELECT * FROM region, nation WHERE region.regionkey = nation.regionkey) |
表参数具有如下属性:
组语义与行语义
被声明为组语义(Set Semantic)的表参数意味着需要根据整个完整的分区才能得到结果集。
input => orders PARTITION BY device_id ORDER BY time
被声明为行语义(Row Semantics)的表参数意味着行与行之间没有依赖关系。不允许在调用时指定 PARTITION 或 ORDER,执行引擎会 row-by-row 地进行处理。
列穿透(Pass-through Columns)
| 传递方式 | 描述 | 示例 |
|---|---|---|
| 按名传递 | 1. 可以通过任意的顺序传递参数。 2. 被声明有默认值的参数可以被省略。 3. 参数名大小写不敏感。 | SELECT * FROM my_function(row_count => 100, column_count => 1); SELECT * FROM my_function(column_count => 1, row_count => 100); SELECT * FROM my_function(column_count => 1); |
| 按位置传递 | 1. 必须按照声明的顺序进行传递参数。 2. 如果余下的参数都有默认值,可以只传一部分参数。 | SELECT * FROM my_function(1, 100); SELECT * FROM my_function(1); |
注意: 以上两种方式不允许混用,否则在语义解析时候会抛出“All arguments must be passed by name or all must be passed positionally”异常。
表函数的结果集由以下两部分组成。
如果使用 Maven,可以参考示例项目udf-example。