IoTDB 的元数据统一由 MManger 管理,包括以下几个部分:
维护 tag 倒排索引:Map<String, Map<String, Set<LeafMNode>>> tagIndex
tag key -> tag value -> timeseries LeafMNode
该类初始化时,会replay mlog中的内容,将元数据信息还原出来,元数据操作日志主要有六种类型,每个操作前都会先获得整个元数据的写锁(存储在MManager中),操作完后释放:
创建时间序列
删除时间序列
设置存储组
删除存储组
设置TTL
改变时间序列标签信息offset
改变时间序列的别名
除了这七种需要记录日志的操作外,还有六种对时间序列标签/属性信息进行更新的操作,同样的,每个操作前都会先获得整个元数据的写锁(存储在MManager中),操作完后释放:
重命名标签或属性
重新设置标签或属性的值
删除已经存在的标签或属性
添加新的标签
添加新的属性
更新插入标签和属性
树中包括三种节点:StorageGroupMNode、InternalMNode(非叶子节点)、LeafMNode(叶子节点),他们都是MNode的子类。
每个InternalMNode中都有一个读写锁,查询元数据信息时,需要获得路径上每一个InternalMNode的读锁,修改元数据信息时,如果修改的是LeafMNode,需要获得其父节点的写锁,若修改的是InternalMNode,则只需获得本身的写锁。若该InternalMNode位于Device层,则还包含了一个Map<String, MNode> aliasChildren,用于存储别名信息。
StorageGroupMNode 继承 InternalMNode,包含存储组的元数据信息,如 TTL。
LeafMNode 中包含了对应时间序列的Schema信息,其别名(若没有别名,则为null)以及该时间序列的标签/属性信息在tlog文件中的offset(若没有标签/属性,则为-1)
示例:
IoTDB 的元数据管理采用目录树的形式,倒数第二层为设备层,最后一层为传感器层。
默认存在根节点 root,创建存储组、删除存储组、创建时间序列、删除时间序列均为对树的节点的操作。
创建存储组(root.a.b.sg)
创建时间序列(root.a.b.sg.d.s1)
删除存储组和删除时间序列的操作相似,即将存储组或时间序列节点在其父节点中删除,时间序列节点还需要将其别名在父节点中删除;若在删除过程中,发现某一节点没有任何子节点了,还需要递归删除此节点。
为了加快 IoTDB 重启速度,我们为 MTree 设置了检查点,这样避免了在重启时按行读取并复现 mlog.bin 中的信息。创建 MTree 的快照有两种方式:
mlog.bin 文件超过1小时没有修改mlog.bin 中积累了100000行日志(可配置)create snapshot for schema命令手动触发创建 MTree 快照方法见MManager.createMTreeSnapshot():
mtree.snapshot.tmp)。MTree 的序列化采用“先子节点、后父节点”的深度优先序列化方式,将节点的信息按照类型转化成对应格式的字符串,便于反序列化时读取和组装MTree。mtree.snapshot),防止在序列化过程中出现服务器人为或意外关闭,导致序列化失败的情况。MLogWriter.clear()方法,清空 mlog.bin:mlog.bin文件;logNumber 置为0,logNumber 记录mlog.bin的日志行数,用于在后台检查时判断其是否超过用户配置的阈值而触发自动创建快照。方法见MManager.initFromLog():
mtree.snapshot.tmp是否存在,如果存在证明在创建快照的序列化过程中出现服务器人为或意外关闭,导致序列化失败,删除临时文件;mtree.snapshot是否存在。如果不存在,则使用新的 MTree;否则启动反序列化过程,得到 MTreemlog.bin中的内容,逐行读取并操作,完成 MTree 的恢复。读取过程中更新 logNumber,并返回,用于后面mlog.bin行数的记录。所有元数据的操作均会记录到元数据日志文件中,此文件默认为 data/system/schema/mlog.bin。
系统重启时会重做 mlog 中的日志,重做之前需要标记不需要记录日志。当重启结束后,标记需要记录日志。
元数据日志的类型由 MetadataOperationType 类记录。mlog 直接存储字符串编码。
示例 sql 及对应的 mlog 记录:
set storage group to root.turbine
mlog: 2,root.turbine
格式: 2,path
delete storage group root.turbine
mlog: 1,root.turbine
格式: 1,path
create timeseries root.turbine.d1.s1(temprature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY tags(tag1=v1, tag2=v2) attributes(attr1=v1, attr2=v2)
mlog: 0,root.turbine.d1.s1,3,2,1,,温度,offset
格式: 0,path,TSDataType,TSEncoding,CompressionType,[properties],[alias],[tag-attribute offset]
delete timeseries root.turbine.d1.s1
mlog: 1,root.turbine.d1.s1
格式: 1,path
set ttl to root.turbine 10
mlog: 10,root.turbine,10
格式: 10,path,ttl
alter timeseries root.turbine.d1.s1 add tags(tag1=v1)
只有当root.turbine.d1.s1原来不存在任何标签/属性信息时,该sql才会产生日志
mlog: 12,root.turbine.d1.s1,10
格式: 10,path,[change offset]
alter timeseries root.turbine.d1.s1 UPSERT ALIAS=newAlias
mlog: 13,root.turbine.d1.s1,newAlias
格式: 13,path,[new alias]
所有时间序列的标签/属性信息都会保存在标签文件中,此文件默认为 data/system/schema/mlog.bin。
每条时间序列的 tags 和 attributes 持久化总字节数为 L,在 iotdb-engine.properties 中配置。
持久化内容:Map<String,String> tags, Map<String,String> attributes,如果内容不足 L,则需补空。
create timeseries root.turbine.d1.s1(temprature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY tags(tag1=v1, tag2=v2) attributes (attr1=v1, attr2=v2)
在tlog.txt中的内容:
tagsSize (tag1=v1, tag2=v2) attributesSize (attr1=v1, attr2=v2)
主要查询逻辑封装在MManager的showTimeseries(ShowTimeSeriesPlan plan)方法中
首先判断需不需要根据热度排序,如果需要,则调用MTree的getAllMeasurementSchemaByHeatOrder方法,否则调用getAllMeasurementSchema方法
这里的热度是用每个时间序列的lastTimeStamp来表征的,所以需要先取出所有满足条件的序列,然后根据lastTimeStamp进行排序,然后再做offset和limit的截断
这里需要在findPath的时候就将limit(如果没有limit,则将请求的fetchSize当成limit)和offset参数传递下去,减少内存占用。
这个方法封装了在MTree中遍历得到满足条件的时间序列的逻辑,是个递归方法,由根节点往下递归寻找,直到当前时间序列数量达到limit或者已经遍历完整个MTree。
这里的过滤条件只能是tag属性,否则抛异常。
通过在MManager中维护的tag的倒排索引,获得所有满足索引条件的MeasurementMNode。
若需要根据热度排序,则根据lastTimeStamp进行排序,否则根据序列名的字母序排序,然后再做offset和limit的截断。
如果元数据量过多,一次show timeseries的结果可能导致OOM,所以增加fetch size参数,客户端跟服务器端交互时,服务器端一次最多只会取fetch size个时间序列。
多次交互的状态信息就存在ShowTimeseriesDataSet中。ShowTimeseriesDataSet中保存了此次的ShowTimeSeriesPlan,当前的游标index以及缓存的结果行列表List<RowRecord> result。
index是否等于缓存的结果行List<RowRecord> result的sizeshowTimeseries方法取结果,放入缓存hasLimit为false,则将index重新置为0index < result.size(),返回trueindex > result.size(),返回false