触发器提供了一种侦听序列数据变动的机制。配合用户自定义逻辑,可完成告警、数据转发等功能。
触发器基于 Java 反射机制实现。用户通过简单实现 Java 接口,即可实现数据侦听。IoTDB 允许用户动态注册、卸载触发器,在注册、卸载期间,无需启停服务器。
IoTDB 的单个触发器可用于侦听符合特定模式的时间序列的数据变动,如时间序列 root.sg.a 上的数据变动,或者符合路径模式 root.**.a 的时间序列上的数据变动。您在注册触发器时可以通过 SQL 语句指定触发器侦听的路径模式。
目前触发器分为两类,您在注册触发器时可以通过 SQL 语句指定类型:
触发器的触发时机目前有两种,后续会拓展其它触发时机。您在注册触发器时可以通过 SQL 语句指定触发时机:
触发器的逻辑需要您编写 Java 类进行实现。 在编写触发器逻辑时,需要使用到下面展示的依赖。如果您使用 Maven,则可以直接从 Maven 库中搜索到它们。请注意选择和目标服务器版本相同的依赖版本。
<dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-server</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency>
编写一个触发器需要实现 org.apache.iotdb.trigger.api.Trigger
类。
import org.apache.iotdb.trigger.api.enums.FailureStrategy; import org.apache.iotdb.tsfile.write.record.Tablet; public interface Trigger { /** * This method is mainly used to validate {@link TriggerAttributes} before calling {@link * Trigger#onCreate(TriggerAttributes)}. * * @param attributes TriggerAttributes * @throws Exception e */ default void validate(TriggerAttributes attributes) throws Exception {} /** * This method will be called when creating a trigger after validation. * * @param attributes TriggerAttributes * @throws Exception e */ default void onCreate(TriggerAttributes attributes) throws Exception {} /** * This method will be called when dropping a trigger. * * @throws Exception e */ default void onDrop() throws Exception {} /** * When restarting a DataNode, Triggers that have been registered will be restored and this method * will be called during the process of restoring. * * @throws Exception e */ default void restore() throws Exception {} /** * Overrides this method to set the expected FailureStrategy, {@link FailureStrategy#OPTIMISTIC} * is the default strategy. * * @return {@link FailureStrategy} */ default FailureStrategy getFailureStrategy() { return FailureStrategy.OPTIMISTIC; } /** * @param tablet see {@link Tablet} for detailed information of data structure. Data that is * inserted will be constructed as a Tablet and you can define process logic with {@link * Tablet}. * @return true if successfully fired * @throws Exception e */ default boolean fire(Tablet tablet) throws Exception { return true; } }
该类主要提供了两类编程接口:生命周期相关接口和数据变动侦听相关接口。该类中所有的接口都不是必须实现的,当您不实现它们时,它们不会对流经的数据操作产生任何响应。您可以根据实际需要,只实现其中若干接口。
下面是所有可供用户进行实现的接口的说明。
接口定义 | 描述 |
---|---|
default void validate(TriggerAttributes attributes) throws Exception {} | 用户在使用 CREATE TRIGGER 语句创建触发器时,可以指定触发器需要使用的参数,该接口会用于验证参数正确性。 |
default void onCreate(TriggerAttributes attributes) throws Exception {} | 当您使用CREATE TRIGGER 语句创建触发器后,该接口会被调用一次。在每一个触发器实例的生命周期内,该接口会且仅会被调用一次。该接口主要有如下作用:帮助用户解析 SQL 语句中的自定义属性(使用TriggerAttributes )。 可以创建或申请资源,如建立外部链接、打开文件等。 |
default void onDrop() throws Exception {} | 当您使用DROP TRIGGER 语句删除触发器后,该接口会被调用。在每一个触发器实例的生命周期内,该接口会且仅会被调用一次。该接口主要有如下作用:可以进行资源释放的操作。可以用于持久化触发器计算的结果。 |
default void restore() throws Exception {} | 当重启 DataNode 时,集群会恢复 DataNode 上已经注册的触发器实例,在此过程中会为该 DataNode 上的有状态触发器调用一次该接口。有状态触发器实例所在的 DataNode 宕机后,集群会在另一个可用 DataNode 上恢复该触发器的实例,在此过程中会调用一次该接口。该接口可以用于自定义恢复逻辑。 |
/** * @param tablet see {@link Tablet} for detailed information of data structure. Data that is * inserted will be constructed as a Tablet and you can define process logic with {@link * Tablet}. * @return true if successfully fired * @throws Exception e */ default boolean fire(Tablet tablet) throws Exception { return true; }
数据变动时,触发器以 Tablet 作为触发操作的单位。您可以通过 Tablet 获取相应序列的元数据和数据,然后进行相应的触发操作,触发成功则返回值应当为 true。该接口返回 false 或是抛出异常我们均认为触发失败。在触发失败时,我们会根据侦听策略接口进行相应的操作。
进行一次 INSERT 操作时,对于其中的每条时间序列,我们会检测是否有侦听该路径模式的触发器,然后将符合同一个触发器所侦听的路径模式的时间序列数据组装成一个新的 Tablet 用于触发器的 fire 接口。可以理解成:
Map<PartialPath, List<Trigger>> pathToTriggerListMap => Map<Trigger, Tablet>
请注意,目前我们不对触发器的触发顺序有任何保证。
下面是示例:
假设有三个触发器,触发器的触发时机均为 BEFORE INSERT
写入语句:
insert into root.sg(time, a, b) values (1, 1, 1);
序列 root.sg.a 匹配 Trigger1 和 Trigger2,序列 root.sg.b 匹配 Trigger1 和 Trigger3,那么:
在触发器触发失败时,我们会根据侦听策略接口设置的策略进行相应的操作,您可以通过下述接口设置 org.apache.iotdb.trigger.api.enums.FailureStrategy
,目前有乐观和悲观两种策略:
/** * Overrides this method to set the expected FailureStrategy, {@link FailureStrategy#OPTIMISTIC} * is the default strategy. * * @return {@link FailureStrategy} */ default FailureStrategy getFailureStrategy() { return FailureStrategy.OPTIMISTIC; }
您可以参考下图辅助理解,其中 Trigger1 配置采用乐观策略,Trigger2 配置采用悲观策略。Trigger1 和 Trigger2 的触发时机是 BEFORE INSERT,Trigger3 和 Trigger4 的触发时机是 AFTER INSERT。 正常执行流程如下:
如果您使用 Maven,可以参考我们编写的示例项目 trigger-example。您可以在 这里 找到它。后续我们会加入更多的示例项目供您参考。
下面是其中一个示例项目的代码:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.iotdb.trigger; import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerConfiguration; import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerEvent; import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerHandler; import org.apache.iotdb.trigger.api.Trigger; import org.apache.iotdb.trigger.api.TriggerAttributes; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.List; public class ClusterAlertingExample implements Trigger { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterAlertingExample.class); private final AlertManagerHandler alertManagerHandler = new AlertManagerHandler(); private final AlertManagerConfiguration alertManagerConfiguration = new AlertManagerConfiguration("http://127.0.0.1:9093/api/v2/alerts"); private String alertname; private final HashMap<String, String> labels = new HashMap<>(); private final HashMap<String, String> annotations = new HashMap<>(); @Override public void onCreate(TriggerAttributes attributes) throws Exception { alertname = "alert_test"; labels.put("series", "root.ln.wf01.wt01.temperature"); labels.put("value", ""); labels.put("severity", ""); annotations.put("summary", "high temperature"); annotations.put("description", "{{.alertname}}: {{.series}} is {{.value}}"); alertManagerHandler.open(alertManagerConfiguration); } @Override public void onDrop() throws IOException { alertManagerHandler.close(); } @Override public boolean fire(Tablet tablet) throws Exception { List<MeasurementSchema> measurementSchemaList = tablet.getSchemas(); for (int i = 0, n = measurementSchemaList.size(); i < n; i++) { if (measurementSchemaList.get(i).getType().equals(TSDataType.DOUBLE)) { // for example, we only deal with the columns of Double type double[] values = (double[]) tablet.values[i]; for (double value : values) { if (value > 100.0) { LOGGER.info("trigger value > 100"); labels.put("value", String.valueOf(value)); labels.put("severity", "critical"); AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations); alertManagerHandler.onEvent(alertManagerEvent); } else if (value > 50.0) { LOGGER.info("trigger value > 50"); labels.put("value", String.valueOf(value)); labels.put("severity", "warning"); AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations); alertManagerHandler.onEvent(alertManagerEvent); } } } } return true; } }
您可以通过 SQL 语句注册和卸载一个触发器实例,您也可以通过 SQL 语句查询到所有已经注册的触发器。
我们建议您在注册触发器时停止写入。
触发器可以注册在任意路径模式上。被注册有触发器的序列将会被触发器侦听,当序列上有数据变动时,触发器中对应的触发方法将会被调用。
注册一个触发器可以按如下流程进行:
org.apache.iotdb.trigger.ClusterAlertingExample
validate
和 onCreate
接口,具体请参考编写触发器章节。完整 SQL 语法如下:
// Create Trigger createTrigger : CREATE triggerType TRIGGER triggerName=identifier triggerEventClause ON pathPattern AS className=STRING_LITERAL uriClause? triggerAttributeClause? ; triggerType : STATELESS | STATEFUL ; triggerEventClause : (BEFORE | AFTER) INSERT ; : USING URI uri ; uri : STRING_LITERAL ; triggerAttributeClause : WITH LR_BRACKET triggerAttribute (COMMA triggerAttribute)* RR_BRACKET ; triggerAttribute : key=attributeKey operator_eq value=attributeValue ;
下面对 SQL 语法进行说明,您可以结合使用说明章节进行理解:
下面是一个帮助您理解的 SQL 语句示例:
CREATE STATELESS TRIGGER triggerTest BEFORE INSERT ON root.sg.** AS 'org.apache.iotdb.trigger.ClusterAlertingExample' USING URI 'http://jar/ClusterAlertingExample.jar' WITH ( "name" = "trigger", "limit" = "100" )
上述 SQL 语句创建了一个名为 triggerTest 的触发器:
可以通过指定触发器 ID 的方式卸载触发器,卸载触发器的过程中会且仅会调用一次触发器的 onDrop
接口。
卸载触发器的 SQL 语法如下:
// Drop Trigger dropTrigger : DROP TRIGGER triggerName=identifier ;
下面是示例语句:
DROP TRIGGER triggerTest1
上述语句将会卸载 ID 为 triggerTest1 的触发器。
可以通过 SQL 语句查询集群中存在的触发器的信息。SQL 语法如下:
SHOW TRIGGERS
该语句的结果集格式如下:
TriggerName | Event | Type | State | PathPattern | ClassName | NodeId |
---|---|---|---|---|---|---|
triggerTest1 | BEFORE_INSERT / AFTER_INSERT | STATELESS / STATEFUL | INACTIVE / ACTIVE / DROPPING / TRANSFFERING | root.** | org.apache.iotdb.trigger.TriggerExample | ALL(STATELESS) / DATA_NODE_ID(STATEFUL) |
在集群中注册以及卸载触发器的过程中,我们维护了触发器的状态,下面是对这些状态的说明:
状态 | 描述 | 是否建议写入进行 |
---|---|---|
INACTIVE | 执行 CREATE TRIGGER 的中间状态,集群刚在 ConfigNode 上记录该触发器的信息,还未在任何 DataNode 上激活该触发器 | 否 |
ACTIVE | 执行 CREATE TRIGGE 成功后的状态,集群所有 DataNode 上的该触发器都已经可用 | 是 |
DROPPING | 执行 DROP TRIGGER 的中间状态,集群正处在卸载该触发器的过程中 | 否 |
TRANSFERRING | 集群正在进行该触发器实例位置的迁移 | 否 |
config_node_ratis_log_appender_buffer_size_max
, 2G),其中 config_node_ratis_log_appender_buffer_size_max
是一个配置项,具体含义可以参考 IOTDB 配置项说明。org.apache.iotdb.trigger.example.AlertListener
类,当 CREATE TRIGGER
使用到这个类时,系统会随机加载其中一个 JAR 包中的类,最终导致触发器执行行为不一致以及其他的问题。配置项 | 含义 |
---|---|
trigger_lib_dir | 保存触发器 jar 包的目录位置 |
stateful_trigger_retry_num_when_not_found | 有状态触发器触发无法找到触发器实例时的重试次数 |
连续查询(Continuous queries, aka CQ) 是对实时数据周期性地自动执行的查询,并将查询结果写入指定的时间序列中。
用户可以通过连续查询实现滑动窗口流式计算,如计算某个序列每小时平均温度,并写入一个新序列中。用户可以自定义 RESAMPLE
子句去创建不同的滑动窗口,可以实现对于乱序数据一定程度的容忍。
CREATE (CONTINUOUS QUERY | CQ) <cq_id> [RESAMPLE [EVERY <every_interval>] [BOUNDARY <execution_boundary_time>] [RANGE <start_time_offset>[, end_time_offset]] ] [TIMEOUT POLICY BLOCKED|DISCARD] BEGIN SELECT CLAUSE INTO CLAUSE FROM CLAUSE [WHERE CLAUSE] [GROUP BY(<group_by_interval>[, <sliding_step>]) [, level = <level>]] [HAVING CLAUSE] [FILL {PREVIOUS | LINEAR | constant}] [LIMIT rowLimit OFFSET rowOffset] [ALIGN BY DEVICE] END
注意:
- 如果where子句中出现任何时间过滤条件,IoTDB将会抛出异常,因为IoTDB会自动为每次查询执行指定时间范围。
- GROUP BY TIME CLAUSE在连续查询中的语法稍有不同,它不能包含原来的第一个参数,即 [start_time, end_time),IoTDB会自动填充这个缺失的参数。如果指定,IoTDB将会抛出异常。
- 如果连续查询中既没有GROUP BY TIME子句,也没有指定EVERY子句,IoTDB将会抛出异常。
<cq_id>
为连续查询指定一个全局唯一的标识。<every_interval>
指定了连续查询周期性执行的间隔。现在支持的时间单位有:ns, us, ms, s, m, h, d, w, 并且它的值不能小于用户在iotdb-system.properties
配置文件中指定的continuous_query_min_every_interval
。这是一个可选参数,默认等于group by子句中的group_by_interval
。<start_time_offset>
指定了每次查询执行窗口的开始时间,即now()-<start_time_offset>
。现在支持的时间单位有:ns, us, ms, s, m, h, d, w。这是一个可选参数,默认等于EVERY
子句中的every_interval
。<end_time_offset>
指定了每次查询执行窗口的结束时间,即now()-<end_time_offset>
。现在支持的时间单位有:ns, us, ms, s, m, h, d, w。这是一个可选参数,默认等于0
.<execution_boundary_time>
表示用户期待的连续查询的首个周期任务的执行时间。(因为连续查询只会对当前实时的数据流做计算,所以该连续查询实际首个周期任务的执行时间并不一定等于用户指定的时间,具体计算逻辑如下所示)<execution_boundary_time>
可以早于、等于或者迟于当前时间。0
。<execution_boundary_time> - <start_time_offset>
.<execution_boundary_time> - <end_time_offset>
.[<execution_boundary_time> - <start_time_offset> + (i - 1) * <every_interval>, <execution_boundary_time> - <end_time_offset> + (i - 1) * <every_interval>)
。execution_boundary_time
.execution_boundary_time
,那么连续查询的首个周期任务的执行时间就是execution_boundary_time + i * <every_interval>
中第一个大于或等于当前时间的值。
- <every_interval>,<start_time_offset> 和 <group_by_interval> 都应该大于 0
- <group_by_interval>应该小于等于<start_time_offset>
- 用户应该根据实际需求,为<start_time_offset> 和 <every_interval> 指定合适的值
- 如果<start_time_offset>大于<every_interval>,在每一次查询执行的时间窗口上会有部分重叠
- 如果<start_time_offset>小于<every_interval>,在连续的两次查询执行的时间窗口中间将会有未覆盖的时间范围
- start_time_offset 应该大于end_time_offset
<start_time_offset>
等于<every_interval>
<start_time_offset>
大于<every_interval>
<start_time_offset>
小于<every_interval>
<every_interval>
不为0TIMEOUT POLICY
指定了我们如何处理“前一个时间窗口还未执行完时,下一个窗口的执行时间已经到达的场景,默认值是BLOCKED
.BLOCKED
意味着即使下一个窗口的执行时间已经到达,我们依旧需要阻塞等待前一个时间窗口的查询执行完再开始执行下一个窗口。如果使用BLOCKED
策略,所有的时间窗口都将会被依此执行,但是如果遇到执行查询的时间长于周期性间隔时,连续查询的结果会迟于最新的时间窗口范围。DISCARD
意味着如果前一个时间窗口还未执行完,我们会直接丢弃下一个窗口的执行时间。如果使用DISCARD
策略,可能会有部分时间窗口得不到执行。但是一旦前一个查询执行完后,它将会使用最新的时间窗口,所以它的执行结果总能赶上最新的时间窗口范围,当然是以部分时间窗口得不到执行为代价。下面是用例数据,这是一个实时的数据流,我们假设数据都按时到达。
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+ | Time|root.ln.wf02.wt02.temperature|root.ln.wf02.wt01.temperature|root.ln.wf01.wt02.temperature|root.ln.wf01.wt01.temperature| +-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+ |2021-05-11T22:18:14.598+08:00| 121.0| 72.0| 183.0| 115.0| |2021-05-11T22:18:19.941+08:00| 0.0| 68.0| 68.0| 103.0| |2021-05-11T22:18:24.949+08:00| 122.0| 45.0| 11.0| 14.0| |2021-05-11T22:18:29.967+08:00| 47.0| 14.0| 59.0| 181.0| |2021-05-11T22:18:34.979+08:00| 182.0| 113.0| 29.0| 180.0| |2021-05-11T22:18:39.990+08:00| 42.0| 11.0| 52.0| 19.0| |2021-05-11T22:18:44.995+08:00| 78.0| 38.0| 123.0| 52.0| |2021-05-11T22:18:49.999+08:00| 137.0| 172.0| 135.0| 193.0| |2021-05-11T22:18:55.003+08:00| 16.0| 124.0| 183.0| 18.0| +-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
在RESAMPLE
子句中使用EVERY
参数指定连续查询的执行间隔,如果没有指定,默认等于group_by_interval
。
CREATE CONTINUOUS QUERY cq1 RESAMPLE EVERY 20s BEGIN SELECT max_value(temperature) INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY(10s) END
cq1
计算出temperature
传感器每10秒的平均值,并且将查询结果存储在temperature_max
传感器下,传感器路径前缀使用跟原来一样的前缀。
cq1
每20秒执行一次,每次执行的查询的时间窗口范围是从过去20秒到当前时间。
假设当前时间是2021-05-11T22:18:40.000+08:00
,如果把日志等级设置为DEBUG,我们可以在cq1
执行的DataNode上看到如下的输出:
At **2021-05-11T22:18:40.000+08:00**, `cq1` executes a query within the time range `[2021-05-11T22:18:20, 2021-05-11T22:18:40)`. `cq1` generate 2 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ > At **2021-05-11T22:19:00.000+08:00**, `cq1` executes a query within the time range `[2021-05-11T22:18:40, 2021-05-11T22:19:00)`. `cq1` generate 2 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:40.000+08:00| 137.0| 172.0| 135.0| 193.0| |2021-05-11T22:18:50.000+08:00| 16.0| 124.0| 183.0| 18.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ >
cq1
并不会处理当前时间窗口以外的数据,即2021-05-11T22:18:20.000+08:00
以前的数据,所以我们会得到如下结果:
> SELECT temperature_max from root.ln.*.*; +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| |2021-05-11T22:18:40.000+08:00| 137.0| 172.0| 135.0| 193.0| |2021-05-11T22:18:50.000+08:00| 16.0| 124.0| 183.0| 18.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
使用RANGE
子句中的start_time_offset
参数指定连续查询每次执行的时间窗口的开始时间偏移,如果没有指定,默认值等于EVERY
参数。
CREATE CONTINUOUS QUERY cq2 RESAMPLE RANGE 40s BEGIN SELECT max_value(temperature) INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY(10s) END
cq2
计算出temperature
传感器每10秒的平均值,并且将查询结果存储在temperature_max
传感器下,传感器路径前缀使用跟原来一样的前缀。
cq2
每10秒执行一次,每次执行的查询的时间窗口范围是从过去40秒到当前时间。
假设当前时间是2021-05-11T22:18:40.000+08:00
,如果把日志等级设置为DEBUG,我们可以在cq2
执行的DataNode上看到如下的输出:
At **2021-05-11T22:18:40.000+08:00**, `cq2` executes a query within the time range `[2021-05-11T22:18:00, 2021-05-11T22:18:40)`. `cq2` generate 4 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:00.000+08:00| NULL| NULL| NULL| NULL| |2021-05-11T22:18:10.000+08:00| 121.0| 72.0| 183.0| 115.0| |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ > At **2021-05-11T22:18:50.000+08:00**, `cq2` executes a query within the time range `[2021-05-11T22:18:10, 2021-05-11T22:18:50)`. `cq2` generate 4 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:10.000+08:00| 121.0| 72.0| 183.0| 115.0| |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| |2021-05-11T22:18:40.000+08:00| 137.0| 172.0| 135.0| 193.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ > At **2021-05-11T22:19:00.000+08:00**, `cq2` executes a query within the time range `[2021-05-11T22:18:20, 2021-05-11T22:19:00)`. `cq2` generate 4 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| |2021-05-11T22:18:40.000+08:00| 137.0| 172.0| 135.0| 193.0| |2021-05-11T22:18:50.000+08:00| 16.0| 124.0| 183.0| 18.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ >
cq2
并不会写入全是null值的行,值得注意的是cq2
会多次计算某些区间的聚合值,下面是计算结果:
> SELECT temperature_max from root.ln.*.*; +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:10.000+08:00| 121.0| 72.0| 183.0| 115.0| |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| |2021-05-11T22:18:40.000+08:00| 137.0| 172.0| 135.0| 193.0| |2021-05-11T22:18:50.000+08:00| 16.0| 124.0| 183.0| 18.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
使用RESAMPLE
子句中的EVERY
参数和RANGE
参数分别指定连续查询的执行间隔和窗口大小。并且使用fill()
来填充没有值的时间区间。
CREATE CONTINUOUS QUERY cq3 RESAMPLE EVERY 20s RANGE 40s BEGIN SELECT max_value(temperature) INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY(10s) FILL(100.0) END
cq3
计算出temperature
传感器每10秒的平均值,并且将查询结果存储在temperature_max
传感器下,传感器路径前缀使用跟原来一样的前缀。如果某些区间没有值,用100.0
填充。
cq3
每20秒执行一次,每次执行的查询的时间窗口范围是从过去40秒到当前时间。
假设当前时间是2021-05-11T22:18:40.000+08:00
,如果把日志等级设置为DEBUG,我们可以在cq3
执行的DataNode上看到如下的输出:
At **2021-05-11T22:18:40.000+08:00**, `cq3` executes a query within the time range `[2021-05-11T22:18:00, 2021-05-11T22:18:40)`. `cq3` generate 4 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:00.000+08:00| 100.0| 100.0| 100.0| 100.0| |2021-05-11T22:18:10.000+08:00| 121.0| 72.0| 183.0| 115.0| |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ > At **2021-05-11T22:19:00.000+08:00**, `cq3` executes a query within the time range `[2021-05-11T22:18:20, 2021-05-11T22:19:00)`. `cq3` generate 4 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| |2021-05-11T22:18:40.000+08:00| 137.0| 172.0| 135.0| 193.0| |2021-05-11T22:18:50.000+08:00| 16.0| 124.0| 183.0| 18.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ >
值得注意的是cq3
会多次计算某些区间的聚合值,下面是计算结果:
> SELECT temperature_max from root.ln.*.*; +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:00.000+08:00| 100.0| 100.0| 100.0| 100.0| |2021-05-11T22:18:10.000+08:00| 121.0| 72.0| 183.0| 115.0| |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| |2021-05-11T22:18:40.000+08:00| 137.0| 172.0| 135.0| 193.0| |2021-05-11T22:18:50.000+08:00| 16.0| 124.0| 183.0| 18.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
使用RESAMPLE
子句中的EVERY
参数和RANGE
参数分别指定连续查询的执行间隔和窗口大小。并且使用fill()
来填充没有值的时间区间。
CREATE CONTINUOUS QUERY cq4 RESAMPLE EVERY 20s RANGE 40s, 20s BEGIN SELECT max_value(temperature) INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY(10s) FILL(100.0) END
cq4
计算出temperature
传感器每10秒的平均值,并且将查询结果存储在temperature_max
传感器下,传感器路径前缀使用跟原来一样的前缀。如果某些区间没有值,用100.0
填充。
cq4
每20秒执行一次,每次执行的查询的时间窗口范围是从过去40秒到过去20秒。
假设当前时间是2021-05-11T22:18:40.000+08:00
,如果把日志等级设置为DEBUG,我们可以在cq4
执行的DataNode上看到如下的输出:
At **2021-05-11T22:18:40.000+08:00**, `cq4` executes a query within the time range `[2021-05-11T22:18:00, 2021-05-11T22:18:20)`. `cq4` generate 2 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:00.000+08:00| 100.0| 100.0| 100.0| 100.0| |2021-05-11T22:18:10.000+08:00| 121.0| 72.0| 183.0| 115.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ > At **2021-05-11T22:19:00.000+08:00**, `cq4` executes a query within the time range `[2021-05-11T22:18:20, 2021-05-11T22:18:40)`. `cq4` generate 2 lines: > +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ >
值得注意的是cq4
只会计算每个聚合区间一次,并且每次开始执行计算的时间都会比当前的时间窗口结束时间迟20s, 下面是计算结果:
> SELECT temperature_max from root.ln.*.*; +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ | Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+ |2021-05-11T22:18:00.000+08:00| 100.0| 100.0| 100.0| 100.0| |2021-05-11T22:18:10.000+08:00| 121.0| 72.0| 183.0| 115.0| |2021-05-11T22:18:20.000+08:00| 122.0| 45.0| 59.0| 181.0| |2021-05-11T22:18:30.000+08:00| 182.0| 113.0| 52.0| 180.0| +-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
不使用GROUP BY TIME
子句,并在RESAMPLE
子句中显式使用EVERY
参数指定连续查询的执行间隔。
CREATE CONTINUOUS QUERY cq5 RESAMPLE EVERY 20s BEGIN SELECT temperature + 1 INTO root.precalculated_sg.::(temperature) FROM root.ln.*.* align by device END
cq5
计算以root.ln
为前缀的所有temperature + 1
的值,并将结果储存在另一个 database root.precalculated_sg
中。除 database 名称不同外,目标序列与源序列路径名均相同。
cq5
每20秒执行一次,每次执行的查询的时间窗口范围是从过去20秒到当前时间。
假设当前时间是2021-05-11T22:18:40.000+08:00
,如果把日志等级设置为DEBUG,我们可以在cq5
执行的DataNode上看到如下的输出:
At **2021-05-11T22:18:40.000+08:00**, `cq5` executes a query within the time range `[2021-05-11T22:18:20, 2021-05-11T22:18:40)`. `cq5` generate 16 lines: > +-----------------------------+-------------------------------+-----------+ | Time| Device|temperature| +-----------------------------+-------------------------------+-----------+ |2021-05-11T22:18:24.949+08:00|root.precalculated_sg.wf02.wt02| 123.0| |2021-05-11T22:18:29.967+08:00|root.precalculated_sg.wf02.wt02| 48.0| |2021-05-11T22:18:34.979+08:00|root.precalculated_sg.wf02.wt02| 183.0| |2021-05-11T22:18:39.990+08:00|root.precalculated_sg.wf02.wt02| 45.0| |2021-05-11T22:18:24.949+08:00|root.precalculated_sg.wf02.wt01| 46.0| |2021-05-11T22:18:29.967+08:00|root.precalculated_sg.wf02.wt01| 15.0| |2021-05-11T22:18:34.979+08:00|root.precalculated_sg.wf02.wt01| 114.0| |2021-05-11T22:18:39.990+08:00|root.precalculated_sg.wf02.wt01| 12.0| |2021-05-11T22:18:24.949+08:00|root.precalculated_sg.wf01.wt02| 12.0| |2021-05-11T22:18:29.967+08:00|root.precalculated_sg.wf01.wt02| 60.0| |2021-05-11T22:18:34.979+08:00|root.precalculated_sg.wf01.wt02| 30.0| |2021-05-11T22:18:39.990+08:00|root.precalculated_sg.wf01.wt02| 53.0| |2021-05-11T22:18:24.949+08:00|root.precalculated_sg.wf01.wt01| 15.0| |2021-05-11T22:18:29.967+08:00|root.precalculated_sg.wf01.wt01| 182.0| |2021-05-11T22:18:34.979+08:00|root.precalculated_sg.wf01.wt01| 181.0| |2021-05-11T22:18:39.990+08:00|root.precalculated_sg.wf01.wt01| 20.0| +-----------------------------+-------------------------------+-----------+ > At **2021-05-11T22:19:00.000+08:00**, `cq5` executes a query within the time range `[2021-05-11T22:18:40, 2021-05-11T22:19:00)`. `cq5` generate 12 lines: > +-----------------------------+-------------------------------+-----------+ | Time| Device|temperature| +-----------------------------+-------------------------------+-----------+ |2021-05-11T22:18:44.995+08:00|root.precalculated_sg.wf02.wt02| 79.0| |2021-05-11T22:18:49.999+08:00|root.precalculated_sg.wf02.wt02| 138.0| |2021-05-11T22:18:55.003+08:00|root.precalculated_sg.wf02.wt02| 17.0| |2021-05-11T22:18:44.995+08:00|root.precalculated_sg.wf02.wt01| 39.0| |2021-05-11T22:18:49.999+08:00|root.precalculated_sg.wf02.wt01| 173.0| |2021-05-11T22:18:55.003+08:00|root.precalculated_sg.wf02.wt01| 125.0| |2021-05-11T22:18:44.995+08:00|root.precalculated_sg.wf01.wt02| 124.0| |2021-05-11T22:18:49.999+08:00|root.precalculated_sg.wf01.wt02| 136.0| |2021-05-11T22:18:55.003+08:00|root.precalculated_sg.wf01.wt02| 184.0| |2021-05-11T22:18:44.995+08:00|root.precalculated_sg.wf01.wt01| 53.0| |2021-05-11T22:18:49.999+08:00|root.precalculated_sg.wf01.wt01| 194.0| |2021-05-11T22:18:55.003+08:00|root.precalculated_sg.wf01.wt01| 19.0| +-----------------------------+-------------------------------+-----------+ >
cq5
并不会处理当前时间窗口以外的数据,即2021-05-11T22:18:20.000+08:00
以前的数据,所以我们会得到如下结果:
> SELECT temperature from root.precalculated_sg.*.* align by device; +-----------------------------+-------------------------------+-----------+ | Time| Device|temperature| +-----------------------------+-------------------------------+-----------+ |2021-05-11T22:18:24.949+08:00|root.precalculated_sg.wf02.wt02| 123.0| |2021-05-11T22:18:29.967+08:00|root.precalculated_sg.wf02.wt02| 48.0| |2021-05-11T22:18:34.979+08:00|root.precalculated_sg.wf02.wt02| 183.0| |2021-05-11T22:18:39.990+08:00|root.precalculated_sg.wf02.wt02| 45.0| |2021-05-11T22:18:44.995+08:00|root.precalculated_sg.wf02.wt02| 79.0| |2021-05-11T22:18:49.999+08:00|root.precalculated_sg.wf02.wt02| 138.0| |2021-05-11T22:18:55.003+08:00|root.precalculated_sg.wf02.wt02| 17.0| |2021-05-11T22:18:24.949+08:00|root.precalculated_sg.wf02.wt01| 46.0| |2021-05-11T22:18:29.967+08:00|root.precalculated_sg.wf02.wt01| 15.0| |2021-05-11T22:18:34.979+08:00|root.precalculated_sg.wf02.wt01| 114.0| |2021-05-11T22:18:39.990+08:00|root.precalculated_sg.wf02.wt01| 12.0| |2021-05-11T22:18:44.995+08:00|root.precalculated_sg.wf02.wt01| 39.0| |2021-05-11T22:18:49.999+08:00|root.precalculated_sg.wf02.wt01| 173.0| |2021-05-11T22:18:55.003+08:00|root.precalculated_sg.wf02.wt01| 125.0| |2021-05-11T22:18:24.949+08:00|root.precalculated_sg.wf01.wt02| 12.0| |2021-05-11T22:18:29.967+08:00|root.precalculated_sg.wf01.wt02| 60.0| |2021-05-11T22:18:34.979+08:00|root.precalculated_sg.wf01.wt02| 30.0| |2021-05-11T22:18:39.990+08:00|root.precalculated_sg.wf01.wt02| 53.0| |2021-05-11T22:18:44.995+08:00|root.precalculated_sg.wf01.wt02| 124.0| |2021-05-11T22:18:49.999+08:00|root.precalculated_sg.wf01.wt02| 136.0| |2021-05-11T22:18:55.003+08:00|root.precalculated_sg.wf01.wt02| 184.0| |2021-05-11T22:18:24.949+08:00|root.precalculated_sg.wf01.wt01| 15.0| |2021-05-11T22:18:29.967+08:00|root.precalculated_sg.wf01.wt01| 182.0| |2021-05-11T22:18:34.979+08:00|root.precalculated_sg.wf01.wt01| 181.0| |2021-05-11T22:18:39.990+08:00|root.precalculated_sg.wf01.wt01| 20.0| |2021-05-11T22:18:44.995+08:00|root.precalculated_sg.wf01.wt01| 53.0| |2021-05-11T22:18:49.999+08:00|root.precalculated_sg.wf01.wt01| 194.0| |2021-05-11T22:18:55.003+08:00|root.precalculated_sg.wf01.wt01| 19.0| +-----------------------------+-------------------------------+-----------+
展示集群中所有的已注册的连续查询
SHOW (CONTINUOUS QUERIES | CQS)
SHOW (CONTINUOUS QUERIES | CQS)
会将结果集按照cq_id
排序。
SHOW CONTINUOUS QUERIES;
执行以上sql,我们将会得到如下的查询结果:
cq_id | query | state |
---|---|---|
s1_count_cq | CREATE CQ s1_count_cq BEGIN SELECT count(s1) INTO root.sg_count.d.count_s1 FROM root.sg.d GROUP BY(30m) END | active |
删除指定的名为cq_id的连续查询:
DROP (CONTINUOUS QUERY | CQ) <cq_id>
DROP CQ并不会返回任何结果集。
删除名为s1_count_cq的连续查询:
DROP CONTINUOUS QUERY s1_count_cq;
目前连续查询一旦被创建就不能再被修改。如果想要修改某个连续查询,只能先用DROP
命令删除它,然后再用CREATE
命令重新创建。
可以使用连续查询,定期将高频率采样的原始数据(如每秒1000个点),降采样(如每秒仅保留一个点)后保存到另一个 database 的同名序列中。高精度的原始数据所在 database 的TTL
可能设置的比较短,比如一天,而低精度的降采样后的数据所在的 database TTL
可以设置的比较长,比如一个月,从而达到快速释放磁盘空间的目的。
我们可以通过连续查询对一些重复的查询进行预计算,并将查询结果保存在某些目标序列中,这样真实查询并不需要真的再次去做计算,而是直接查询目标序列的结果,从而缩短了查询的时间。
预计算查询结果尤其对一些可视化工具渲染时序图和工作台时有很大的加速作用。
IoTDB现在不支持子查询,但是我们可以通过创建连续查询得到相似的功能。我们可以将子查询注册为一个连续查询,并将子查询的结果物化到目标序列中,外层查询再直接查询哪个目标序列。
IoTDB并不会接收如下的嵌套子查询。这个查询会计算s1序列每隔30分钟的非空值数量的平均值:
SELECT avg(count_s1) from (select count(s1) as count_s1 from root.sg.d group by([0, now()), 30m));
为了得到相同的结果,我们可以:
1. 创建一个连续查询
这一步执行内层子查询部分。下面创建的连续查询每隔30分钟计算一次root.sg.d.s1
序列的非空值数量,并将结果写入目标序列root.sg_count.d.count_s1
中。
CREATE CQ s1_count_cq BEGIN SELECT count(s1) INTO root.sg_count.d.count_s1 FROM root.sg.d GROUP BY(30m) END
2. 查询连续查询的结果
这一步执行外层查询的avg([...])部分。
查询序列root.sg_count.d.count_s1
的值,并计算平均值:
SELECT avg(count_s1) from root.sg_count.d;
参数名 | 描述 | 类型 | 默认值 |
---|---|---|---|
continuous_query_submit_thread | 用于周期性提交连续查询执行任务的线程数 | int32 | 2 |
continuous_query_min_every_interval_in_ms | 系统允许的连续查询最小的周期性时间间隔 | duration | 1000 |
UDF(User Defined Function)即用户自定义函数。IoTDB 提供多种内建函数来满足您的计算需求,同时您还可以通过创建自定义函数来满足更多的计算需求。
根据此文档,您将会很快学会 UDF 的编写、注册、使用等操作。
IoTDB 支持两种类型的 UDF 函数,如下表所示。
UDF 分类 | 描述 |
---|---|
UDTF(User Defined Timeseries Generating Function) | 自定义时间序列生成函数。该类函数允许接收多条时间序列,最终会输出一条时间序列,生成的时间序列可以有任意多数量的数据点。 |
UDAF(User Defined Aggregation Function) | 自定义聚合函数。该类函数接受一条时间序列数据,最终会根据用户指定的 GROUP BY 类型,为每个组生成一个聚合后的数据点。 |
如果您使用 Maven ,可以从 Maven 库 中搜索下面示例中的依赖。请注意选择和目标 IoTDB 服务器版本相同的依赖版本。
<dependency> <groupId>org.apache.iotdb</groupId> <artifactId>udf-api</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency>
编写一个 UDTF 需要继承org.apache.iotdb.udf.api.UDTF
类,并至少实现beforeStart
方法和一种transform
方法。
下表是所有可供用户实现的接口说明。
接口定义 | 描述 | 是否必须 |
---|---|---|
void validate(UDFParameterValidator validator) throws Exception | 在初始化方法beforeStart 调用前执行,用于检测UDFParameters 中用户输入的参数是否合法。 | 否 |
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception | 初始化方法,在 UDTF 处理输入数据前,调用用户自定义的初始化行为。用户每执行一次 UDTF 查询,框架就会构造一个新的 UDF 类实例,该方法在每个 UDF 类实例被初始化时调用一次。在每一个 UDF 类实例的生命周期内,该方法只会被调用一次。 | 是 |
void transform(Row row, PointCollector collector) throws Exception | 这个方法由框架调用。当您在beforeStart 中选择以RowByRowAccessStrategy 的策略消费原始数据时,这个数据处理方法就会被调用。输入参数以Row 的形式传入,输出结果通过PointCollector 输出。您需要在该方法内自行调用collector 提供的数据收集方法,以决定最终的输出数据。 | 与下面的方法二选一 |
void transform(RowWindow rowWindow, PointCollector collector) throws Exception | 这个方法由框架调用。当您在beforeStart 中选择以SlidingSizeWindowAccessStrategy 或者SlidingTimeWindowAccessStrategy 的策略消费原始数据时,这个数据处理方法就会被调用。输入参数以RowWindow 的形式传入,输出结果通过PointCollector 输出。您需要在该方法内自行调用collector 提供的数据收集方法,以决定最终的输出数据。 | 与上面的方法二选一 |
void terminate(PointCollector collector) throws Exception | 这个方法由框架调用。该方法会在所有的transform 调用执行完成后,在beforeDestory 方法执行前被调用。在一个 UDF 查询过程中,该方法会且只会调用一次。您需要在该方法内自行调用collector 提供的数据收集方法,以决定最终的输出数据。 | 否 |
void beforeDestroy() | UDTF 的结束方法。此方法由框架调用,并且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
在一个完整的 UDTF 实例生命周期中,各个方法的调用顺序如下:
void validate(UDFParameterValidator validator) throws Exception
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception
void transform(Row row, PointCollector collector) throws Exception
或者void transform(RowWindow rowWindow, PointCollector collector) throws Exception
void terminate(PointCollector collector) throws Exception
void beforeDestroy()
注意,框架每执行一次 UDTF 查询,都会构造一个全新的 UDF 类实例,查询结束时,对应的 UDF 类实例即被销毁,因此不同 UDTF 查询(即使是在同一个 SQL 语句中)UDF 类实例内部的数据都是隔离的。您可以放心地在 UDTF 中维护一些状态数据,无需考虑并发对 UDF 类实例内部状态数据的影响。
下面将详细介绍各个接口的使用方法。
validate
方法能够对用户输入的参数进行验证。
您可以在该方法中限制输入序列的数量和类型,检查用户输入的属性或者进行自定义逻辑的验证。
UDFParameterValidator
的使用方法请见 Javadoc。
beforeStart
方法有两个作用:
1. 帮助用户解析 SQL 语句中的 UDF 参数 2. 配置 UDF 运行时必要的信息,即指定 UDF 访问原始数据时采取的策略和输出结果序列的类型 3. 创建资源,比如建立外部链接,打开文件等。
UDFParameters
的作用是解析 SQL 语句中的 UDF 参数(SQL 中 UDF 函数名称后括号中的部分)。参数包括序列类型参数和字符串 key-value 对形式输入的属性参数。
例子:
SELECT UDF(s1, s2, 'key1'='iotdb', 'key2'='123.45') FROM root.sg.d;
用法:
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { String stringValue = parameters.getString("key1"); // iotdb Float floatValue = parameters.getFloat("key2"); // 123.45 Double doubleValue = parameters.getDouble("key3"); // null int intValue = parameters.getIntOrDefault("key4", 678); // 678 // do something // configurations // ... }
您必须使用 UDTFConfigurations
指定 UDF 访问原始数据时采取的策略和输出结果序列的类型。
用法:
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // parameters // ... // configurations configurations .setAccessStrategy(new RowByRowAccessStrategy()) .setOutputDataType(Type.INT32); }
其中setAccessStrategy
方法用于设定 UDF 访问原始数据时采取的策略,setOutputDataType
用于设定输出结果序列的类型。
注意,您在此处设定的原始数据访问策略决定了框架会调用哪一种transform
方法 ,请实现与原始数据访问策略对应的transform
方法。当然,您也可以根据UDFParameters
解析出来的属性参数,动态决定设定哪一种策略,因此,实现两种transform
方法也是被允许的。
下面是您可以设定的访问原始数据的策略:
接口定义 | 描述 | 调用的transform 方法 |
---|---|---|
RowByRowAccessStrategy | 逐行地处理原始数据。框架会为每一行原始数据输入调用一次transform 方法。当 UDF 只有一个输入序列时,一行输入就是该输入序列中的一个数据点。当 UDF 有多个输入序列时,一行输入序列对应的是这些输入序列按时间对齐后的结果(一行数据中,可能存在某一列为null 值,但不会全部都是null )。 | void transform(Row row, PointCollector collector) throws Exception |
SlidingTimeWindowAccessStrategy | 以滑动时间窗口的方式处理原始数据。框架会为每一个原始数据输入窗口调用一次transform 方法。一个窗口可能存在多行数据,每一行数据对应的是输入序列按时间对齐后的结果(一行数据中,可能存在某一列为null 值,但不会全部都是null )。 | void transform(RowWindow rowWindow, PointCollector collector) throws Exception |
SlidingSizeWindowAccessStrategy | 以固定行数的方式处理原始数据,即每个数据处理窗口都会包含固定行数的数据(最后一个窗口除外)。框架会为每一个原始数据输入窗口调用一次transform 方法。一个窗口可能存在多行数据,每一行数据对应的是输入序列按时间对齐后的结果(一行数据中,可能存在某一列为null 值,但不会全部都是null )。 | void transform(RowWindow rowWindow, PointCollector collector) throws Exception |
SessionTimeWindowAccessStrategy | 以会话窗口的方式处理原始数据,框架会为每一个原始数据输入窗口调用一次transform 方法。一个窗口可能存在多行数据,每一行数据对应的是输入序列按时间对齐后的结果(一行数据中,可能存在某一列为null 值,但不会全部都是null )。 | void transform(RowWindow rowWindow, PointCollector collector) throws Exception |
StateWindowAccessStrategy | 以状态窗口的方式处理原始数据,框架会为每一个原始数据输入窗口调用一次transform 方法。一个窗口可能存在多行数据。目前仅支持对一个物理量也就是一列数据进行开窗。 | void transform(RowWindow rowWindow, PointCollector collector) throws Exception |
RowByRowAccessStrategy
的构造不需要任何参数。
如图是SlidingTimeWindowAccessStrategy
的开窗示意图。
SlidingTimeWindowAccessStrategy
有多种构造方法,您可以向构造方法提供 3 类参数:
时间轴显示时间窗开始和结束时间不是必须要提供的。当您不提供这类参数时,时间轴显示时间窗开始时间会被定义为整个查询结果集中最小的时间戳,时间轴显示时间窗结束时间会被定义为整个查询结果集中最大的时间戳。
滑动步长参数也不是必须的。当您不提供滑动步长参数时,滑动步长会被设定为划分时间轴的时间间隔。
3 类参数的关系可见下图。策略的构造方法详见 Javadoc。
注意,最后的一些时间窗口的实际时间间隔可能小于规定的时间间隔参数。另外,可能存在某些时间窗口内数据行数量为 0 的情况,这种情况框架也会为该窗口调用一次transform
方法。
如图是SlidingSizeWindowAccessStrategy
的开窗示意图。
SlidingSizeWindowAccessStrategy
有多种构造方法,您可以向构造方法提供 2 个参数:
滑动步长参数不是必须的。当您不提供滑动步长参数时,滑动步长会被设定为窗口大小。
如图是SessionTimeWindowAccessStrategy
的开窗示意图。时间间隔小于等于给定的最小时间间隔 sessionGap 则分为一组。
SessionTimeWindowAccessStrategy
有多种构造方法,您可以向构造方法提供 2 类参数:
如图是StateWindowAccessStrategy
的开窗示意图。对于数值型数据,状态差值小于等于给定的阈值 delta 则分为一组。
StateWindowAccessStrategy
有四种构造方法。
StateWindowAccessStrategy 目前只能接收一列输入。策略的构造方法详见 Javadoc。
注意,您在此处设定的输出结果序列的类型,决定了transform
方法中PointCollector
实际能够接收的数据类型。setOutputDataType
中设定的输出类型和PointCollector
实际能够接收的数据输出类型关系如下:
setOutputDataType 中设定的输出类型 | PointCollector 实际能够接收的输出类型 |
---|---|
INT32 | int |
INT64 | long |
FLOAT | float |
DOUBLE | double |
BOOLEAN | boolean |
TEXT | java.lang.String 和 org.apache.iotdb.udf.api.type.Binary |
UDTF 输出序列的类型是运行时决定的。您可以根据输入序列类型动态决定输出序列类型。
下面是一个简单的例子:
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // do something // ... configurations .setAccessStrategy(new RowByRowAccessStrategy()) .setOutputDataType(parameters.getDataType(0)); }
当您在beforeStart
方法中指定 UDF 读取原始数据的策略为 RowByRowAccessStrategy
,您就需要实现该方法,在该方法中增加对原始数据处理的逻辑。
该方法每次处理原始数据的一行。原始数据由Row
读入,由PointCollector
输出。您可以选择在一次transform
方法调用中输出任意数量的数据点。需要注意的是,输出数据点的类型必须与您在beforeStart
方法中设置的一致,而输出数据点的时间戳必须是严格单调递增的。
下面是一个实现了void transform(Row row, PointCollector collector) throws Exception
方法的完整 UDF 示例。它是一个加法器,接收两列时间序列输入,当这两个数据点都不为null
时,输出这两个数据点的代数和。
import org.apache.iotdb.udf.api.UDTF; import org.apache.iotdb.udf.api.access.Row; import org.apache.iotdb.udf.api.collector.PointCollector; import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.udf.api.type.Type; public class Adder implements UDTF { @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(Type.INT64) .setAccessStrategy(new RowByRowAccessStrategy()); } @Override public void transform(Row row, PointCollector collector) throws Exception { if (row.isNull(0) || row.isNull(1)) { return; } collector.putLong(row.getTime(), row.getLong(0) + row.getLong(1)); } }
当您在beforeStart
方法中指定 UDF 读取原始数据的策略为 SlidingTimeWindowAccessStrategy
或者SlidingSizeWindowAccessStrategy
时,您就需要实现该方法,在该方法中增加对原始数据处理的逻辑。
该方法每次处理固定行数或者固定时间间隔内的一批数据,我们称包含这一批数据的容器为窗口。原始数据由RowWindow
读入,由PointCollector
输出。RowWindow
能够帮助您访问某一批次的Row
,它提供了对这一批次的Row
进行随机访问和迭代访问的接口。您可以选择在一次transform
方法调用中输出任意数量的数据点,需要注意的是,输出数据点的类型必须与您在beforeStart
方法中设置的一致,而输出数据点的时间戳必须是严格单调递增的。
下面是一个实现了void transform(RowWindow rowWindow, PointCollector collector) throws Exception
方法的完整 UDF 示例。它是一个计数器,接收任意列数的时间序列输入,作用是统计并输出指定时间范围内每一个时间窗口中的数据行数。
import java.io.IOException; import org.apache.iotdb.udf.api.UDTF; import org.apache.iotdb.udf.api.access.RowWindow; import org.apache.iotdb.udf.api.collector.PointCollector; import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; import org.apache.iotdb.udf.api.type.Type; public class Counter implements UDTF { @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(Type.INT32) .setAccessStrategy(new SlidingTimeWindowAccessStrategy( parameters.getLong("time_interval"), parameters.getLong("sliding_step"), parameters.getLong("display_window_begin"), parameters.getLong("display_window_end"))); } @Override public void transform(RowWindow rowWindow, PointCollector collector) throws Exception { if (rowWindow.windowSize() != 0) { collector.putInt(rowWindow.windowStartTime(), rowWindow.windowSize()); } } }
在一些场景下,UDF 需要遍历完所有的原始数据后才能得到最后的输出结果。terminate
接口为这类 UDF 提供了支持。
该方法会在所有的transform
调用执行完成后,在beforeDestory
方法执行前被调用。您可以选择使用transform
方法进行单纯的数据处理,最后使用terminate
将处理结果输出。
结果需要由PointCollector
输出。您可以选择在一次terminate
方法调用中输出任意数量的数据点。需要注意的是,输出数据点的类型必须与您在beforeStart
方法中设置的一致,而输出数据点的时间戳必须是严格单调递增的。
下面是一个实现了void terminate(PointCollector collector) throws Exception
方法的完整 UDF 示例。它接收一个INT32
类型的时间序列输入,作用是输出该序列的最大值点。
import java.io.IOException; import org.apache.iotdb.udf.api.UDTF; import org.apache.iotdb.udf.api.access.Row; import org.apache.iotdb.udf.api.collector.PointCollector; import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.udf.api.type.Type; public class Max implements UDTF { private Long time; private int value; @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(TSDataType.INT32) .setAccessStrategy(new RowByRowAccessStrategy()); } @Override public void transform(Row row, PointCollector collector) { if (row.isNull(0)) { return; } int candidateValue = row.getInt(0); if (time == null || value < candidateValue) { time = row.getTime(); value = candidateValue; } } @Override public void terminate(PointCollector collector) throws IOException { if (time != null) { collector.putInt(time, value); } } }
UDTF 的结束方法,您可以在此方法中进行一些资源释放等的操作。
此方法由框架调用。对于一个 UDF 类实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。
一个完整的 UDAF 定义涉及到 State 和 UDAF 两个类。
编写一个 State 类需要实现org.apache.iotdb.udf.api.State
接口,下表是需要实现的方法说明。
接口定义 | 描述 | 是否必须 |
---|---|---|
void reset() | 将 State 对象重置为初始的状态,您需要像编写构造函数一样,在该方法内填入 State 类中各个字段的初始值。 | 是 |
byte[] serialize() | 将 State 序列化为二进制数据。该方法用于 IoTDB 内部的 State 对象传递,注意序列化的顺序必须和下面的反序列化方法一致。 | 是 |
void deserialize(byte[] bytes) | 将二进制数据反序列化为 State 。该方法用于 IoTDB 内部的 State 对象传递,注意反序列化的顺序必须和上面的序列化方法一致。 | 是 |
下面将详细介绍各个接口的使用方法。
该方法的作用是将 State
重置为初始的状态,您需要在该方法内填写 State
对象中各个字段的初始值。出于优化上的考量,IoTDB 在内部会尽可能地复用 State
,而不是为每一个组创建一个新的 State
,这样会引入不必要的开销。当 State
更新完一个组中的数据之后,就会调用这个方法重置为初始状态,以此来处理下一个组。
以求平均数(也就是 avg
)的 State
为例,您需要数据的总和 sum
与数据的条数 count
,并在 reset()
方法中将二者初始化为 0。
class AvgState implements State { double sum; long count; @Override public void reset() { sum = 0; count = 0; } // other methods }
该方法的作用是将 State 序列化为二进制数据,和从二进制数据中反序列化出 State。IoTDB 作为分布式数据库,涉及到在不同节点中传递数据,因此您需要编写这两个方法,来实现 State 在不同节点中的传递。注意序列化和反序列的顺序必须一致。
还是以求平均数(也就是求 avg)的 State 为例,您可以通过任意途径将 State 的内容转化为 byte[]
数组,以及从 byte[]
数组中读取出 State 的内容,下面展示的是用 Java8 引入的 ByteBuffer
进行序列化/反序列的代码:
@Override public byte[] serialize() { ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES); buffer.putDouble(sum); buffer.putLong(count); return buffer.array(); } @Override public void deserialize(byte[] bytes) { ByteBuffer buffer = ByteBuffer.wrap(bytes); sum = buffer.getDouble(); count = buffer.getLong(); }
编写一个 UDAF 类需要实现org.apache.iotdb.udf.api.UDAF
接口,下表是需要实现的方法说明。
接口定义 | 描述 | 是否必须 |
---|---|---|
void validate(UDFParameterValidator validator) throws Exception | 在初始化方法beforeStart 调用前执行,用于检测UDFParameters 中用户输入的参数是否合法。该方法与 UDTF 的validate 相同。 | 否 |
void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception | 初始化方法,在 UDAF 处理输入数据前,调用用户自定义的初始化行为。与 UDTF 不同的是,这里的 configuration 是 UDAFConfiguration 类型。 | 是 |
State createState() | 创建State 对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。 | 是 |
void addInput(State state, Column[] columns, BitMap bitMap) | 根据传入的数据Column[] 批量地更新State 对象,注意 column[0] 总是代表时间列。另外BitMap 表示之前已经被过滤掉的数据,您在编写该方法时需要手动判断对应的数据是否被过滤掉。 | 是 |
void combineState(State state, State rhs) | 将rhs 状态合并至state 状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 会为每个节点上的部分数据生成一个State 对象,然后调用该方法合并成完整的State 。 | 是 |
void outputFinal(State state, ResultValue resultValue) | 根据State 中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。 | 是 |
void beforeDestroy() | UDAF 的结束方法。此方法由框架调用,并且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
在一个完整的 UDAF 实例生命周期中,各个方法的调用顺序如下:
State createState()
void validate(UDFParameterValidator validator) throws Exception
void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception
void addInput(State state, Column[] columns, BitMap bitMap)
void combineState(State state, State rhs)
void outputFinal(State state, ResultValue resultValue)
void beforeDestroy()
和 UDTF 类似,框架每执行一次 UDAF 查询,都会构造一个全新的 UDF 类实例,查询结束时,对应的 UDF 类实例即被销毁,因此不同 UDAF 查询(即使是在同一个 SQL 语句中)UDF 类实例内部的数据都是隔离的。您可以放心地在 UDAF 中维护一些状态数据,无需考虑并发对 UDF 类实例内部状态数据的影响。
下面将详细介绍各个接口的使用方法。
同 UDTF, validate
方法能够对用户输入的参数进行验证。
您可以在该方法中限制输入序列的数量和类型,检查用户输入的属性或者进行自定义逻辑的验证。
beforeStart
方法的作用 UDAF 相同:
1. 帮助用户解析 SQL 语句中的 UDF 参数 2. 配置 UDF 运行时必要的信息,即指定 UDF 访问原始数据时采取的策略和输出结果序列的类型 3. 创建资源,比如建立外部链接,打开文件等。
其中,UDFParameters
类型的作用可以参照上文。
和 UDTF 的区别在于,UDAF 使用了 UDAFConfigurations
作为 configuration
对象的类型。
目前,该类仅支持设置输出数据的类型。
void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception { // parameters // ... // configurations configurations .setOutputDataType(Type.INT32); }
setOutputDataType
中设定的输出类型和 ResultValue
实际能够接收的数据输出类型关系如下:
setOutputDataType 中设定的输出类型 | ResultValue 实际能够接收的输出类型 |
---|---|
INT32 | int |
INT64 | long |
FLOAT | float |
DOUBLE | double |
BOOLEAN | boolean |
TEXT | org.apache.iotdb.udf.api.type.Binary |
UDAF 输出序列的类型也是运行时决定的。您可以根据输入序列类型动态决定输出序列类型。
下面是一个简单的例子:
void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception { // do something // ... configurations .setOutputDataType(parameters.getDataType(0)); }
为 UDAF 创建并初始化 State
。由于 Java 语言本身的限制,您只能调用 State
类的默认构造函数。默认构造函数会为类中所有的字段赋一个默认的初始值,如果该初始值并不符合您的要求,您需要在这个方法内进行手动的初始化。
下面是一个包含手动初始化的例子。假设您要实现一个累乘的聚合函数,State
的初始值应该设置为 1,但是默认构造函数会初始化为 0,因此您需要在调用默认构造函数之后,手动对 State
进行初始化:
public State createState() { MultiplyState state = new MultiplyState(); state.result = 1; return state; }
该方法的作用是,通过原始的输入数据来更新 State
对象。出于性能上的考量,也是为了和 IoTDB 向量化的查询引擎相对齐,原始的输入数据不再是一个数据点,而是列的数组 Column[]
。注意第一列(也就是 column[0]
)总是时间列,因此您也可以在 UDAF 中根据时间进行不同的操作。
由于输入参数的类型不是一个数据点,而是多个列,您需要手动对列中的部分数据进行过滤处理,这就是第三个参数 BitMap
存在的意义。它用来标识这些列中哪些数据被过滤掉了,您在任何情况下都无需考虑被过滤掉的数据。
下面是一个用于统计数据条数(也就是 count)的 addInput()
示例。它展示了您应该如何使用 BitMap
来忽视那些已经被过滤掉的数据。注意还是由于 Java 语言本身的限制,您需要在方法的开头将接口中定义的 State
类型强制转化为自定义的 State
类型,不然后续无法正常使用该 State
对象。
public void addInput(State state, Column[] column, BitMap bitMap) { CountState countState = (CountState) state; int count = column[0].getPositionCount(); for (int i = 0; i < count; i++) { if (bitMap != null && !bitMap.isMarked(i)) { continue; } if (!column[1].isNull(i)) { countState.count++; } } }
该方法的作用是合并两个 State
,更加准确的说,是用第二个 State
对象来更新第一个 State
对象。IoTDB 是分布式数据库,同一组的数据可能分布在多个不同的节点上。出于性能考虑,IoTDB 会为每个节点上的部分数据先进行聚合成 State
,然后再将不同节点上的、属于同一个组的 State
进行合并,这就是 combineState
的作用。
下面是一个用于求平均数(也就是 avg)的 combineState()
示例。和 addInput
类似,您都需要在开头对两个 State
进行强制类型转换。另外需要注意是用第二个 State
的内容来更新第一个 State
的值。
public void combineState(State state, State rhs) { AvgState avgState = (AvgState) state; AvgState avgRhs = (AvgState) rhs; avgState.count += avgRhs.count; avgState.sum += avgRhs.sum; }
该方法的作用是从 State
中计算出最终的结果。您需要访问 State
中的各个字段,求出最终的结果,并将最终的结果设置到 ResultValue
对象中。IoTDB 内部会为每个组在最后调用一次这个方法。注意根据聚合的语义,最终的结果只能是一个值。
下面还是一个用于求平均数(也就是 avg)的 outputFinal
示例。除了开头的强制类型转换之外,您还将看到 ResultValue
对象的具体用法,即通过 setXXX
(其中 XXX
是类型名)来设置最后的结果。
public void outputFinal(State state, ResultValue resultValue) { AvgState avgState = (AvgState) state; if (avgState.count != 0) { resultValue.setDouble(avgState.sum / avgState.count); } else { resultValue.setNull(); } }
UDAF 的结束方法,您可以在此方法中进行一些资源释放等的操作。
此方法由框架调用。对于一个 UDF 类实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。
如果您使用 Maven,可以参考我们编写的示例项目udf-example。您可以在 这里 找到它。
注册一个 UDF 可以按如下流程进行:
org.apache.iotdb.udf.UDTFExample
CREATE FUNCTION <UDF-NAME> AS <UDF-CLASS-FULL-PATHNAME> (USING URI URI-STRING)?
example
的 UDF,以下两种注册方式任选其一即可准备工作:
使用该种方式注册时,您需要提前将 JAR 包放置到目录 iotdb-server-1.X.X-all-bin/ext/udf
(该目录可配置) 下。
注意,如果您使用的是集群,那么需要将 JAR 包放置到所有 DataNode 的该目录下
注册语句:
CREATE FUNCTION example AS 'org.apache.iotdb.udf.UDTFExample'
准备工作:
使用该种方式注册时,您需要提前将 JAR 包上传到 URI 服务器上并确保执行注册语句的 IoTDB 实例能够访问该 URI 服务器。
注意,您无需手动放置 JAR 包,IoTDB 会下载 JAR 包并正确同步到整个集群
注册语句:
CREATE FUNCTION example AS 'org.apache.iotdb.udf.UDTFExample' USING URI 'http://jar/example.jar'
由于 IoTDB 的 UDF 是通过反射技术动态装载的,因此您在装载过程中无需启停服务器。
UDF 函数名称是大小写不敏感的。
请不要给 UDF 函数注册一个内置函数的名字。使用内置函数的名字给 UDF 注册会失败。
不同的 JAR 包中最好不要有全类名相同但实现功能逻辑不一样的类。例如 UDF(UDAF/UDTF):udf1
、udf2
分别对应资源udf1.jar
、udf2.jar
。如果两个 JAR 包里都包含一个org.apache.iotdb.udf.UDTFExample
类,当同一个 SQL 中同时使用到这两个 UDF 时,系统会随机加载其中一个类,导致 UDF 执行行为不一致。
卸载 UDF 的 SQL 语法如下:
DROP FUNCTION <UDF-NAME>
可以通过如下 SQL 语句卸载上面例子中的 UDF:
DROP FUNCTION example
UDF 的使用方法与普通内建函数的类似。
SLIMIT
/ SOFFSET
LIMIT
/ OFFSET
假定现在有时间序列 root.sg.d1.s1
和 root.sg.d1.s2
。
SELECT example(*) from root.sg.d1
那么结果集中将包括example(root.sg.d1.s1)
和example(root.sg.d1.s2)
的结果。
SELECT example(s1, *) from root.sg.d1
那么结果集中将包括example(root.sg.d1.s1, root.sg.d1.s1)
和example(root.sg.d1.s1, root.sg.d1.s2)
的结果。
SELECT example(*, *) from root.sg.d1
那么结果集中将包括example(root.sg.d1.s1, root.sg.d1.s1)
,example(root.sg.d1.s2, root.sg.d1.s1)
,example(root.sg.d1.s1, root.sg.d1.s2)
和 example(root.sg.d1.s2, root.sg.d1.s2)
的结果。
您可以在进行 UDF 查询的时候,向 UDF 传入任意数量的键值对参数。键值对中的键和值都需要被单引号或者双引号引起来。注意,键值对参数只能在所有时间序列后传入。下面是一组例子:
SELECT example(s1, 'key1'='value1', 'key2'='value2'), example(*, 'key3'='value3') FROM root.sg.d1; SELECT example(s1, s2, 'key1'='value1', 'key2'='value2') FROM root.sg.d1;
SELECT s1, s2, example(s1, s2) FROM root.sg.d1; SELECT *, example(*) FROM root.sg.d1 DISABLE ALIGN; SELECT s1 * example(* / s1 + s2) FROM root.sg.d1; SELECT s1, s2, s1 + example(s1, s2), s1 - example(s1 + example(s1, s2) / s2) FROM root.sg.d1;
SHOW FUNCTIONS
用户在使用 UDF 时会涉及到 1 种权限:USE_UDF
更多用户权限相关的内容,请参考 权限管理语句。
使用配置项 udf_lib_dir
来配置 udf 的存储目录.
在 SQL 语句中使用自定义函数时,可能提示内存不足。这种情况下,您可以通过更改配置文件iotdb-system.properties
中的udf_initial_byte_array_length_for_memory_control
,udf_memory_budget_in_mb
和udf_reader_transformer_collector_memory_proportion
并重启服务来解决此问题。
该部分主要讲述了外部用户如何将自己编写的 UDF 贡献给 IoTDB 社区。
UDF 具有通用性。
通用性主要指的是:UDF 在某些业务场景下,可以被广泛使用。换言之,就是 UDF 具有复用价值,可被社区内其他用户直接使用。
如果您不确定自己写的 UDF 是否具有通用性,可以发邮件到 dev@iotdb.apache.org
或直接创建 ISSUE 发起讨论。
UDF 已经完成测试,且能够正常运行在用户的生产环境中。
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin
中创建 UDF 主类和相关的辅助类。iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
中注册您编写的 UDF。您至少需要为您贡献的 UDF 编写集成测试。
您可以在integration-test/src/test/java/org/apache/iotdb/db/it/udf
中为您贡献的 UDF 新增一个测试类进行测试。
使用说明需要包含:UDF 的名称、UDF 的作用、执行函数必须的属性参数、函数的适用的场景以及使用示例等。
使用说明需包含中英文两个版本。应分别在 docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md
和 docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
中新增使用说明。
当您准备好源代码、测试用例和使用说明后,就可以将 UDF 贡献到 IoTDB 社区了。在 Github 上面提交 Pull Request (PR) 即可。具体提交方式见:贡献指南。
当 PR 评审通过并被合并后,您的 UDF 就已经贡献给 IoTDB 社区了!
对基于时序数据的应用而言,数据质量至关重要。基于用户自定义函数能力,IoTDB 提供了一系列关于数据质量的函数,包括数据画像、数据质量评估与修复等,能够满足工业领域对数据质量的需求。
该函数库中的函数不是内置函数,使用前要先加载到系统中。 操作流程如下:
mvn clean package -pl library-udf -am -DskipTests -Pget-jar-with-dependencies
ext\udf
目录下(若您使用的是集群,请将jar包复制到所有DataNode的该目录下),如下图所示; sbin
目录下,修改脚本中的参数(默认为host=127.0.0.1,rpcPort=6667,user=root,pass=root);register-UDF.sh
以注册 UDF。Q1: 如何修改已经注册的 UDF?
A1: 假设 UDF 的名称为example
,全类名为org.apache.iotdb.udf.UDTFExample
,由example.jar
引入
example
函数,执行DROP FUNCTION example
iotdb-server-1.0.0-all-bin/ext/udf
目录下的example.jar
org.apache.iotdb.udf.UDTFExample
中的逻辑,重新打包,JAR 包的名字可以仍然为example.jar
iotdb-server-1.0.0-all-bin/ext/udf
目录下CREATE FUNCTION example AS "org.apache.iotdb.udf.UDTFExample"