The trigger provides a mechanism for listening to changes in time series data. With user-defined logic, tasks such as alerting, data cleaning, and data forwarding can be conducted.
The trigger is implemented based on the reflection mechanism. Users can monitor data changes by implementing the Java interfaces. IoTDB allows users to dynamically register and drop triggers without restarting the server.
The document will help you learn to define and manage triggers.
You need to implement the trigger by writing a Java class, where the dependency shown below is required. If you use Maven, you can search for them directly from the Maven repository.
<dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-server</artifactId> <version>0.14.0-SNAPSHOT</version> <scope>provided</scope> </dependency>
Note that the dependency version should be correspondent to the target server version.
To implement a trigger, you need to implement the org.apache.iotdb.db.engine.trigger.api.Trigger
class.
This class provides two types of programming interfaces: life cycle hooks and data change listening hooks. All the interfaces in this class are not required to be implemented. When the interfaces are not implemented, the trigger will not respond to the data changes. You can implement only some of these interfaces according to your needs.
Descriptions of the interfaces are as followed.
Interface Definition | Description |
---|---|
void onCreate(TriggerAttributes attributes) throws Exception | When you use the CREATE TRIGGER statement to register the trigger, the hook will be called. In the life cycle of each instance, the hook will and only will be called once. This hook mainly has the following functions: 1. To parse custom attributes in SQL statements (using TriggerAttributes ). 2. To create or apply for resources, such as establishing external links, opening files, etc. |
void onDrop() throws Exception | When you use the DROP TRIGGER statement to drop the trigger, the hook will be called. In the life cycle of each instance, the hook will and only will be called once. The main function of this hook is to perform operations such as releasing resources. |
void onStart() throws Exception | This hook will be called when you use the START TRIGGER statement to manually start the trigger (whose state should be STOPPED ). |
void onStop() throws Exception | This hook will be called when you use the STOP TRIGGER statement to manually stop the trigger (whose state should be STARTED ). |
Currently, triggers can only listen for data insertion operations.
The timing of calling the data change listener hooks is explicitly specified in the CREATE TRIGGER
statement rather than at the programming interface level.
Integer fire(long timestamp, Integer value) throws Exception; Long fire(long timestamp, Long value) throws Exception; Float fire(long timestamp, Float value) throws Exception; Double fire(long timestamp, Double value) throws Exception; Boolean fire(long timestamp, Boolean value) throws Exception; Binary fire(long timestamp, Binary value) throws Exception;
For each data insertion in the registered time series, the trigger will call fire
as a response. The input parameters timestamp
and value
are the time and value of the data point inserted this time. You can write any logic to process data in the fire
hook.
Note that currently the return value of the hook is meaningless.
int[] fire(long[] timestamps, int[] values) throws Exception; long[] fire(long[] timestamps, long[] values) throws Exception; float[] fire(long[] timestamps, float[] values) throws Exception; double[] fire(long[] timestamps, double[] values) throws Exception; boolean[] fire(long[] timestamps, boolean[] values) throws Exception; Binary[] fire(long[] timestamps, Binary[] values) throws Exception;
If you need to use the insertTablet
interface or the insertTablets
interface of the Session API, you can reduce the overhead of trigger calls by implementing the above batch data insertion listening hooks.
It is recommended that the behaviors of the batch data insertion listening hooks and the single point insertion listening hooks be consistent. The default implemetation of the listening hook for batch data insertion is as followed.
default int[] fire(long[] timestamps, int[] values) throws Exception { int size = timestamps.length; for (int i = 0; i < size; ++i) { fire(timestamps[i], values[i]); } return values; }
Note that currently the return value of the hook is meaningless.
You can register, drop, start or stop a trigger instance through SQL statements, and you can also query all registered triggers through SQL statements.
Triggers have two states: STARTED
and STOPPED
. You can start or stop a trigger by executing START TRIGGER
or STOP TRIGGER
.
When the state of a trigger is STOPPED
, it will not respond to the operations on the registered time series (such as inserting a data point), but all status (trigger variables), and registration information will be saved.
Note that the default state of the triggers registered by the CREATE TRIGGER
statement is STARTED
.
A trigger can only be registered on an existing time series. Only one trigger can be registered for any time series.
The time series registered with the trigger will be listened to by the trigger. When there is a data change in the time series, the corresponding hook in the trigger will be called.
Registering a trigger can be carried out as follows:
Implement a complete Trigger class. Assume that the full class name of this class is org.apache.iotdb.db.engine.trigger.example.AlertListener
.
Pack the project into a JAR package. If you use Maven to manage the project, you can refer to the above Maven project example.
Put the JAR package in the directory iotdb-server-0.14.0-SNAPSHOT/ext/trigger
(or a subdirectory of iotdb-server-0.14.0-SNAPSHOT/ext/trigger
).
You can specify the root path to load the trigger JAR packages by modifying the
trigger_root_dir
in the configuration file.
Use the SQL statement to register the trigger. Assume that the name given to the trigger is alert-listener-sg1d1s1
.
CREATE TRIGGER `alert-listener-sg1d1s1` AFTER INSERT ON root.sg1.d1.s1 AS 'org.apache.iotdb.db.engine.trigger.example.AlertListener' WITH ( 'lo' = '0', 'hi' = '100.0' )
The following shows the SQL syntax of registering a trigger.
CREATE TRIGGER <TRIGGER-NAME> (BEFORE | AFTER) INSERT ON <FULL-PATH> AS <CLASSNAME>
You can also set any number of key-value pair attributes for the trigger through the WITH
clause:
CREATE TRIGGER <TRIGGER-NAME> (BEFORE | AFTER) INSERT ON <FULL-PATH> AS <CLASSNAME> WITH ( <KEY-1>=<VALUE-1>, <KEY-2>=<VALUE-2>, ... )
TRIGGER-NAME
is a globally unique ID of the trigger, which is case sensitive.
At present, the trigger can listen to all data insertion operations on the time series. The hook can be called BEFORE
or AFTER
the data is inserted.
FULL-PATH
can be a time-series (measurement) path such as root.sg1.d1.s1, a device path such as root.sg1.d1, a storage group path such as root.sg1, or even a non-measurement path with business semantic such as root.sg1.x.
CLASSNAME
is the full class name of the trigger.
Note
CLASSNAME
, KEY
and VALUE
in the attributes need to be quoted in single or double quotes.full-path
.trigger-sg1d1s1
is registed on root.sg1.d1, trigger trigger-sg1d1
is registed on root.sg1.d1, and trigger trigger-sg1
is registed on root.sg1. When inserting data to root.sg1.d1.s1, triggers will be triggered in the following order: trigger-sg1d1s1 -> trigger-sg1d1 -> trigger-sg1.Triggers will be dropped in the following scenarios:
DELETE TIMESERIES
, the triggers registered on the time series will be dropped.DELETE STORAGE GROUP
, the triggers registered under the storage group will be dropped.DROP TRIGGER
statement.The following shows the SQL syntax of dropping a trigger:
DROP TRIGGER <TRIGGER-NAME>
The following is an example of a DROP TRIGGER
statement:
DROP TRIGGER `alert-listener-sg1d1s1`
This operation changes the state of the trigger from STOPPED
to STARTED
, which will make the trigger re-listen to the operations on the registered time series and respond to data changes.
The following shows the SQL syntax of starting a trigger:
START TRIGGER <TRIGGER-NAME>
The following is an example of a START TRIGGER
statement:
START TRIGGER `alert-listener-sg1d1s1`
Note that the triggers registered by the CREATE TRIGGER
statements are STARTED
by default.
This operation changes the state of the trigger from STARTED
to STOPPED
. When the status of a trigger is STOPPED
, it will not respond to the operations on the registered time series (such as inserting a data point). You can restart a trigger using the START TRIGGER
statement.
The following shows the SQL syntax of stopping a trigger:
STOP TRIGGER <TRIGGER-NAME>
The following is an example of a STOP TRIGGER
statement:
STOP TRIGGER `alert-listener-sg1d1s1`
SHOW TRIGGERS
When a user manages triggers, 4 types of authorities will be involved:
CREATE_TRIGGER
: Only users with this authority are allowed to register triggers. This authority is path dependent.DROP_TRIGGER
: Only users with this authority are allowed to drop triggers. This authority is path dependent.START_TRIGGER
: Only users with this authority are allowed to start triggers. This authority is path dependent.STOP_TRIGGER
: Only users with this authority are allowed to stop triggers. This authority is path dependent.For more information, refer to Authority Management Statement.
Utility classes provide programming paradigms and execution frameworks for the common requirements, which can simplify part of your work of implementing triggers.
The windowing utility can help you define sliding windows and the data processing logic on them. It can construct two types of sliding windows: one has a fixed time interval (SlidingTimeWindowEvaluationHandler
), and the other has fixed number of data points (SlidingSizeWindowEvaluationHandler
).
The windowing utility allows you to define a hook (Evaluator
) on the window (Window
). Whenever a new window is formed, the hook you defined will be called once. You can define any data processing-related logic in the hook. The call of the hook is asynchronous. Therefore, the current thread will not be blocked when the window processing logic is executed.
It is worth noting that whether it is SlidingTimeWindowEvaluationHandler
or SlidingSizeWindowEvaluationHandler
, they can only handle sequences with strictly monotonically increasing timestamps, and incoming data points that do not meet the requirements will be discarded.
For the definition of Window
and Evaluator
, please refer to the org.apache.iotdb.db.utils.windowing.api
package.
There are two construction methods.
The first method requires you to provide the type of data points that the window collects, the window size, the sliding step, and a hook (Evaluator
).
final TSDataType dataType = TSDataType.INT32; final int windowSize = 10; final int slidingStep = 5; SlidingSizeWindowEvaluationHandler handler = new SlidingSizeWindowEvaluationHandler( new SlidingSizeWindowConfiguration(dataType, windowSize, slidingStep), window -> { // do something });
The second method requires you to provide the type of data points that the window collects, the window size, and a hook (Evaluator
). The sliding step is equal to the window size by default.
final TSDataType dataType = TSDataType.INT32; final int windowSize = 10; SlidingSizeWindowEvaluationHandler handler = new SlidingSizeWindowEvaluationHandler( new SlidingSizeWindowConfiguration(dataType, windowSize), window -> { // do something });
The window size and the sliding step must be positive.
final long timestamp = 0; final int value = 0; hander.collect(timestamp, value);
Note that the type of the second parameter accepted by the collect
method needs to be consistent with the dataType
parameter provided during construction.
In addition, the collect
method will only respond to data points whose timestamps are monotonically increasing. If the time stamp of the data point collected by the collect
method is less than or equal to the time stamp of the data point collected by the previous collect
method call, the data point collected this time will be discarded.
Also note that the collect
method is not thread-safe.
There are two construction methods.
The first method requires you to provide the type of data points that the window collects, the time interval, the sliding step, and a hook (Evaluator
).
final TSDataType dataType = TSDataType.INT32; final long timeInterval = 1000; final long slidingStep = 500; SlidingTimeWindowEvaluationHandler handler = new SlidingTimeWindowEvaluationHandler( new SlidingTimeWindowConfiguration(dataType, timeInterval, slidingStep), window -> { // do something });
The second method requires you to provide the type of data points that the window collects, the time interval, and a hook (Evaluator
). The sliding step is equal to the time interval by default.
final TSDataType dataType = TSDataType.INT32; final long timeInterval = 1000; SlidingTimeWindowEvaluationHandler handler = new SlidingTimeWindowEvaluationHandler( new SlidingTimeWindowConfiguration(dataType, timeInterval), window -> { // do something });
The time interval and the sliding step must be positive.
final long timestamp = 0; final int value = 0; hander.collect(timestamp, value);
Note that the type of the second parameter accepted by the collect
method needs to be consistent with the dataType
parameter provided during construction.
In addition, the collect
method will only respond to data points whose timestamps are monotonically increasing. If the time stamp of the data point collected by the collect
method is less than or equal to the time stamp of the data point collected by the previous collect
method call, the data point collected this time will be discarded.
Also note that the collect
method is not thread-safe.
The execution of window evaluation tasks is asynchronous.
When asynchronous tasks cannot be consumed by the execution thread pool in time, tasks will accumulate. In extreme cases, the accumulation of asynchronous tasks can cause the system OOM. Therefore, the number of tasks that the window evaluation thread pool allows to accumulate is set to a finite value.
When the number of accumulated tasks exceeds the limit, the newly submitted tasks will not be able to enter the thread pool for execution. At this time, the system will call the rejection policy hook onRejection
that you have implemented in the listening hook (Evaluator
) for processing.
The default behavior of onRejection
is as follows.
default void onRejection(Window window) { throw new RejectedExecutionException(); }
The way to implement a rejection strategy hook is as follows.
SlidingTimeWindowEvaluationHandler handler = new SlidingTimeWindowEvaluationHandler( new SlidingTimeWindowConfiguration(TSDataType.INT32, 1, 1), new Evaluator() { @Override public void evaluate(Window window) { // do something } @Override public void onRejection(Window window) { // do something } });
The number of threads that can be used for evaluating sliding windows. The value is equals to CPU core number by default.
The maximum number of window evaluation tasks that can be pending for execution. The value is 64 by default.
The sink utility provides the ability for triggers to connect to external systems.
It provides a programming paradigm. Each sink utility contains a Handler
for processing data sending, a Configuration
for configuring Handler
, and an Event
for describing the sending data.
LocalIoTDBSink
is used to insert data points to the local sequence.
Before writing data, it is not required that the time series have been created.
Note, in the scenario used for triggers, the listening time series and the written target time series should not be in the same storage group.
Example:
final String device = "root.alerting"; final String[] measurements = new String[] {"local"}; final TSDataType[] dataTypes = new TSDataType[] {TSDataType.DOUBLE}; LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); localIoTDBHandler.open(new LocalIoTDBConfiguration(device, measurements, dataTypes)); // insert 100 data points for (int i = 0; i < 100; ++i) { final long timestamp = i; final double value = i; localIoTDBHandler.onEvent(new LocalIoTDBEvent(timestamp, value)); }
Note that when you need to insert data points to a time series of type TEXT
, you need to use org.apache.iotdb.tsfile.utils.Binary
:
// insert 100 data points for (int i = 0; i < 100; ++i) { final long timestamp = i; final String value = "" + i; localIoTDBHandler.onEvent(new LocalIoTDBEvent(timestamp, Binary.valueOf(value))); }
In triggers, you can use MQTTSink
to send data points to other IoTDB instances.
Before sending data, it is not required that the time series have been created.
Example:
final String host = "127.0.0.1"; final int port = 1883; final String username = "root"; final String password = "root"; final PartialPath device = new PartialPath("root.alerting"); final String[] measurements = new String[] {"remote"}; MQTTHandler mqttHandler = new MQTTHandler(); mqttHandler.open(new MQTTConfiguration(host, port, username, password, device, measurements)); final String topic = "test"; final QoS qos = QoS.EXACTLY_ONCE; final boolean retain = false; // send 100 data points for (int i = 0; i < 100; ++i) { final long timestamp = i; final double value = i; mqttHandler.onEvent(new MQTTEvent(topic, qos, retain, timestamp, value)); }
In a trigger, you can use AlertManagerSink
to send messages to AlertManager。
You need to specify the endpoint to send alerts of your AlertManager when constructing AlertManagerConfiguration
AlertManagerConfiguration(String endpoint);
AlertManagerEvent
offers three types of constructors:
AlertManagerEvent(String alertname); AlertManagerEvent(String alertname, Map<String, String> extraLabels); AlertManagerEvent(String alertname, Map<String, String> extraLabels, Map<String, String> annotations);
alertname
is a required parameter to identify an alert
. The alertname
field can be used for grouping and deduplication when the AlertManager
sends an alert.extraLabels
is optional. In the backend, it is combined with alertname
to form labels
to identify an alert
, which can be used for grouping and deduplication when AlertManager
sends alarms.annotations
is optional, and its value can use Go style template{{.<label_key>}}It will be replaced with
labels[<label_key>]
when the message is finally generated.labels
and annotations
will be parsed into json string and sent to AlertManager
:{ "labels": { "alertname": "<requiredAlertName>", "<labelname>": "<labelvalue>", ... }, "annotations": { "<labelname>": "<labelvalue>", ... } }
Call the onEvent(AlertManagerEvent event)
method of AlertManagerHandler
to send an alert.
Example 1:
Only pass alertname
.
AlertManagerHandler alertManagerHandler = new AlertManagerHandler(); alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts")); final String alertName = "test0"; AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertName); alertManagerHandler.onEvent(alertManagerEvent);
Example 2:
Pass alertname
and extraLabels
.
AlertManagerHandler alertManagerHandler = new AlertManagerHandler(); alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts")); final String alertName = "test1"; final HashMap<String, String> extraLabels = new HashMap<>(); extraLabels.put("severity", "critical"); extraLabels.put("series", "root.ln.wt01.wf01.temperature"); extraLabels.put("value", String.valueOf(100.0)); AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertName, extraLabels); alertManagerHandler.onEvent(alertManagerEvent);
Example 3:
Pass alertname
, extraLabels
和 annotations
.
The final value of the description
field will be parsed as test2: root.ln.wt01.wf01.temperature is 100.0
.
AlertManagerHandler alertManagerHandler = new AlertManagerHandler(); alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts")); final String alertName = "test2"; final HashMap<String, String> extraLabels = new HashMap<>(); extraLabels.put("severity", "critical"); extraLabels.put("series", "root.ln.wt01.wf01.temperature"); extraLabels.put("value", String.valueOf(100.0)); final HashMap<String, String> annotations = new HashMap<>(); annotations.put("summary", "high temperature"); annotations.put("description", "{{.alertname}}: {{.series}} is {{.value}}"); alertManagerHandler.onEvent(new AlertManagerEvent(alertName, extraLabels, annotations));
Trigger can use ForwardSink to forward written data through HTTP and MQTT, which has contains HTTPForwardHandler and MQTTForwardHandler. To improve connection efficiency, all HTTPForwardHandlers share a connection pool, while MQTTForwardHandlers with the same host, port and username parameters share a connection pool.
The difference between MQTTForwardHandler and MQTTHandler is that the former uses connection pool while the latter does not, and the message format is also different.
See ForwardTrigger as example.
ForwardTrigger is a built-in trigger for data distribution/forwarding. It uses ForwardSink and consumption queue to realize asynchronous batch processing of trigger events. Asynchronous forwarding can avoid the system blocking caused by forwarding blocking. The connection pool in ForwardSink can make the connections in the pool reuse efficiently and safely, and avoid the overhead of frequent connection establishment and closing.
At present, the message format only supports JSON format of fixed template. The template is as follows:
[{"device":"%s","measurement":"%s","timestamp":%d,"value":%s}]
Create a forward_http trigger with HTTP protocol and a forward_mqtt trigger with mqtt protocol, which subscribes to the prefix path root.http
and root.mqtt
respectively.
CREATE trigger forward_http AFTER INSERT ON root.http AS 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger' WITH ('protocol' = 'http', 'endpoint' = 'http://127.0.0.1:8080/forward_receive') CREATE trigger forward_mqtt AFTER INSERT ON root.mqtt AS 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger' WITH ('protocol' = 'mqtt', 'host' = '127.0.0.1', 'port' = '1883', 'username' = 'root', 'password' = 'root', 'topic' = 'mqtt-test')
Insert data into the sub-path of the two prefix paths to fire the trigger.
INSERT INTO root.http.d1(timestamp, s1) VALUES (1, 1); INSERT INTO root.http.d1(timestamp, s2) VALUES (2, true); INSERT INTO root.mqtt.d1(timestamp, s1) VALUES (1, 1); INSERT INTO root.mqtt.d1(timestamp, s2) VALUES (2, true);
After the trigger is fired, JSON data in follow format will be received at the HTTP receiving end:
[ { "device":"root.http.d1", "measurement":"s1", "timestamp":1, "value":1.0 }, { "device":"root.http.d1", "measurement":"s2", "timestamp":2, "value":true } ]
After the trigger is fired, JSON data in follow format will be received at the MQTT receiving end:
[ { "device":"root.mqtt.d1", "measurement":"s1", "timestamp":1, "value":1.0 }, { "device":"root.mqtt.d1", "measurement":"s2", "timestamp":2, "value":true } ]
Parameter | Required | Default | Max | Description |
---|---|---|---|---|
protocol | true | http | Forward protocol, such as HTTP/MQTT | |
queueNumber | 8 | 8 | The number of queue, comparing to the global parameter trigger_forward_max_queue_number and take the smaller | |
queueSize | 2000 | 2000 | The size of queue, comparing to the global parameter trigger_forward_max_size_per_queue and take the smaller | |
batchSize | 50 | 50 | The size of each forwarding batch, comparing to the global parameter trigger_forward_batch_size and take the smaller | |
stopIfException | false | Stop forwarding if exception occurs | ||
endpoint | true | Request endpoint address (HTTP protocol parameter) Note: HTTP connection pool parameters depend on global parameters: trigger_forward_http_pool_size=200 and trigger_forward_http_pool_max_per_route=20 | ||
host | true | MQTT broker host (MQTT protocol parameter) | ||
port | true | MQTT broker port (MQTT protocol parameter) | ||
username | true | Username (MQTT protocol parameter) | ||
password | true | Password (MQTT protocol parameter) | ||
topic | true | The topic of MQTT message (MQTT protocol parameter) | ||
reconnectDelay | 10ms | Reconnection waiting time (MQTT protocol parameter) | ||
connectAttemptsMax | 3 | Max connection attempts (MQTT protocol parameter) | ||
qos | exactly_once | Quality of Service (MQTT protocol parameter), must be exactly_once, at_least_once or at_most_once | ||
poolSize | 4 | 4 | MQTT Connection Pool Size (MQTT protocol parameter), comparing to the global parameter trigger_forward_mqtt_pool_size and take the smaller | |
retain | false | Let MQTT Broker retain the message after publishing (MQTT protocol parameter) |
If you use Maven, you can refer to our sample project trigger-example.
You can find it here.
It shows:
package org.apache.iotdb.trigger; import org.apache.iotdb.db.engine.trigger.api.Trigger; import org.apache.iotdb.trigger.api.TriggerAttributes; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTConfiguration; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTEvent; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTHandler; import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBConfiguration; import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBEvent; import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBHandler; import org.apache.iotdb.db.utils.windowing.configuration.SlidingSizeWindowConfiguration; import org.apache.iotdb.db.utils.windowing.handler.SlidingSizeWindowEvaluationHandler; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.fusesource.mqtt.client.QoS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TriggerExample implements Trigger { private static final Logger LOGGER = LoggerFactory.getLogger(TriggerExample.class); private static final String TARGET_DEVICE = "root.alerting"; private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); private final MQTTHandler mqttHandler = new MQTTHandler(); private SlidingSizeWindowEvaluationHandler windowEvaluationHandler; @Override public void onCreate(TriggerAttributes attributes) throws Exception { LOGGER.info("onCreate(TriggerAttributes attributes)"); double lo = attributes.getDouble("lo"); double hi = attributes.getDouble("hi"); openSinkHandlers(); windowEvaluationHandler = new SlidingSizeWindowEvaluationHandler( new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5), window -> { double avg = 0; for (int i = 0; i < window.size(); ++i) { avg += window.getDouble(i); } avg /= window.size(); if (avg < lo || hi < avg) { localIoTDBHandler.onEvent(new LocalIoTDBEvent(window.getTime(0), avg)); mqttHandler.onEvent( new MQTTEvent("test", QoS.EXACTLY_ONCE, false, window.getTime(0), avg)); } }); } @Override public void onDrop() throws Exception { LOGGER.info("onDrop()"); closeSinkHandlers(); } @Override public void onStart() throws Exception { LOGGER.info("onStart()"); openSinkHandlers(); } @Override public void onStop() throws Exception { LOGGER.info("onStop()"); closeSinkHandlers(); } @Override public Double fire(long timestamp, Double value) { windowEvaluationHandler.collect(timestamp, value); return value; } @Override public double[] fire(long[] timestamps, double[] values) { for (int i = 0; i < timestamps.length; ++i) { windowEvaluationHandler.collect(timestamps[i], values[i]); } return values; } private void openSinkHandlers() throws Exception { localIoTDBHandler.open( new LocalIoTDBConfiguration( TARGET_DEVICE, new String[]{"local"}, new TSDataType[]{TSDataType.DOUBLE})); mqttHandler.open( new MQTTConfiguration( "127.0.0.1", 1883, "root", "root", new PartialPath(TARGET_DEVICE), new String[]{"remote"})); } private void closeSinkHandlers() throws Exception { localIoTDBHandler.close(); mqttHandler.close(); } }
You can try this trigger by following the steps below:
Enable MQTT service by modifying iotdb-datanode.properties
# whether to enable the mqtt service. enable_mqtt_service=true
Start the IoTDB server
Create time series via cli
CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN;
Place the JAR (trigger-example-0.14.0-SNAPSHOT.jar
) of trigger-example in the directory iotdb-server-0.14.0-SNAPSHOT/ext/trigger
(or in a subdirectory of iotdb-server-0.14.0-SNAPSHOT/ext/trigger
)
You can specify the root path to load the trigger JAR package by modifying the
trigger_root_dir
in the configuration file.
Use the SQL statement to register the trigger, assuming that the name given to the trigger is window-avg-alerter
Use the CREATE TRIGGER
statement to register the trigger via cli
CREATE TRIGGER `window-avg-alerter` AFTER INSERT ON root.sg1.d1.s1 AS 'org.apache.iotdb.trigger.TriggerExample' WITH ( 'lo' = '0', 'hi' = '10.0' )
Use cli to insert test data
INSERT INTO root.sg1.d1(timestamp, s1) VALUES (1, 0); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (2, 2); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (3, 4); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (4, 6); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (5, 8); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (6, 10); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (7, 12); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (8, 14); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (9, 16); INSERT INTO root.sg1.d1(timestamp, s1) VALUES (10, 18);
Use cli to query data to verify the behavior of the trigger
SELECT * FROM root.alerting;
Under normal circumstances, the following results should be shown
IoTDB> SELECT * FROM root.alerting; +-----------------------------+--------------------+-------------------+ | Time|root.alerting.remote|root.alerting.local| +-----------------------------+--------------------+-------------------+ |1970-01-01T08:00:00.006+08:00| 14.0| 14.0| +-----------------------------+--------------------+-------------------+ Total line number = 1 It costs 0.006s
That's all, please enjoy it :D
The trigger is implemented based on the reflection mechanism. Triggers can be dynamically registered and dropped without restarting the server.
It is best not to have classes with the same full class name but different function implementations in different JAR packages under trigger_root_dir
. For example: the triggers trigger1
and trigger2
correspond to trigger1.jar
and trigger2.jar
respectively. If both JAR packages contain a org.apache.iotdb.db.engine.trigger.example.AlertListener
class, when this class is used by a CREATE TRIGGER
statement, the system will randomly load the class in one of the JAR packages, which may lead to inconsistent trigger behaviors and other problems.
Version management of trigger classes with the same full class name. Triggers with the same full class name but different versions (logic) are not allowed to register in the system.
Related question: IoTDB pre-registered 10 org.apache.iotdb.db.engine.trigger.example.AlertListener
trigger instances and DBA updated the implementation and corresponding JAR package of org.apache.iotdb.db.engine.trigger.example.AlertListener
. Is it possible to drop only 5 of the instances and replace them with 5 updated trigger instances?
Answer: No. Only by dropping all the 10 pre-registered triggers can you register the updated triggers. If all the original triggers are not dropped, the newly registered triggers with the same full class name will behave in the same way as the existing triggers.