import ChangeLog from ‘../changelog/connector-iotdb.md’;

IoTDB

IoTDB 数据读取器

支持引擎

Spark
Flink
SeaTunnel Zeta

描述

用于从 IoTDB 中读取数据。

主要特性

支持的数据源信息

数据源支持的版本地址
IoTDB0.13.0 <= version <= 1.3.Xlocalhost:6667

数据类型映射

IoTDB 数据类型SeaTunnel 数据类型
BOOLEANBOOLEAN
INT32TINYINT
INT32SMALLINT
INT32INT
INT64BIGINT
FLOATFLOAT
DOUBLEDOUBLE
TEXTSTRING
STRINGSTRING
TIMESTAMPBIGINT
TIMESTAMPTIMESTAMP
BLOBSTRING
DATEDATE

Source 选项

名称类型是否必填默认值描述
node_urlsstring-IoTDB 集群地址,格式为 "host1:port""host1:port,host2:port"
usernamestring-IoTDB 用户名
passwordstring-IoTDB 用户密码
sqlstring-要执行的 SQL 查询语句
schemaconfig-数据模式定义
fetch_sizeint-单次获取数据量:查询时每次从 IoTDB 获取的数据量
lower_boundlong-时间范围下界(通过时间列进行数据分片时使用)
upper_boundlong-时间范围上界(通过时间列进行数据分片时使用)
num_partitionsint-分区数量(通过时间列进行数据分片时使用):
- 1 个分区:使用完整时间范围
- 若分区数 < (上界 -下界),则使用差值作为实际分区数
thrift_default_buffer_sizeint-Thrift 协议缓冲区大小
thrift_max_frame_sizeint-Thrift 最大帧尺寸
enable_cache_leaderboolean-是否启用 Leader 节点缓存
versionstring-客户端 SQL 语义版本(V_0_12 / V_0_13
common-options-Source 插件常用参数,详见 [Source common Options](../Source common Options.md)

我们可以使用时间列进行分区查询。

num_partitions [int]

分区数量

upper_bound [long]

时间范围上界

lower_bound [long]

时间范围下界

     将时间范围分割成 numPartitions 个分区
     
     若 numPartitions = 1,使用完整的时间范围
     若 numPartitions < (upper_bound - lower_bound),使用 (upper_bound - lower_bound) 个分区
     
     例:lower_bound = 1, upper_bound = 10, numPartitions = 2
         sql = "select * from test where age > 0 and age < 10"
     
     分区结果:
     split 1: select * from test  where (time >= 1 and time < 6)  and (  age > 0 and age < 10 )
     split 2: select * from test  where (time >= 6 and time < 11) and (  age > 0 and age < 10 )

示例

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  IoTDB {
    node_urls = "localhost:6667"
    username = "root"
    password = "root"
    sql = "SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device"
    schema {
      fields {
        ts = timestamp
        device_name = string
        temperature = float
        moisture = bigint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_string = string
        c_boolean = boolean
      }
    }
  }
}

sink {
  Console {
  }
}

上游 IoTDB 的数据格式如下所示:

IoTDB> SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device;
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
|                    Time|                  Device|   temperature|   moisture|   c_int|      c_bigint|   c_float| c_double| c_string| c_boolean|
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|       1|   21474836470|      1.0f|     1.0d|      abc|      true|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|       2|   21474836470|      2.0f|     2.0d|      abc|      true|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|       3|   21474836470|      3.0f|     3.0d|      abc|      true|
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+

读取到 SeaTunnelRow 的数据格式如下所示:

tsdevice_nametemperaturemoisturec_intc_bigintc_floatc_doublec_stringc_boolean
1664035200001root.test_group.device_a36.11001214748364701.0f1.0dabctrue
1664035200001root.test_group.device_b36.21012214748364702.0f2.0dabctrue
1664035200001root.test_group.device_c36.31023214748364703.0f3.0dabctrue

变更日志