The trigger provides a mechanism for listening to changes in time series data. With user-defined logic, tasks such as alerting and data forwarding can be conducted.
The trigger is implemented based on the reflection mechanism. Users can monitor data changes by implementing the Java interfaces. IoTDB allows users to dynamically register and drop triggers without restarting the server.
This document will help you learn to define and manage triggers.
A single trigger can be used to listen for data changes in time series that match a specific pattern. For example, a trigger can listen for the data changes of time series root.sg.a, or of time series that match the pattern root.sg.*. When you register a trigger, you can specify the path pattern that the trigger listens on through an SQL statement.
There are currently two types of triggers, STATELESS and STATEFUL, and you can specify the type through an SQL statement when registering a trigger.
There are currently two trigger events, BEFORE INSERT and AFTER INSERT, and more trigger events will be added in the future. When you register a trigger, you can specify the trigger event through an SQL statement.
You need to implement the trigger by writing a Java class, which requires the dependency shown below. If you use Maven, you can search for it directly in the Maven repository.

    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <artifactId>iotdb-server</artifactId>
      <version>1.0.0</version>
      <scope>provided</scope>
    </dependency>

Note that the dependency version should correspond to the target server version.
To implement a trigger, you need to implement the org.apache.iotdb.trigger.api.Trigger interface.

    import org.apache.iotdb.trigger.api.enums.FailureStrategy;
    import org.apache.iotdb.tsfile.write.record.Tablet;

    public interface Trigger {

      /**
       * This method is mainly used to validate {@link TriggerAttributes} before calling {@link
       * Trigger#onCreate(TriggerAttributes)}.
       *
       * @param attributes TriggerAttributes
       * @throws Exception e
       */
      default void validate(TriggerAttributes attributes) throws Exception {}

      /**
       * This method will be called when creating a trigger after validation.
       *
       * @param attributes TriggerAttributes
       * @throws Exception e
       */
      default void onCreate(TriggerAttributes attributes) throws Exception {}

      /**
       * This method will be called when dropping a trigger.
       *
       * @throws Exception e
       */
      default void onDrop() throws Exception {}

      /**
       * When restarting a DataNode, Triggers that have been registered will be restored and this method
       * will be called during the process of restoring.
       *
       * @throws Exception e
       */
      default void restore() throws Exception {}

      /**
       * Overrides this method to set the expected FailureStrategy, {@link FailureStrategy#OPTIMISTIC}
       * is the default strategy.
       *
       * @return {@link FailureStrategy}
       */
      default FailureStrategy getFailureStrategy() {
        return FailureStrategy.OPTIMISTIC;
      }

      /**
       * @param tablet see {@link Tablet} for detailed information of data structure. Data that is
       *     inserted will be constructed as a Tablet and you can define process logic with {@link
       *     Tablet}.
       * @return true if successfully fired
       * @throws Exception e
       */
      default boolean fire(Tablet tablet) throws Exception {
        return true;
      }
    }

This interface provides two types of programming interfaces: lifecycle-related interfaces and interfaces related to listening for data changes. None of the interfaces are required to be implemented; an interface that is not implemented keeps its default behavior, so the trigger will not respond to data changes unless you implement the corresponding interface. You can implement only the interfaces you need.
Descriptions of the interfaces are as follows.
Interface | Description |
---|---|
default void validate(TriggerAttributes attributes) throws Exception {} | When you create a trigger using the CREATE TRIGGER statement, you can specify the parameters that the trigger needs to use, and this interface will be used to verify the correctness of the parameters. |
default void onCreate(TriggerAttributes attributes) throws Exception {} | This interface is called once when you create a trigger using the CREATE TRIGGER statement. During the lifetime of each trigger instance, this interface will be called only once. This interface is mainly used for the following functions: helping users to parse custom attributes in SQL statements (using TriggerAttributes ). You can create or apply for resources, such as establishing external links, opening files, etc. |
default void onDrop() throws Exception {} | This interface is called when you drop a trigger using the DROP TRIGGER statement. During the lifetime of each trigger instance, this interface will be called only once. This interface mainly has the following functions: it can perform the operation of resource release and can be used to persist the results of trigger calculations. |
default void restore() throws Exception {} | When the DataNode is restarted, the cluster will restore the trigger instance registered on the DataNode, and this interface will be called once for stateful trigger during the process. After the DataNode where the stateful trigger instance is located goes down, the cluster will restore the trigger instance on another available DataNode, calling this interface once in the process. This interface can be used to customize recovery logic. |
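
To make the order of the lifecycle calls in the table more concrete, here is a minimal sketch. It does not use the IoTDB classes: `LifecycleHooks` is a hypothetical stand-in that mirrors three of the lifecycle methods, a plain `Map<String, String>` stands in for `TriggerAttributes`, and the attribute name `limit` is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class LifecycleSketch {
  /** Hypothetical stand-in mirroring the lifecycle methods of Trigger (not the IoTDB API). */
  interface LifecycleHooks {
    default void validate(Map<String, String> attributes) throws Exception {}

    default void onCreate(Map<String, String> attributes) throws Exception {}

    default void onDrop() throws Exception {}
  }

  /** Example implementation that checks one attribute and records each call. */
  static class ThresholdTrigger implements LifecycleHooks {
    final List<String> calls = new ArrayList<>();
    int limit;

    @Override
    public void validate(Map<String, String> attributes) {
      calls.add("validate");
      String raw = attributes.get("limit"); // "limit" is an assumed attribute name
      if (raw == null) {
        throw new IllegalArgumentException("attribute 'limit' is required");
      }
      Integer.parseInt(raw); // throws NumberFormatException if it is not a number
    }

    @Override
    public void onCreate(Map<String, String> attributes) {
      calls.add("onCreate");
      limit = Integer.parseInt(attributes.get("limit"));
    }

    @Override
    public void onDrop() {
      calls.add("onDrop");
    }
  }

  /** Simulates the order described in the table: validate, then onCreate, later onDrop. */
  static ThresholdTrigger simulate(Map<String, String> attributes) {
    ThresholdTrigger trigger = new ThresholdTrigger();
    trigger.validate(attributes);
    trigger.onCreate(attributes);
    trigger.onDrop();
    return trigger;
  }

  public static void main(String[] args) {
    ThresholdTrigger trigger = simulate(Map.of("limit", "100"));
    System.out.println(trigger.calls + " limit=" + trigger.limit);
  }
}
```

In a real trigger, the same checks would live in a class that implements org.apache.iotdb.trigger.api.Trigger and reads its parameters from TriggerAttributes.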

    /**
     * @param tablet see {@link Tablet} for detailed information of data structure. Data that is
     *     inserted will be constructed as a Tablet and you can define process logic with {@link
     *     Tablet}.
     * @return true if successfully fired
     * @throws Exception e
     */
    default boolean fire(Tablet tablet) throws Exception {
      return true;
    }

When the data changes, the trigger uses the Tablet as the unit of firing operation. You can obtain the metadata and data of the corresponding sequence through Tablet, and then perform the corresponding trigger operation. If the fire process is successful, the return value should be true. If the interface returns false or throws an exception, we consider the trigger fire process as failed. When the trigger fire process fails, we will perform corresponding operations according to the listening strategy interface.
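
The success rule described above (a `false` return value and a thrown exception both count as a failed fire) can be sketched as follows. This is only an illustration of the rule, not IoTDB's internal code; `fireSucceeded` is a hypothetical helper and a `Callable<Boolean>` stands in for the call to `fire(Tablet)`.

```java
import java.util.concurrent.Callable;

class FireOutcomeSketch {
  /**
   * Returns true only when the fire call returns true without throwing.
   * A false return value and an exception are both treated as a failed fire.
   */
  static boolean fireSucceeded(Callable<Boolean> fireCall) {
    try {
      return Boolean.TRUE.equals(fireCall.call());
    } catch (Exception e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(fireSucceeded(() -> true));
    System.out.println(fireSucceeded(() -> false));
    System.out.println(
        fireSucceeded(
            () -> {
              throw new IllegalStateException("sink unavailable");
            }));
  }
}
```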
When performing an INSERT operation, for each time series in it, we detect whether there is a trigger that listens on a matching path pattern, and then assemble the time series data that matches the path pattern of the same trigger into a new Tablet, which is passed to that trigger's fire interface. This can be understood as:
Map<PartialPath, List<Trigger>> pathToTriggerListMap => Map<Trigger, Tablet>
Note that currently we do not make any guarantees about the order in which triggers fire.
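
The regrouping from "series to triggers" into "trigger to its own data" can be sketched as below. This is a simplified model, not IoTDB's implementation: triggers and series are plain strings, the trigger names and the `root.db` paths are hypothetical, and the matcher only supports `*` as a single-level wildcard.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TriggerGroupingSketch {
  /**
   * Simplified matcher: the pattern and the path must have the same number of
   * dot-separated nodes, and "*" matches exactly one node. Multi-level
   * wildcards are deliberately not handled here.
   */
  static boolean matches(String pattern, String path) {
    String[] patternNodes = pattern.split("\\.");
    String[] pathNodes = path.split("\\.");
    if (patternNodes.length != pathNodes.length) {
      return false;
    }
    for (int i = 0; i < patternNodes.length; i++) {
      if (!patternNodes[i].equals("*") && !patternNodes[i].equals(pathNodes[i])) {
        return false;
      }
    }
    return true;
  }

  /** Groups the inserted series by the trigger whose pattern they match. */
  static Map<String, List<String>> group(
      Map<String, String> triggerToPattern, List<String> insertedSeries) {
    Map<String, List<String>> seriesPerTrigger = new LinkedHashMap<>();
    for (Map.Entry<String, String> trigger : triggerToPattern.entrySet()) {
      for (String series : insertedSeries) {
        if (matches(trigger.getValue(), series)) {
          seriesPerTrigger
              .computeIfAbsent(trigger.getKey(), key -> new ArrayList<>())
              .add(series);
        }
      }
    }
    return seriesPerTrigger;
  }

  public static void main(String[] args) {
    Map<String, String> triggers = new LinkedHashMap<>();
    triggers.put("t1", "root.db.*");
    triggers.put("t2", "root.db.x");
    // Each entry of the result corresponds to one Tablet handed to one trigger.
    System.out.println(group(triggers, List.of("root.db.x", "root.db.y")));
  }
}
```

Each entry of the returned map plays the role of one Tablet: the series collected for a trigger are the columns that trigger receives in a single fire call.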
Here is an example:
Suppose there are three triggers, all with the trigger event BEFORE INSERT:
- Trigger1 listens on root.sg.*
- Trigger2 listens on root.sg.a
- Trigger3 listens on root.sg.b
Insertion statement:
insert into root.sg(time, a, b) values (1, 1, 1);
The time series root.sg.a matches Trigger1 and Trigger2, and the time series root.sg.b matches Trigger1 and Trigger3, so:
- The data of root.sg.a and root.sg.b will be assembled into a new tablet1, and Trigger1.fire(tablet1) will be executed at the corresponding Trigger Event.
- The data of root.sg.a will be assembled into a new tablet2, and Trigger2.fire(tablet2) will be executed at the corresponding Trigger Event.
- The data of root.sg.b will be assembled into a new tablet3, and Trigger3.fire(tablet3) will be executed at the corresponding Trigger Event.

When the trigger fails to fire, we will take corresponding actions according to the strategy set by the listening strategy interface. You can set org.apache.iotdb.trigger.api.enums.FailureStrategy. There are currently two strategies, optimistic and pessimistic:

    /**
     * Overrides this method to set the expected FailureStrategy, {@link FailureStrategy#OPTIMISTIC}
     * is the default strategy.
     *
     * @return {@link FailureStrategy}
     */
    default FailureStrategy getFailureStrategy() {
      return FailureStrategy.OPTIMISTIC;
    }

If you use Maven, you can refer to our sample project trigger-example.
You can find it here.
Here is the code from one of the sample projects:

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied. See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */

    package org.apache.iotdb.trigger;

    import org.apache.iotdb.db.storageengine.trigger.sink.alertmanager.AlertManagerConfiguration;
    import org.apache.iotdb.db.storageengine.trigger.sink.alertmanager.AlertManagerEvent;
    import org.apache.iotdb.db.storageengine.trigger.sink.alertmanager.AlertManagerHandler;
    import org.apache.iotdb.trigger.api.Trigger;
    import org.apache.iotdb.trigger.api.TriggerAttributes;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
    import org.apache.iotdb.tsfile.write.record.Tablet;
    import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;

    public class ClusterAlertingExample implements Trigger {
      private static final Logger LOGGER = LoggerFactory.getLogger(ClusterAlertingExample.class);

      private final AlertManagerHandler alertManagerHandler = new AlertManagerHandler();

      private final AlertManagerConfiguration alertManagerConfiguration =
          new AlertManagerConfiguration("http://127.0.0.1:9093/api/v2/alerts");

      private String alertname;

      private final HashMap<String, String> labels = new HashMap<>();

      private final HashMap<String, String> annotations = new HashMap<>();

      @Override
      public void onCreate(TriggerAttributes attributes) throws Exception {
        alertname = "alert_test";

        labels.put("series", "root.ln.wf01.wt01.temperature");
        labels.put("value", "");
        labels.put("severity", "");

        annotations.put("summary", "high temperature");
        annotations.put("description", "{{.alertname}}: {{.series}} is {{.value}}");

        alertManagerHandler.open(alertManagerConfiguration);
      }

      @Override
      public void onDrop() throws IOException {
        alertManagerHandler.close();
      }

      @Override
      public boolean fire(Tablet tablet) throws Exception {
        List<MeasurementSchema> measurementSchemaList = tablet.getSchemas();
        for (int i = 0, n = measurementSchemaList.size(); i < n; i++) {
          if (measurementSchemaList.get(i).getType().equals(TSDataType.DOUBLE)) {
            // for example, we only deal with the columns of Double type
            double[] values = (double[]) tablet.values[i];
            for (double value : values) {
              if (value > 100.0) {
                LOGGER.info("trigger value > 100");
                labels.put("value", String.valueOf(value));
                labels.put("severity", "critical");
                AlertManagerEvent alertManagerEvent =
                    new AlertManagerEvent(alertname, labels, annotations);
                alertManagerHandler.onEvent(alertManagerEvent);
              } else if (value > 50.0) {
                LOGGER.info("trigger value > 50");
                labels.put("value", String.valueOf(value));
                labels.put("severity", "warning");
                AlertManagerEvent alertManagerEvent =
                    new AlertManagerEvent(alertname, labels, annotations);
                alertManagerHandler.onEvent(alertManagerEvent);
              }
            }
          }
        }
        return true;
      }
    }

You can create and drop a trigger through an SQL statement, and you can also query all registered triggers through an SQL statement.
We recommend that you stop insertion while creating triggers.
Triggers can be registered on arbitrary path patterns. The time series registered with the trigger will be listened to by the trigger. When there is data change on the series, the corresponding fire method in the trigger will be called.
Registering a trigger can be done as follows:
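- Implement a trigger class as described in the chapter How to implement a Trigger, for example org.apache.iotdb.trigger.ClusterAlertingExample.
- Register the trigger with an SQL statement. During registration, the validate and onCreate interfaces of the trigger will each be called only once. For details, please refer to the chapter How to implement a Trigger.

The complete SQL syntax is as follows: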

    // Create Trigger
    createTrigger
        : CREATE triggerType TRIGGER triggerName=identifier triggerEventClause ON pathPattern AS className=STRING_LITERAL uriClause? triggerAttributeClause?
        ;

    triggerType
        : STATELESS | STATEFUL
        ;

    triggerEventClause
        : (BEFORE | AFTER) INSERT
        ;

    uriClause
        : USING URI uri
        ;

    uri
        : STRING_LITERAL
        ;

    triggerAttributeClause
        : WITH LR_BRACKET triggerAttribute (COMMA triggerAttribute)* RR_BRACKET
        ;

    triggerAttribute
        : key=attributeKey operator_eq value=attributeValue
        ;

Below is the explanation for the SQL syntax:
Here is an example SQL statement to help you understand:
CREATE STATELESS TRIGGER triggerTest BEFORE INSERT ON root.sg.** AS 'org.apache.iotdb.trigger.ClusterAlertingExample' USING URI '/jar/ClusterAlertingExample.jar' WITH ( "name" = "trigger", "limit" = "100" )
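The above SQL statement creates a trigger named triggerTest:
- The trigger is stateless.
- It fires before insertion.
- It listens on the path pattern root.sg.**.
- The implemented trigger class is org.apache.iotdb.trigger.ClusterAlertingExample.
- The JAR package URI is /jar/ClusterAlertingExample.jar.
- The statement specifies two attributes, name and limit.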
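
If you generate registration statements from application code, the clauses of the grammar can be assembled in order, as in the sketch below. `createTriggerSql` is a hypothetical helper written for this example, not part of IoTDB. It does plain string concatenation with no escaping or validation, so it should only be used with trusted inputs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class CreateTriggerSqlSketch {
  /**
   * Assembles a CREATE TRIGGER statement clause by clause.
   * jarUri and attributes are optional, matching uriClause? and triggerAttributeClause?.
   */
  static String createTriggerSql(
      String type,
      String name,
      String event,
      String pathPattern,
      String className,
      String jarUri,
      Map<String, String> attributes) {
    StringBuilder sql = new StringBuilder();
    sql.append("CREATE ").append(type).append(" TRIGGER ").append(name);
    sql.append(' ').append(event).append(" INSERT");
    sql.append(" ON ").append(pathPattern);
    sql.append(" AS '").append(className).append("'");
    if (jarUri != null) {
      sql.append(" USING URI '").append(jarUri).append("'");
    }
    if (attributes != null && !attributes.isEmpty()) {
      StringJoiner joiner = new StringJoiner(", ", " WITH (", ")");
      attributes.forEach((key, value) -> joiner.add("\"" + key + "\" = \"" + value + "\""));
      sql.append(joiner.toString());
    }
    return sql.toString();
  }

  public static void main(String[] args) {
    Map<String, String> attributes = new LinkedHashMap<>();
    attributes.put("name", "trigger");
    attributes.put("limit", "100");
    System.out.println(
        createTriggerSql(
            "STATELESS",
            "triggerTest",
            "BEFORE",
            "root.sg.**",
            "org.apache.iotdb.trigger.ClusterAlertingExample",
            "/jar/ClusterAlertingExample.jar",
            attributes));
  }
}
```

The resulting string can then be sent through whichever client you already use to execute SQL against IoTDB.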
The trigger can be dropped by specifying the trigger ID. During the process of dropping the trigger, the onDrop interface of the trigger will be called only once.
The SQL syntax is:
// Drop Trigger dropTrigger : DROP TRIGGER triggerName=identifier ;
Here is an example statement:
DROP TRIGGER triggerTest1
The above statement will drop the trigger with ID triggerTest1.
You can query information about triggers that exist in the cluster through an SQL statement.
The SQL syntax is as follows:
SHOW TRIGGERS
The result set format of this statement is as follows:
TriggerName | Event | Type | State | PathPattern | ClassName | NodeId |
---|---|---|---|---|---|---|
triggerTest1 | BEFORE_INSERT / AFTER_INSERT | STATELESS / STATEFUL | INACTIVE / ACTIVE / DROPPING / TRANSFERRING | root.** | org.apache.iotdb.trigger.TriggerExample | ALL(STATELESS) / DATA_NODE_ID(STATEFUL) |
During the process of creating and dropping triggers in the cluster, we maintain the states of the triggers. The following is a description of these states:
State | Description | Is it recommended to insert data? |
---|---|---|
INACTIVE | The intermediate state of executing CREATE TRIGGER , the cluster has just recorded the trigger information on the ConfigNode, and the trigger has not been activated on any DataNode. | NO |
ACTIVE | Status after successful execution of CREATE TRIGGER , the trigger is available on all DataNodes in the cluster. | YES |
DROPPING | Intermediate state of executing DROP TRIGGER , the cluster is in the process of dropping the trigger. | NO |
TRANSFERRING | The cluster is migrating the location of this trigger instance. | NO |
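
A client that checks the State column before writing could encode the table above as follows. This is a sketch under the assumption that the state arrives as the plain strings shown in the table; `TriggerState` and `insertRecommended` are hypothetical names for this example, not IoTDB classes.

```java
class TriggerStateSketch {
  /** States from the table, each tagged with whether inserting data is recommended. */
  enum TriggerState {
    INACTIVE(false),
    ACTIVE(true),
    DROPPING(false),
    TRANSFERRING(false);

    final boolean insertRecommended;

    TriggerState(boolean insertRecommended) {
      this.insertRecommended = insertRecommended;
    }
  }

  /** Maps the State column of a SHOW TRIGGERS row to the recommendation. */
  static boolean insertRecommended(String stateColumn) {
    return TriggerState.valueOf(stateColumn.trim()).insertRecommended;
  }

  public static void main(String[] args) {
    for (TriggerState state : TriggerState.values()) {
      System.out.println(state + " -> " + state.insertRecommended);
    }
  }
}
```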
config_node_ratis_log_appender_buffer_size_max, 2G), where config_node_ratis_log_appender_buffer_size_max is a configuration item. For the specific meaning, please refer to the IoTDB configuration item description.
org.apache.iotdb.trigger.example.AlertListener class, when CREATE TRIGGER uses this class, the system will randomly load the class from one of the JAR packages, which will eventually lead to inconsistent trigger behavior and other issues.

Parameter | Meaning |
---|---|
trigger_lib_dir | Directory to save the trigger jar package |
stateful_trigger_retry_num_when_not_found | Number of times to retry finding an instance of a stateful trigger on DataNodes when it is not found |