Spark IOTDB 连接器

设计目的

  • 使用Spark SQL读取IOTDB的数据,以Spark DataFrame的形式返回给客户端

核心思想

由于IOTDB具有解析和执行SQL的能力,故该部分可以直接将SQL转发给IOTDB进程执行,将数据拿到后转换为RDD即可

执行流程

1. 入口

  • src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala

2. 构建Relation

Relation主要保存了RDD的元信息,比如列名字,分区策略等,调用Relation的buildScan方法可以创建RDD

  • src/main/scala/org/apache/iotdb/spark/db/IoTDBRelation.scala

3. 构建RDD

RDD中执行对IOTDB的SQL请求,保存游标

  • src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala中的compute方法

4. 迭代RDD

由于Spark懒加载机制,用户遍历RDD时才具体调用RDD的迭代,也就是IOTDB的fetch Result

  • src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala中的getNext方法

宽窄表结构转换

宽表结构:IOTDB原生路径格式

timeroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt02.statusroot.ln.wf02.wt02.hardwareroot.ln.wf01.wt01.temperatureroot.ln.wf01.wt01.statusroot.ln.wf01.wt01.hardware
1nulltruenull2.2truenull
2nullfalseaaa2.2nullnull
3nullnullnull2.1truenull
4nulltruebbbnullnullnull
5nullnullnullnullfalsenull
6nullnullcccnullnullnull

窄表结构: 关系型数据库模式,IOTDB align by device格式

timedevice_namestatushardwaretemperature
1root.ln.wf02.wt01truenull2.2
1root.ln.wf02.wt02truenullnull
2root.ln.wf02.wt01nullnull2.2
2root.ln.wf02.wt02falseaaanull
3root.ln.wf02.wt01truenull2.1
4root.ln.wf02.wt02truebbbnull
5root.ln.wf02.wt01falsenullnull
6root.ln.wf02.wt02nullcccnull

由于IOTDB查询到的数据默认为宽表结构,所以需要宽窄表转换,有如下两个实现方法

1. 使用IOTDB的group by device语句

这种方式可以直接拿到窄表结构,计算由IOTDB完成

2. 使用Transformer

可以使用Transformer进行宽窄表之间的转换,计算由Spark完成

  • src/main/scala/org/apache/iotdb/spark/db/Transformer.scala

宽表转窄表使用了遍历device列表,生成对应的窄表,在union起来的策略,并行性较好(无shuffle)

窄表转宽表使用了基于timestamp的join操作,有shuffle,可能存在潜在的性能问题