TsFile 的 Hive 连接器实现了通过 Hive 读取外部 TsFile 类型的文件格式的支持,使用户能够通过 Hive 操作 TsFile。
连接器的主要功能:
Hive 连接器需要能够解析 TsFile 的文件格式,转化为 Hive 能够识别的按行返回的格式。也需要能够根据用户定义的 Table 的形式,格式化输出。所以,Hive 连接器的功能实现主要分成四个部分
上述的主要四个功能模块都有其对应的实现类,下面就分别介绍一下这四个实现类。
该类主要负责对输入的 TsFile 文件的格式化操作,它继承了FileInputFormat<NullWritable, MapWritable>类,一些通用的格式化操作在FileInputFormat中已经有实现,这个类覆写了它的getSplits(JobConf, int)方法,自定义了对于 TsFile 文件的分片方式;以及getRecordReader(InputSpli, JobConf, Reporter)方法,用于生成具体从一个分片中读取数据的 TSFHiveRecordReader。
该类主要负责从一个分片中读取 TsFile 的数据。
它实现了IReaderSet接口,这个接口里是一些设置类内部属性的方法,主要是为了抽出TSRecordReader和TSHiveRecordReader中重复的代码部分。
public interface IReaderSet {
void setReader(TsFileSequenceReader reader);
void setMeasurementIds(List<String> measurementIds);
void setReadDeviceId(boolean isReadDeviceId);
void setReadTime(boolean isReadTime);
}
下面先介绍一下这个类的一些重要字段
private List<QueryDataSet> dataSetList = new ArrayList<>();
这个分片所生成的所有的 QueryDataSet
private List<String> deviceIdList = new ArrayList<>();
设备名列表,这个顺序与 dataSetList 的顺序一致,即 deviceIdList[i] 是 dataSetList[i] 的设备名.
private int currentIndex = 0;
当前正在被处理的 QueryDataSet 的下标
这个类在构造函数里,调用了TSFRecordReader的initialize(TSFInputSplit, Configuration, IReaderSet, List<QueryDataSet>, List<String>)方法去初始化上面提到的一些类字段。它覆写了RecordReader的next()方法,用以返回从 TsFile 里读出的数据。
我们注意到它从 TsFile 读取出来数据之后,是以MapWritable的形式返回的,这里的MapWritable其实就是一个Map,只不过它的 key 与 value 都做了序列化与反序列化的特殊适配,它的读取流程如下
dataSetList当前位置的QueryDataSet还有没有值,如果没有值,则将currentIndex递增1,直到找到第一个有值的QueryDataSetQueryDataSet的next()方法获得RowRecordTSFRecordReader的getCurrentValue()方法,将RowRecord中的值放入MapWritable里这个类继承了AbstractSerDe,也是我们实现Hive从自定义输入格式中读取数据所必须的。
它覆写了AbstractSerDe的initialize()方法,在这个方法里,从用户的建表 sql 里,解析出相应的设备名,传感器名以及传感器对应的类型。还要构建出ObjectInspector对象,这个对象主要负责数据类型的转化,由于 TsFile 只支持原始数据类型,所以当出现其他数据类型时,需要抛出异常,具体的构建过程在createObjectInspectorWorker()方法中可以看到。
这个类的最主要职责就是序列化和反序列化不同文件格式的数据,由于我们的 Hive 连接器暂时只支持读取操作,并不支持 insert 操作,所以只有反序列化的过程,所以仅覆写了deserialize(Writable)方法,该方法里调用了TsFileDeserializer的deserialize()方法。
这个类就是将数据反序列化为 Hive 的输出格式,仅有一个deserialize()方法。
public Object deserialize(List<String>, List<TypeInfo>, Writable, String)
这个方法的Writable参数就是TSFHiveRecordReader的next()生成的MapWritable。
首先判断Writable参数是不是MapWritable类型,如果不是,则抛出异常。
接着依次从MapWritable中取出该设备的传感器的值,如果遇到类型不匹配则抛异常,最后返回生成的结果集。