触发器提供了一种侦听序列数据变动的机制。配合用户自定义逻辑,可完成告警、数据清洗、数据转发等功能。
触发器基于 Java 反射机制实现。用户通过简单实现 Java 接口,即可实现数据侦听。IoTDB 允许用户动态装载、卸载触发器,在装载、卸载期间,无需启停服务器。
根据此文档,您将会很快学会触发器的编写与管理。
触发器的逻辑需要您编写 Java 类进行实现。
在编写触发器逻辑时,需要使用到下面展示的依赖。如果您使用 Maven,则可以直接从 Maven 库 中搜索到它们。
<dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-server</artifactId> <version>0.14.0-SNAPSHOT</version> <scope>provided</scope> </dependency>
请注意选择和目标服务器版本相同的依赖版本。
编写一个触发器需要实现org.apache.iotdb.db.engine.trigger.api.Trigger类。
该类提供了两类编程接口:生命周期钩子和数据变动侦听钩子。该类中所有的接口都不是必须实现的,当您不实现它们时,它们不会对流经的数据操作产生任何响应。您可以根据实际需要,只实现其中若干接口。
下面是所有可供用户进行实现的接口的说明。
| 接口定义 | 描述 |
|---|---|
void onCreate(TriggerAttributes attributes) throws Exception | 当您使用CREATE TRIGGER语句注册触发器后,该钩子会被调用一次。在每一个实例的生命周期内,该钩子会且仅仅会被调用一次。该钩子主要有如下作用:1. 帮助用户解析 SQL 语句中的自定义属性(使用TriggerAttributes)。 2. 创建或申请资源,如建立外部链接、打开文件等。 |
void onDrop() throws Exception | 当您使用DROP TRIGGER语句删除触发器后,该钩子会被调用。在每一个实例的生命周期内,该钩子会且仅仅会被调用一次。该钩子的主要作用是进行一些资源释放等的操作。 |
void onStart() throws Exception | 当您使用START TRIGGER语句手动启动(被STOP TRIGGER语句停止的)触发器后,该钩子会被调用。 |
void onStop() throws Exception | 当您使用STOP TRIGGER语句手动停止触发器后,该钩子会被调用。 |
目前触发器仅能侦听数据插入的操作。
数据变动侦听钩子的调用时机由CREATE TRIGGER语句显式指定,在编程接口层面不作区分。
Integer fire(long timestamp, Integer value) throws Exception; Long fire(long timestamp, Long value) throws Exception; Float fire(long timestamp, Float value) throws Exception; Double fire(long timestamp, Double value) throws Exception; Boolean fire(long timestamp, Boolean value) throws Exception; Binary fire(long timestamp, Binary value) throws Exception;
对于注册序列上的每一点数据插入,触发器都会调用fire作为响应,钩子的入参timestamp和value即是这一次插入数据点的时间和数据值。您可以在fire钩子中编写处理数据的任意逻辑。
注意,目前钩子的返回值是没有任何意义的。
int[] fire(long[] timestamps, int[] values) throws Exception; long[] fire(long[] timestamps, long[] values) throws Exception; float[] fire(long[] timestamps, float[] values) throws Exception; double[] fire(long[] timestamps, double[] values) throws Exception; boolean[] fire(long[] timestamps, boolean[] values) throws Exception; Binary[] fire(long[] timestamps, Binary[] values) throws Exception;
如果您需要在业务场景中使用到 Session API 的insertTablet接口或insertTablets接口,那么您可以通过实现上述数据插入的侦听钩子来降低触发器的调用开销。
推荐您在实现上述批量数据插入的侦听钩子时, 保证批量数据插入侦听钩子与单点数据插入侦听钩子的行为具有一致性。当您不实现批量数据插入的侦听钩子时,它将遵循下面的默认逻辑。
default int[] fire(long[] timestamps, int[] values) throws Exception { int size = timestamps.length; for (int i = 0; i < size; ++i) { fire(timestamps[i], values[i]); } return values; }
注意,目前钩子的返回值是没有任何意义的。
您可以通过 SQL 语句注册、卸载、启动或停止一个触发器实例,您也可以通过 SQL 语句查询到所有已经注册的触发器。
触发器有两种运行状态:STARTED和STOPPED,您需要执行START TRIGGER或者STOP TRIGGER来启动或者停止一个触发器。
当一个触发器的状态为STOPPED时,它将不会响应被注册序列上的操作(如插入数据点的操作),对外表现就会像是这个序列没有被注册过触发器一样,但是它会保存所有的状态(触发器类变量)信息,同时也会保存所有的注册信息。
注意,通过CREATE TRIGGER语句注册的触发器默认是STARTED的。
触发器只能注册在一个已经存在的时间序列上。任何时间序列只允许注册一个触发器。
被注册有触发器的序列将会被触发器侦听,当序列上有数据变动时,触发器中对应的钩子将会被调用。
注册一个触发器可以按如下流程进行:
实现一个完整的 Trigger 类,假定这个类的全类名为org.apache.iotdb.db.engine.trigger.example.AlertListener
将项目打成 JAR 包,如果您使用 Maven 管理项目,可以参考上述 Maven 项目示例的写法
将 JAR 包放置到目录 iotdb-server-0.14.0-SNAPSHOT/ext/trigger (也可以是iotdb-server-0.14.0-SNAPSHOT/ext/trigger的子目录)下。
您可以通过修改配置文件中的
trigger_root_dir来指定加载触发器 JAR 包的根路径。
使用 SQL 语句注册该触发器,假定赋予该触发器的名字为alert-listener-sg1d1s1
使用CREATE TRIGGER语句注册该触发器
CREATE TRIGGER `alert-listener-sg1d1s1` AFTER INSERT ON root.sg1.d1.s1 AS 'org.apache.iotdb.db.engine.trigger.example.AlertListener' WITH ( 'lo' = '0', 'hi' = '100.0' )
注册触发器的详细 SQL 语法如下:
CREATE TRIGGER <TRIGGER-NAME> (BEFORE | AFTER) INSERT ON <FULL-PATH> AS <CLASSNAME>
同时,您还可以通过WITH子句传入任意数量的自定义属性值:
CREATE TRIGGER <TRIGGER-NAME> (BEFORE | AFTER) INSERT ON <FULL-PATH> AS <CLASSNAME> WITH ( <KEY-1>=<VALUE-1>, <KEY-2>=<VALUE-2>, ... )
TRIGGER-NAME是用于标定触发器的全局唯一 ID,它是大小写敏感的。
目前触发器可以侦听序列上的所有的数据插入操作,触发器可以选择在数据插入前(BEFORE INSERT)或者数据插入后(AFTER INSERT)触发钩子调用。
FULL-PATH 一个目标序列路径如root.sg1.d1.s1,或者是一个设备路径如root.sg1.d1,或者是一个存储组路径如root.sg1,抑或是一个业务语义节点路径如root.sg1.x。
CLASSNAME是触发器类的全类名。
请注意:
CLASSNAME以及属性值中的KEY和VALUE都需要被单引号或者双引号引用起来。FULL-PATH只支持注册一个触发器。触发器会在下面几种情景下被卸载:
DELETE TIMESERIES时,序列上注册的触发器会被卸载DELETE STORAGE GROUP时,对应存储组下注册的触发器会全部被卸载DROP TRIGGER语句主动卸载卸载触发器的 SQL 语法如下:
DROP TRIGGER <TRIGGER-NAME>
TRIGGER-NAME是用于标定触发器的全局唯一 ID。
下面是一个DROP TRIGGER语句的例子:
DROP TRIGGER `alert-listener-sg1d1s1`
该操作是“停止触发器”的逆操作。它将运行状态为STOPPED的触发器的运行状态变更为STARTED,这会使得触发器重新侦听被注册序列上的操作,并对数据变动产生响应。
启动触发器的 SQL 语法如下:
START TRIGGER <TRIGGER-NAME>
TRIGGER-NAME是用于标定触发器的全局唯一 ID。
下面是一个START TRIGGER语句的例子:
START TRIGGER `alert-listener-sg1d1s1`
注意,通过CREATE TRIGGER语句注册的触发器默认是STARTED的。
该操作将触发器的状态由STARTED变为STOPPED。当一个触发器的状态为STOPPED时,它将不会响应被注册序列上的操作(如插入数据点的操作),对外表现就会像是这个序列没有被注册过触发器一样。您可以使用START TRIGGER语句重新启动一个触发器。
停止触发器的 SQL 语法如下:
STOP TRIGGER <TRIGGER-NAME>
TRIGGER-NAME是用于标定触发器的全局唯一 ID。
下面是一个STOP TRIGGER语句的例子:
STOP TRIGGER `alert-listener-sg1d1s1`
查询触发器的 SQL 语句如下:
SHOW TRIGGERS
该语句展示已注册触发器的 ID、运行状态、触发时机、被注册的序列、触发器实例的全类名和注册触发器时用到的自定义属性。
用户在使用触发器时会涉及到 4 种权限:
CREATE_TRIGGER:具备该权限的用户才被允许注册触发器操作。该权限需要与触发器的路径绑定。DROP_TRIGGER:具备该权限的用户才被允许卸载触发器操作。该权限需要与触发器的路径绑定。START_TRIGGER:具备该权限的用户才被允许启动已被停止的触发器。该权限需要与触发器的路径绑定。STOP_TRIGGER:具备该权限的用户才被允许停止正在运行的触发器。该权限需要与触发器的路径绑定。更多用户权限相关的内容,请参考 权限管理语句。
实用工具类为常见的需求提供了编程范式和执行框架,它能够简化您编写触发器的一部分工作。
窗口工具类能够辅助您定义滑动窗口以及窗口上的数据处理逻辑。它能够构造两类滑动窗口:一种滑动窗口是固定窗口内时间长度的(SlidingTimeWindowEvaluationHandler),另一种滑动窗口是固定窗口内数据点数的(SlidingSizeWindowEvaluationHandler)。
窗口工具类允许您在窗口(Window)上定义侦听钩子(Evaluator)。每当一个新的窗口形成,您定义的侦听钩子就会被调用一次。您可以在这个侦听钩子内定义任何数据处理相关的逻辑。侦听钩子的调用是异步的,因此,在执行钩子内窗口处理逻辑的时候,是不会阻塞当前线程的。
值得注意的是,不论是SlidingTimeWindowEvaluationHandler还是SlidingSizeWindowEvaluationHandler,他们都只能够处理时间戳严格单调递增的序列,传入的不符合要求的数据点会被工具类抛弃。
Window与Evaluator接口的定义见org.apache.iotdb.db.utils.windowing.api包。
共两种构造方法。
第一种方法需要您提供窗口接受数据点的类型、窗口大小、滑动步长和一个侦听钩子(Evaluator)。
final TSDataType dataType = TSDataType.INT32; final int windowSize = 10; final int slidingStep = 5; SlidingSizeWindowEvaluationHandler handler = new SlidingSizeWindowEvaluationHandler( new SlidingSizeWindowConfiguration(dataType, windowSize, slidingStep), window -> { // do something });
第二种方法需要您提供窗口接受数据点的类型、窗口大小和一个侦听钩子(Evaluator)。这种构造方法下的窗口滑动步长等于窗口大小。
final TSDataType dataType = TSDataType.INT32; final int windowSize = 10; SlidingSizeWindowEvaluationHandler handler = new SlidingSizeWindowEvaluationHandler( new SlidingSizeWindowConfiguration(dataType, windowSize), window -> { // do something });
窗口大小、滑动步长必须为正数。
final long timestamp = 0; final int value = 0; hander.collect(timestamp, value);
注意,collect方法接受的第二个参数类型需要与构造时传入的dataType声明一致。
此外,collect方法只会对时间戳是单调递增的数据点产生响应。如果某一次collect方法采集到的数据点的时间戳小于等于上一次collect方法采集到的数据点时间戳,那么这一次采集的数据点将会被抛弃。
还需要注意的是,collect方法不是线程安全的。
共两种构造方法。
第一种方法需要您提供窗口接受数据点的类型、窗口内时间长度、滑动步长和一个侦听钩子(Evaluator)。
final TSDataType dataType = TSDataType.INT32; final long timeInterval = 1000; final long slidingStep = 500; SlidingTimeWindowEvaluationHandler handler = new SlidingTimeWindowEvaluationHandler( new SlidingTimeWindowConfiguration(dataType, timeInterval, slidingStep), window -> { // do something });
第二种方法需要您提供窗口接受数据点的类型、窗口内时间长度和一个侦听钩子(Evaluator)。这种构造方法下的窗口滑动步长等于窗口内时间长度。
final TSDataType dataType = TSDataType.INT32; final long timeInterval = 1000; SlidingTimeWindowEvaluationHandler handler = new SlidingTimeWindowEvaluationHandler( new SlidingTimeWindowConfiguration(dataType, timeInterval), window -> { // do something });
窗口内时间长度、滑动步长必须为正数。
final long timestamp = 0; final int value = 0; hander.collect(timestamp, value);
注意,collect方法接受的第二个参数类型需要与构造时传入的dataType声明一致。
此外,collect方法只会对时间戳是单调递增的数据点产生响应。如果某一次collect方法采集到的数据点的时间戳小于等于上一次collect方法采集到的数据点时间戳,那么这一次采集的数据点将会被抛弃。
还需要注意的是,collect方法不是线程安全的。
窗口计算的任务执行是异步的。
当异步任务无法被执行线程池及时消费时,会产生任务堆积。在极端情况下,异步任务的堆积会导致系统 OOM。因此,窗口计算线程池允许堆积的任务数量被设定为有限值。
当堆积的任务数量超出限值时,新提交的任务将无法进入线程池执行,此时,系统会调用您在侦听钩子(Evaluator)中制定的拒绝策略钩子onRejection进行处理。
onRejection的默认行为如下。
default void onRejection(Window window) { throw new RejectedExecutionException(); }
制定拒绝策略钩子的方式如下。
SlidingTimeWindowEvaluationHandler handler = new SlidingTimeWindowEvaluationHandler( new SlidingTimeWindowConfiguration(TSDataType.INT32, 1, 1), new Evaluator() { @Override public void evaluate(Window window) { // do something } @Override public void onRejection(Window window) { // do something } });
窗口计算线程池的默认线程数。默认为 CPU 核数。
最多允许堆积的窗口计算任务。默认为 64 个。
Sink 工具类为触发器提供了连接外部系统的能力。
它提供了一套编程范式。每一个 Sink 工具都包含一个用于处理数据发送的Handler、一个用于配置Handler的Configuration,还有一个用于描述发送数据的Event。
LocalIoTDBSink用于向本地序列写入数据点。
在写入数据前,不要求时间序列已被创建。
注意,在触发器场景中,侦听的时间序列和写入的目标时间序列不要在同一个存储组下。
使用示例:
final String device = "root.alerting"; final String[] measurements = new String[] {"local"}; final TSDataType[] dataTypes = new TSDataType[] {TSDataType.DOUBLE}; LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); localIoTDBHandler.open(new LocalIoTDBConfiguration(device, measurements, dataTypes)); // insert 100 data points for (int i = 0; i < 100; ++i) { final long timestamp = i; final double value = i; localIoTDBHandler.onEvent(new LocalIoTDBEvent(timestamp, value)); }
注意,当您需要向某个TEXT类型的序列写入数据时,您需要借助org.apache.iotdb.tsfile.utils.Binary:
// insert 100 data points for (int i = 0; i < 100; ++i) { final long timestamp = i; final String value = "" + i; localIoTDBHandler.onEvent(new LocalIoTDBEvent(timestamp, Binary.valueOf(value))); }
触发器可以使用MQTTSink向其他的 IoTDB 实例发送数据点。
在发送数据前,不要求时间序列已被创建。
使用示例:
final String host = "127.0.0.1"; final int port = 1883; final String username = "root"; final String password = "root"; final PartialPath device = new PartialPath("root.alerting"); final String[] measurements = new String[] {"remote"}; MQTTHandler mqttHandler = new MQTTHandler(); mqttHandler.open(new MQTTConfiguration(host, port, username, password, device, measurements)); final String topic = "test"; final QoS qos = QoS.EXACTLY_ONCE; final boolean retain = false; // send 100 data points for (int i = 0; i < 100; ++i) { final long timestamp = i; final double value = i; mqttHandler.onEvent(new MQTTEvent(topic, qos, retain, timestamp, value)); }
触发器可以使用AlertManagerSink 向 AlertManager 发送消息。
AlertManagerConfiguration 的构造需传入 AlertManager 的发送告警的 endpoint。
AlertManagerConfiguration(String endpoint);
AlertManagerEvent 提供三种构造函数:
AlertManagerEvent(String alertname); AlertManagerEvent(String alertname, Map<String, String> extraLabels); AlertManagerEvent(String alertname, Map<String, String> extraLabels, Map<String, String> annotations);
其中:
alertname 是必传参数,用于标识一个 alert,alertname 字段可用于 AlertManager 发送告警时的分组和消重。extraLabels 可选传,在后台与 alertname 组合成 labels 一起标识一个 alert,可用于 AlertManager 发送告警时的分组和消重。annotations 可选传,它的 value 值可使用 Go 语言模板风格的{{.<label_key>}}
它在最终生成消息时会被替换为 labels[<label_key>]。labels 和 annotations 会被解析成 json 字符串发送给 AlertManager:{ "labels": { "alertname": "<requiredAlertName>", "<labelname>": "<labelvalue>", ... }, "annotations": { "<labelname>": "<labelvalue>", ... } }
调用 AlertManagerHandler 的 onEvent(AlertManagerEvent event) 方法发送一个告警。
使用示例 1:
只传 alertname。
AlertManagerHandler alertManagerHandler = new AlertManagerHandler(); alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts")); final String alertName = "test0"; AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertName); alertManagerHandler.onEvent(alertManagerEvent);
使用示例 2:
传入 alertname 和 extraLabels。
AlertManagerHandler alertManagerHandler = new AlertManagerHandler(); alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts")); final String alertName = "test1"; final HashMap<String, String> extraLabels = new HashMap<>(); extraLabels.put("severity", "critical"); extraLabels.put("series", "root.ln.wt01.wf01.temperature"); extraLabels.put("value", String.valueOf(100.0)); AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertName, extraLabels); alertManagerHandler.onEvent(alertManagerEvent);
使用示例 3:
传入 alertname, extraLabels 和 annotations 。
最终 description 字段的值会被解析为 test2: root.ln.wt01.wf01.temperature is 100.0。
AlertManagerHandler alertManagerHandler = new AlertManagerHandler(); alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts")); final String alertName = "test2"; final HashMap<String, String> extraLabels = new HashMap<>(); extraLabels.put("severity", "critical"); extraLabels.put("series", "root.ln.wt01.wf01.temperature"); extraLabels.put("value", String.valueOf(100.0)); final HashMap<String, String> annotations = new HashMap<>(); annotations.put("summary", "high temperature"); annotations.put("description", "{{.alertname}}: {{.series}} is {{.value}}"); alertManagerHandler.onEvent(new AlertManagerEvent(alertName, extraLabels, annotations));
触发器可以使用ForwardSink通过HTTP和MQTT协议转发写入的数据,其内置了HTTPForwardHandler和MQTTForwardHandler。为提高连接使用效率,所有HTTPForwardHandler共用一个连接池,而host,port和username参数相同的MQTTForwardHandler共用一个连接池。
MQTTForwardHandler与MQTTHandler的区别在于,前者使用连接池而后者没有使用连接池,并且消息的格式也不同。
使用示例见ForwardTrigger。
ForwardTrigger是内置的用于实现数据分发的触发器,它使用ForwardSink和消费队列实现了对触发事件的异步批量处理。采用异步的方式进行转发,可以避免因为转发阻塞导致的系统阻塞。而采用ForwardSink中的连接池可使得池中的连接可以得到高效、安全的复用,避免了连接频繁建立、关闭的开销。
目前消息格式仅支持固定模板的JSON格式,模板如下:
[{"device":"%s","measurement":"%s","timestamp":%d,"value":%s}]
创建一个使用HTTP协议的forward_http触发器和一个使用MQTT协议的forward_mqtt触发器,两者分别订阅前缀路径root.http和root.mqtt。
CREATE trigger forward_http AFTER INSERT ON root.http AS 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger' WITH ('protocol' = 'http', 'endpoint' = 'http://127.0.0.1:8080/forward_receive') CREATE trigger forward_mqtt AFTER INSERT ON root.mqtt AS 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger' WITH ('protocol' = 'mqtt', 'host' = '127.0.0.1', 'port' = '1883', 'username' = 'root', 'password' = 'root', 'topic' = 'mqtt-test')
向两个前缀路径的子路径插入数据,激发触发器。
INSERT INTO root.http.d1(timestamp, s1) VALUES (1, 1); INSERT INTO root.http.d1(timestamp, s2) VALUES (2, true); INSERT INTO root.mqtt.d1(timestamp, s1) VALUES (1, 1); INSERT INTO root.mqtt.d1(timestamp, s2) VALUES (2, true);
触发器激发后,在HTTP接收端会接收到如下格式的JSON数据:
[ { "device":"root.http.d1", "measurement":"s1", "timestamp":1, "value":1.0 }, { "device":"root.http.d1", "measurement":"s2", "timestamp":2, "value":true } ]
触发器触发后,在MQTT接收端会接收到如下格式的JSON数据:
[ { "device":"root.mqtt.d1", "measurement":"s1", "timestamp":1, "value":1.0 }, { "device":"root.mqtt.d1", "measurement":"s2", "timestamp":2, "value":true } ]
| 参数 | 必填 | 默认值 | 上限 | 描述 |
|---|---|---|---|---|
| protocol | true | http | 转发协议,如HTTP/MQTT | |
| queueNumber | 8 | 8 | 队列数量,与全局参数trigger_forward_max_queue_number比较取小 | |
| queueSize | 2000 | 2000 | 队列大小,与全局参数trigger_forward_max_size_per_queue比较取小 | |
| batchSize | 50 | 50 | 每次最大转发批量,与全局参数trigger_forward_batch_size比较取小 | |
| stopIfException | false | 出现异常是否终止 | ||
| endpoint | true | 请求端点地址(HTTP协议参数) 说明:HTTP连接池参数取决于全局参数 trigger_forward_http_pool_size=200 和 trigger_forward_http_pool_max_per_route=20 | ||
| host | true | MQTT Broker主机名(MQTT 协议参数) | ||
| port | true | MQTT Broker端口号(MQTT 协议参数) | ||
| username | true | 用户名(MQTT 协议参数) | ||
| password | true | 密码(MQTT 协议参数) | ||
| topic | true | MQTT消息的主题(MQTT 协议参数) | ||
| reconnectDelay | 10ms | 重连等待时间(MQTT 协议参数) | ||
| connectAttemptsMax | 3 | 最大尝试连接次数(MQTT 协议参数) | ||
| qos | exactly_once | 服务质量保证(MQTT 协议参数),可选exactly_once,at_least_once,at_most_once | ||
| poolSize | 4 | 4 | MQTT连接池大小(MQTT 协议参数),与全局参数trigger_forward_mqtt_pool_size比较取小 | |
| retain | false | Publish后是否让MQTT Broker保持消息(MQTT 协议参数) |
如果您使用 Maven,可以参考我们编写的示例项目 trigger-example。
您可以在 这里 找到它。
它展示了:
package org.apache.iotdb.trigger; import org.apache.iotdb.db.engine.trigger.api.Trigger; import org.apache.iotdb.db.engine.trigger.api.TriggerAttributes; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTConfiguration; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTEvent; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTHandler; import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBConfiguration; import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBEvent; import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBHandler; import org.apache.iotdb.db.utils.windowing.configuration.SlidingSizeWindowConfiguration; import org.apache.iotdb.db.utils.windowing.handler.SlidingSizeWindowEvaluationHandler; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.fusesource.mqtt.client.QoS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TriggerExample implements Trigger { private static final Logger LOGGER = LoggerFactory.getLogger(TriggerExample.class); private static final String TARGET_DEVICE = "root.alerting"; private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); private final MQTTHandler mqttHandler = new MQTTHandler(); private SlidingSizeWindowEvaluationHandler windowEvaluationHandler; @Override public void onCreate(TriggerAttributes attributes) throws Exception { LOGGER.info("onCreate(TriggerAttributes attributes)"); double lo = attributes.getDouble("lo"); double hi = attributes.getDouble("hi"); openSinkHandlers(); windowEvaluationHandler = new SlidingSizeWindowEvaluationHandler( new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5), window -> { double avg = 0; for (int i = 0; i < window.size(); ++i) { avg += window.getDouble(i); } avg /= window.size(); if (avg < lo || hi < avg) { localIoTDBHandler.onEvent(new LocalIoTDBEvent(window.getTime(0), avg)); mqttHandler.onEvent( new MQTTEvent("test", QoS.EXACTLY_ONCE, false, window.getTime(0), avg)); } }); } @Override public void onDrop() throws Exception { LOGGER.info("onDrop()"); closeSinkHandlers(); } @Override public void onStart() throws Exception { LOGGER.info("onStart()"); openSinkHandlers(); } @Override public void onStop() throws Exception { LOGGER.info("onStop()"); closeSinkHandlers(); } @Override public Double fire(long timestamp, Double value) { windowEvaluationHandler.collect(timestamp, value); return value; } @Override public double[] fire(long[] timestamps, double[] values) { for (int i = 0; i < timestamps.length; ++i) { windowEvaluationHandler.collect(timestamps[i], values[i]); } return values; } private void openSinkHandlers() throws Exception { localIoTDBHandler.open( new LocalIoTDBConfiguration( TARGET_DEVICE, new String[]{"local"}, new TSDataType[]{TSDataType.DOUBLE})); mqttHandler.open( new MQTTConfiguration( "127.0.0.1", 1883, "root", new PartialPath(TARGET_DEVICE), new String[]{"remote"})); } private void closeSinkHandlers() throws Exception { localIoTDBHandler.close(); mqttHandler.close(); } }
您可以按照下面的步骤试用这个触发器:
在iotdb-engine.properties中启用 MQTT 服务
# whether to enable the mqtt service. enable_mqtt_service=true
启动 IoTDB 服务器
通过 cli 创建时间序列
CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN;
将 trigger-example 中打包好的 JAR(trigger-example-0.14.0-SNAPSHOT.jar)放置到目录 iotdb-server-0.14.0-SNAPSHOT/ext/trigger (也可以是iotdb-server-0.14.0-SNAPSHOT/ext/trigger的子目录)下
您可以通过修改配置文件中的
trigger_root_dir来指定加载触发器 JAR 包的根路径。
使用 SQL 语句注册该触发器,假定赋予该触发器的名字为window-avg-alerter
使用CREATE TRIGGER语句注册该触发器
CREATE TRIGGER `window-avg-alerter` AFTER INSERT ON root.sg1.d1.s1 AS 'org.apache.iotdb.trigger.TriggerExample' WITH ( 'lo' = '0', 'hi' = '10.0' )
使用 cli 插入测试数据
INSERT INTO root.sg1.d1(timestamp, s1) VALUES (1, 0); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (2, 2); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (3, 4); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (4, 6); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (5, 8); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (6, 10); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (7, 12); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (8, 14); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (9, 16); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (10, 18);
使用 cli 查询数据以验证触发器的行为
SELECT * FROM root.alerting;
正常情况下,得到如下结果
IoTDB> SELECT * FROM root.alerting; +-----------------------------+--------------------+-------------------+ | Time|root.alerting.remote|root.alerting.local| +-----------------------------+--------------------+-------------------+ |1970-01-01T08:00:00.006+08:00| 14.0| 14.0| +-----------------------------+--------------------+-------------------+ Total line number = 1 It costs 0.006s
以上就是基本的使用方法,希望您能喜欢 :D
触发器是通过反射技术动态装载的,因此您在装载过程中无需启停服务器。
不同的 JAR 包中最好不要有全类名相同但功能实现不一样的类。例如:触发器trigger1、trigger2分别对应资源trigger1.jar、trigger2.jar。如果两个 JAR 包里都包含一个org.apache.iotdb.db.engine.trigger.example.AlertListener类,当CREATE TRIGGER使用到这个类时,系统会随机加载其中一个 JAR 包中的类,最终导致触发器执行行为不一致以及其他的问题。
拥有同一个全类名的触发器类的版本管理问题。IoTDB 不允许系统中存在拥有同一全类名但是版本(逻辑)不一样的触发器。
相关问题:IoTDB 预先注册了 10 个org.apache.iotdb.db.engine.trigger.example.AlertListener触发器实例,DBA 更新了org.apache.iotdb.db.engine.trigger.example.AlertListener的实现和对应的 JAR 包,是否可以只卸载其中 5 个,将这 5 个替换为新的实现?
回答:无法做到。只有将预先注册的 10 个触发器全部卸载,才能装载到新的触发器实例。在原有触发器没有全部被卸载的情况下,新注册的拥有相同全类名的触发器行为只会与现有触发器的行为一致。