TsFile 的 Hive 连接器

TsFile 的 Hive 连接器实现了通过 Hive 读取外部 TsFile 类型的文件格式的支持,使用户能够通过 Hive 操作 TsFile。

连接器的主要功能:

  • 将单个 TsFile 文件加载进 Hive,不论文件是存储在本地文件系统或者是 HDFS 中
  • 将某个特定目录下的所有文件加载进 Hive,不论文件是存储在本地文件系统或者是 HDFS 中
  • 使用 HQL 查询 TsFile
  • 到现在为止, 写操作在 hive-connector 中还不支持. 所以, HQL 中的 insert 操作是不被允许的

设计原理

Hive 连接器需要能够解析 TsFile 的文件格式,转化为 Hive 能够识别的按行返回的格式。也需要能够根据用户定义的 Table 的形式,格式化输出。所以,Hive 连接器的功能实现主要分成四个部分

  • 将整个 TsFile 文件分片
  • 从分片中读取数据,转化为 Hive 能够识别的数据类型
  • 解析用户自定义的 Table
  • 将数据反序列化为 Hive 的输出格式

具体实现类

上述的主要四个功能模块都有其对应的实现类,下面就分别介绍一下这四个实现类。

org.apache.iotdb.hive.TSFHiveInputFormat

该类主要负责对输入的 TsFile 文件的格式化操作,它继承了FileInputFormat<NullWritable, MapWritable>类,一些通用的格式化操作在FileInputFormat中已经有实现,这个类覆写了它的getSplits(JobConf, int)方法,自定义了对于 TsFile 文件的分片方式;以及getRecordReader(InputSpli, JobConf, Reporter)方法,用于生成具体从一个分片中读取数据的 TSFHiveRecordReader

org.apache.iotdb.hive.TSFHiveRecordReader

该类主要负责从一个分片中读取 TsFile 的数据。

它实现了IReaderSet接口,这个接口里是一些设置类内部属性的方法,主要是为了抽出TSRecordReaderTSHiveRecordReader中重复的代码部分。

public interface IReaderSet {

  void setReader(TsFileSequenceReader reader);

  void setMeasurementIds(List<String> measurementIds);

  void setReadDeviceId(boolean isReadDeviceId);

  void setReadTime(boolean isReadTime);
}

下面先介绍一下这个类的一些重要字段

  • private List<QueryDataSet> dataSetList = new ArrayList<>();
    

    这个分片所生成的所有的 QueryDataSet

  • private List<String> deviceIdList = new ArrayList<>();
    

    设备名列表,这个顺序与 dataSetList 的顺序一致,即 deviceIdList[i] 是 dataSetList[i] 的设备名.

  • private int currentIndex = 0;

    当前正在被处理的 QueryDataSet 的下标

这个类在构造函数里,调用了TSFRecordReaderinitialize(TSFInputSplit, Configuration, IReaderSet, List<QueryDataSet>, List<String>)方法去初始化上面提到的一些类字段。它覆写了RecordReadernext()方法,用以返回从 TsFile 里读出的数据。

next(NullWritable, MapWritable)

我们注意到它从 TsFile 读取出来数据之后,是以MapWritable的形式返回的,这里的MapWritable其实就是一个Map,只不过它的 key 与 value 都做了序列化与反序列化的特殊适配,它的读取流程如下

  1. 首先判断dataSetList当前位置的QueryDataSet还有没有值,如果没有值,则将currentIndex递增1,直到找到第一个有值的QueryDataSet
  2. 然后调用QueryDataSetnext()方法获得RowRecord
  3. 最后调用TSFRecordReadergetCurrentValue()方法,将RowRecord中的值放入MapWritable

org.apache.iotdb.hive.TsFileSerDe

这个类继承了AbstractSerDe,也是我们实现Hive从自定义输入格式中读取数据所必须的。

它覆写了AbstractSerDeinitialize()方法,在这个方法里,从用户的建表 sql 里,解析出相应的设备名,传感器名以及传感器对应的类型。还要构建出ObjectInspector对象,这个对象主要负责数据类型的转化,由于 TsFile 只支持原始数据类型,所以当出现其他数据类型时,需要抛出异常,具体的构建过程在createObjectInspectorWorker()方法中可以看到。

这个类的最主要职责就是序列化和反序列化不同文件格式的数据,由于我们的 Hive 连接器暂时只支持读取操作,并不支持 insert 操作,所以只有反序列化的过程,所以仅覆写了deserialize(Writable)方法,该方法里调用了TsFileDeserializerdeserialize()方法。

org.apache.iotdb.hive.TsFileDeserializer

这个类就是将数据反序列化为 Hive 的输出格式,仅有一个deserialize()方法。

public Object deserialize(List<String>, List<TypeInfo>, Writable, String)

这个方法的Writable参数就是TSFHiveRecordReadernext()生成的MapWritable

首先判断Writable参数是不是MapWritable类型,如果不是,则抛出异常。

接着依次从MapWritable中取出该设备的传感器的值,如果遇到类型不匹配则抛异常,最后返回生成的结果集。