blob: c75b9c79388598d4d37dac5d1eecf6bbc69b9372 [file] [log] [blame] [view]
# IoTDB
> IoTDB sink connector
## Description
Used to write data to IoTDB.
## Key features
- [x] [exactly-once](../../concept/connector-v2-features.md)
IoTDB supports the `exactly-once` feature through idempotent writing. If two pieces of data have
the same `key` and `timestamp`, the new data will overwrite the old one.
- [ ] [schema projection](../../concept/connector-v2-features.md)
:::tip
There is a conflict of thrift version between IoTDB and Spark.Therefore, you need to execute `rm -f $SPARK_HOME/jars/libthrift*` and `cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/` to resolve it.
:::
## Options
| name | type | required | default value |
|-------------------------------|-------------------|----------|-----------------------------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| key_device | string | yes | - |
| key_timestamp | string | no | processing time |
| key_measurement_fields | array | no | exclude `device` & `timestamp` |
| storage_group | string | no | - |
| batch_size | int | no | 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| default_thrift_buffer_size | int | no | - |
| max_thrift_frame_size | int | no | - |
| zone_id | string | no | - |
| enable_rpc_compression | boolean | no | - |
| connection_timeout_in_ms | int | no | - |
| common-options | | no | - |
### node_urls [list]
`IoTDB` cluster address, the format is `["host:port", ...]`
### username [string]
`IoTDB` user username
### password [string]
`IoTDB` user password
### key_device [string]
Specify field name of the `IoTDB` deviceId in SeaTunnelRow
### key_timestamp [string]
Specify field-name of the `IoTDB` timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp
### key_measurement_fields [array]
Specify field-name of the `IoTDB` measurement list in SeaTunnelRow. If not specified, include all fields but exclude `device` & `timestamp`
### storage_group [string]
Specify device storage group(path prefix)
example: deviceId = ${storage_group} + "." + ${key_device}
### batch_size [int]
For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the IoTDB
### batch_interval_ms [int]
For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the IoTDB
### max_retries [int]
The number of retries to flush failed
### retry_backoff_multiplier_ms [int]
Using as a multiplier for generating the next delay for backoff
### max_retry_backoff_ms [int]
The amount of time to wait before attempting to retry a request to `IoTDB`
### default_thrift_buffer_size [int]
Thrift init buffer size in `IoTDB` client
### max_thrift_frame_size [int]
Thrift max frame size in `IoTDB` client
### zone_id [string]
java.time.ZoneId in `IoTDB` client
### enable_rpc_compression [boolean]
Enable rpc compression in `IoTDB` client
### connection_timeout_in_ms [int]
The maximum time (in ms) to wait when connect `IoTDB`
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
## Examples
### Case1
Common options:
```hocon
sink {
IoTDB {
node_urls = ["localhost:6667"]
username = "root"
password = "root"
batch_size = 1024
batch_interval_ms = 1000
}
}
```
When you assign `key_device` is `device_name`, for example:
```hocon
sink {
IoTDB {
...
key_device = "device_name"
}
}
```
Upstream SeaTunnelRow data format is the following:
| device_name | field_1 | field_2 |
|----------------------------|-------------|-------------|
| root.test_group.device_a | 1001 | 1002 |
| root.test_group.device_b | 2001 | 2002 |
| root.test_group.device_c | 3001 | 3002 |
Output to `IoTDB` data format is the following:
```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+-----------+----------+
| Time| Device| field_1| field_2|
+------------------------+------------------------+----------+-----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a| 1001| 1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b| 2001| 2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c| 3001| 3002|
+------------------------+------------------------+----------+-----------+
```
### Case2
When you assign `key_device``key_timestamp``key_measurement_fields`, for example:
```hocon
sink {
IoTDB {
...
key_device = "device_name"
key_timestamp = "ts"
key_measurement_fields = ["temperature", "moisture"]
}
}
```
Upstream SeaTunnelRow data format is the following:
|ts | device_name | field_1 | field_2 | temperature | moisture |
|--------------------|----------------------------|-------------|-------------|-------------|-------------|
|1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 |
|1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 |
|1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 |
Output to `IoTDB` data format is the following:
```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+
```
## Changelog
### 2.2.0-beta 2022-09-26
- Add IoTDB Sink Connector
### 2.3.0-beta 2022-10-20
- [Improve] Improve IoTDB Sink Connector ([2917](https://github.com/apache/incubator-seatunnel/pull/2917))
- Support align by sql syntax
- Support sql split ignore case
- Support restore split offset to at-least-once
- Support read timestamp from RowRecord
- [BugFix] Fix IoTDB connector sink NPE ([3080](https://github.com/apache/incubator-seatunnel/pull/3080))