TsFile 写入过程中存在两类内存,性质截然不同:
| 内存类型 | 来源 | 是否压缩 | 何时释放 | 能否控制 |
|---|---|---|---|---|
| M_data | ChunkWriter 数据页 | 是(写盘时压缩) | flush 后立即释放 | 控制批次大小 F |
| M_meta | ChunkGroup/Chunk 元数据 | 否 | 只有 close 才释放 | 控制每文件 flush 次数 K |
M_meta 是核心约束来源:只要文件不 close,每次 flush 都会向内存中追加一批不可压缩的元数据(chunk header、chunk group header、timeseries index 节点)。设备数越多、flush 次数越多,M_meta 积压越严重。
因此,文件轮转(close + 重新打开)的根本原因不是磁盘空间,而是为了释放 M_meta。
TsFileWriter / TsFileTableWriter 将所有决策完全交给用户:
用户在资源受限环境(嵌入式设备、容器限额、边缘节点)中无法做出这些决策。
实验(见同目录 TsFile 内存控制.pdf)确认写入过程内存由三部分构成:
M_total = M_init + M_data + M_meta
| 组成 | 特征 | 公式 |
|---|---|---|
| M_init | 固定 | ≈ 900 KB |
| M_data | 周期性,flush 后释放 | s × F |
| M_meta | 单调递增,不可压缩,close 才释放 | K × D × b |
其中:
F:每次 flush 写入的行数(批次大小)K:当前文件已 flush 的次数D:当前文件已出现的设备数(唯一 TAG 组合数)s = s_tablet + s_data:每行在内存中的综合字节数b:每设备每次 flush 新增的元数据字节数(不可压缩)对于 Table 模式:
s_tablet = 8 + Σ sizeof(col_type) // 所有列(TAG + FIELD) s_data = Σ (8 + sizeof(field_col_type)) // 仅 FIELD 列进 ChunkWriter s = s_tablet + s_data b = n_field_cols × 104 + 96 // chunk_meta + chunk_group_meta(字节)
总量已知时(EOQ 模型最优解):
F_opt = √(R × D × b / s) K_opt = R / F_opt M_min = M_init + 2 × √(R × s × D × b)
当 M_min ≤ M_limit 时,所有数据可写入单个文件。
总量未知时(流式,预算对半分,F×K 最大):
M_avail = M_limit - M_init F = M_avail / (2 × s) K = M_avail / (2 × D × b)
配置:50 设备,50 FIELD 列 int32,总行数 100,000
| max_rows | 实测内存 | 公式估算 | 误差 |
|---|---|---|---|
| 10,000 | 11.7 MB | 11.05 MB | 5.5% |
| 8,000 | 10.8 MB | 10.3 MB | 4.6% |
| 6,000 | 10.6 MB | 9.63 MB | 9.1% |
| 5,000 | 11.2 MB | 9.75 MB | 12.9% |
公式误差 5–15%,主要来源:变长字符串开销、schema 结构节点、OS 内存分配粒度。
write_table(),flush / close / 文件轮转全部自动处理。ModStat,所有估算仅使用轻量整数计算。TsFileWriter 内部的 ChunkWriter 本身就是 M_data 的缓冲区,每次 write_table() 都将数据写入 ChunkWriter,M_data 持续积累,直到显式调用 flush() 时才落盘并清零。
M_data 控制:在内部追踪累计写入行数,达到 F 时 flush()
→ 用户提交任意大小 Tablet,系统计数,F 行后自动刷盘
M_meta 控制:公式估算 flush_count × D × b,达到阈值时 rotate_file()
→ 文件轮转是释放不可压缩元数据的唯一手段
每次 write_table(tablet):
ChunkWriter 积累数据,M_data 增加
accumulated_rows += tablet.rows
if accumulated_rows >= F: // 达到数据预算上限
flush() // M_data 清零,M_meta 增加 D×b
flush_count++
accumulated_rows = 0
if flush_count×D×b >= rotate_thresh:
rotate_file() // M_meta 清零
两者结合,M_total 始终受控:
写入中(未达 F):M_total = M_init + s×accumulated + flush_count×D×b flush 后: M_total = M_init + flush_count×D×b rotate 后: M_total = M_init
close() 调用时,M_data 已为 0(最后一次 flush 已完成)。峰值为:
M_at_close = M_init + M_meta_final
由于我们在 flush_count × D × b >= meta_budget × SAFETY_FACTOR 时触发 rotate:
M_meta_final ≤ meta_budget × SAFETY_FACTOR ≤ meta_budget ≤ M_avail → M_at_close ≤ M_init + M_avail = M_limit ✓
关闭时不会超出内存上限,SAFETY_FACTOR=0.85 同时覆盖了序列化开销和公式误差。
TsFileWriter 内部有自适应区间 M_data 检查机制,通过 chunk_group_size_threshold_ 控制:
if (record_count_since_last_flush_ >= record_count_for_next_mem_check_) { mem_size = calculate_mem_size_for_all_group(); record_count_for_next_mem_check_ = record_count_since_last_flush_ * chunk_group_size_threshold_ / mem_size; if (mem_size > chunk_group_size_threshold_) flush(); }
ConstrainedTsFileWriter 在 accumulated_rows_ >= F 时显式调用 flush(),M_data 控制已由上层保证。该内置机制作为双重保险保留(设置 chunk_group_size_threshold_ = data_budget_),防止单个超大 Tablet 导致 M_data 短暂超限。
chunk_group_size_threshold_是进程全局配置,多个实例并发时会互相覆盖,当前设计仅支持单写入器场景。
ModStat 通过 ATOMIC_FAA 追踪各模块内存,排除原因:
ENABLE_MEM_STAT 编译选项下开启,不适合生产路径struct WriteConstraints { int64_t memory_limit_bytes; // 必填:内存上限 // 可选:数据规模提示(有则使用 EOQ 最优策略) int n_devices = 0; // 0 = 未知,运行时动态追踪 int64_t total_rows_hint = -1; // 每设备总行数,-1 = 未知 };
构造时执行,失败则 init_status() 返回非 E_OK:
// 最小可行内存:F=1 行,K=1 次,D=1 设备
M_min = M_INIT_BYTES + s + b
if M_min > memory_limit_bytes:
ERROR "内存不足:schema 最小写入需 {M_min} 字节,当前限制 {memory_limit_bytes}"
M_avail = memory_limit_bytes - M_INIT_BYTES
D_eff = max(1, n_devices) // n_devices=0 时用 1 作为初始保守估计
// 尝试 EOQ 最优(总量已知时)
if total_rows_hint > 0 && n_devices > 0:
B = D_eff × b
F = √(total_rows_hint × B / s)
K = total_rows_hint / F
peak = M_INIT_BYTES + s×F + K×B
if peak ≤ memory_limit_bytes:
return {rows_per_flush=F, flushes_per_file=K, estimated_peak_mem=peak}
// 超出内存,退化为对半分
// 对半分预算(流式最优,F×K 最大)
F = M_avail / (2 × s)
K = M_avail / (2 × D_eff × b)
K = max(1LL, K)
return {rows_per_flush=F, flushes_per_file=K, estimated_peak_mem=M_INIT_BYTES+s×F+K×D_eff×b}
data_budget = s × F
meta_budget = M_avail - data_budget
rotate_thresh = meta_budget × SAFETY_FACTOR // SAFETY_FACTOR = 0.85
// 每次 flush 后判断
if flush_count × D_eff × b >= rotate_thresh:
rotate_file()
sizeof_datatype() 映射:
| TSDataType | 字节数 |
|---|---|
| BOOLEAN | 1 |
| INT32 / FLOAT | 4 |
| INT64 / DOUBLE | 8 |
| STRING | 32(保守估计) |
struct WriteConstraints { int64_t memory_limit_bytes; int n_devices = 0; int64_t total_rows_hint = -1; }; struct WritingStrategy { int64_t rows_per_flush; // F:每次 flush 的行数 int64_t flushes_per_file; // K:每文件最多 flush 次数 int64_t estimated_peak_mem; // 预估峰值内存(bytes) }; struct WriteStats { int64_t rows_written_total; // 累计写入行数(跨所有文件) int64_t files_completed; // 已完成(已 close)的文件数 int64_t current_file_flush_count; // 当前文件已 flush 次数 int64_t estimated_current_mem_bytes; // 当前估算 M_meta(M_data 已 flush) };
class ConstrainedTsFileWriter { public: // 构造:立即执行可行性校验 ConstrainedTsFileWriter( const WriteConstraints& constraints, const std::string& output_dir, const std::string& file_prefix, const std::shared_ptr<TableSchema>& schema ); ~ConstrainedTsFileWriter(); int init_status() const; // E_OK 表示约束可满足,否则不可写入 // 写入任意大小 Tablet,内部自动拆分、flush、轮转 int write_table(Tablet& tablet); int close(); // 观测接口 const WriteStats& stats() const; const WritingStrategy& strategy() const; const std::vector<std::string>& created_files() const; // 纯计算接口:不创建文件,仅返回策略 static WritingStrategy compute_strategy( const WriteConstraints& constraints, const std::shared_ptr<TableSchema>& schema ); // 可行性检查:返回 E_OK 或错误码,可选输出错误描述 static int check_feasibility( const WriteConstraints& constraints, const std::shared_ptr<TableSchema>& schema, std::string* error_msg = nullptr ); };
{output_dir}/{file_prefix}_{index:04d}.tsfile
例:/data/output/sensor_0000.tsfile
/data/output/sensor_0001.tsfile
基本用法
WriteConstraints c; c.memory_limit_bytes = 10 * 1024 * 1024; // 10 MB c.n_devices = 50; ConstrainedTsFileWriter writer(c, "/data/output", "sensor", schema); if (writer.init_status() != E_OK) { return writer.init_status(); // 内存不足以写入此 schema } // 用户可提交任意大小的 Tablet,系统自动拆分 while (has_more_data) { Tablet tablet(...); fill(tablet); writer.write_table(tablet); // 拆分 + flush + rotate 全自动 } writer.close();
仅查询策略
auto s = ConstrainedTsFileWriter::compute_strategy(c, schema); // s.rows_per_flush = 每批次建议行数 // s.flushes_per_file = 每文件建议 flush 次数 // s.estimated_peak_mem
运行时观测
const auto& st = writer.stats(); printf("已写 %lld 行,当前文件 flush %lld 次,估算内存 %lld KB\n", st.rows_written_total, st.current_file_flush_count, st.estimated_current_mem_bytes / 1024);
src/writer/ ├── tsfile_writer.h/.cc (已有) ├── tsfile_table_writer.h/.cc (已有) ├── constrained_tsfile_writer.h (新增) └── constrained_tsfile_writer.cc (新增)
内部直接持有 TsFileWriter*(而非 TsFileTableWriter),以便在 rotate 时精确控制 open() 和 register_table()。
// 固定参数(构造时确定) WriteConstraints constraints_; WritingStrategy strategy_; int64_t s_; // 每行综合字节数 int64_t b_per_device_; // 每设备每 flush 的不可压缩元数据增量 int64_t meta_budget_; // 元数据内存预算 int64_t rotate_thresh_; // = meta_budget_ × SAFETY_FACTOR int init_status_; // 文件管理 TsFileWriter* writer_; std::shared_ptr<TableSchema> table_schema_; int file_index_; std::vector<std::string> created_files_; std::string output_dir_; std::string file_prefix_; // 运行时状态 WriteStats stats_; int64_t accumulated_rows_; // 上次 flush 后累计写入的行数 // 动态设备追踪(n_devices == 0 时) bool dynamic_mode_; int64_t observed_max_devices_; // 本文件内见到的最多设备数(rotate 后重置)
ConstrainedTsFileWriter(constraints, output_dir, prefix, schema):
1. 从 schema 计算 s_, b_per_device_
2. check_feasibility() → init_status_
if init_status_ != E_OK: return(不创建文件)
3. compute_strategy() → strategy_
4. data_budget_ = s_ × strategy_.rows_per_flush
meta_budget_ = M_avail - data_budget_
rotate_thresh_ = meta_budget_ × SAFETY_FACTOR
5. g_config_value_.chunk_group_size_threshold_ = data_budget_
6. open_next_file()
用户提交任意大小的 Tablet,系统内部累计行数,达到 F 时自动 flush。
write_table(tablet):
if init_status_ != E_OK: return init_status_
writer_->write_table(tablet) // 数据写入 ChunkWriter,M_data 增加
accumulated_rows_ += tablet.cur_row_size_
stats_.rows_written_total += tablet.cur_row_size_
// 动态设备追踪(每个 Tablet 都更新,取本文件内最大值)
if dynamic_mode_:
d = tablet.find_all_device_boundaries().size() - 1
if d > observed_max_devices_:
observed_max_devices_ = d
recompute_rotate_thresh() // D 增大 → thresh 减小
if accumulated_rows_ >= strategy_.rows_per_flush:
do_flush()
do_flush():
writer_->flush() // M_data 落盘,M_meta 增加 D×b
accumulated_rows_ = 0
stats_.current_file_flush_count++
D_eff = dynamic_mode_ ? observed_max_devices_ : constraints_.n_devices
meta_est = stats_.current_file_flush_count × D_eff × b_per_device_
stats_.estimated_current_mem_bytes = M_INIT_BYTES + meta_est
if meta_est >= rotate_thresh_:
rotate_file()
rotate_file():
1. writer_->close() // 写盘,释放 M_meta
2. created_files_.push_back(current_path)
3. stats_.files_completed++
4. delete writer_; writer_ = nullptr
5. file_index_++
6. g_config_value_.chunk_group_size_threshold_ = s_ × strategy_.rows_per_flush
7. writer_ = new TsFileWriter()
8. writer_->open(make_file_path(file_index_))
9. writer_->register_table(table_schema_)
10. stats_.current_file_flush_count = 0
11. if dynamic_mode_:
observed_max_devices_ = 0
recompute_rotate_thresh()
recompute_rotate_thresh():
D = max(1LL, observed_max_devices_)
// rotate_thresh 以"flush 次数"表示
rotate_thresh_ = (int64_t)(meta_budget_ / (D × b_per_device_) × SAFETY_FACTOR)
rotate_thresh_ = max(1LL, rotate_thresh_)
observed_max_devices_ 在单个文件内单调递增,rotate 后重置为 0。设备数越多,阈值越小,天然保守。
配置:50 设备,50 FIELD 列 int32,memory_limit=12MB
s_tablet = 8 + 100×4 = 408 bytes/row (假设 50 TAG + 50 FIELD 各 4 字节) s_data = 50×(8+4) = 600 bytes/row s = 1008 bytes/row b = 50×104 + 96 = 5296 bytes/设备/flush (不可压缩元数据) M_avail ≈ 11.5 MB 对半分策略: F = 11.5MB / (2×1008) ≈ 5,694 行 K = 11.5MB / (2×50×5296) ≈ 21 次 data_budget = 5694 × 1008 ≈ 5.74 MB meta_budget ≈ 5.75 MB rotate_thresh = 5.75MB × 0.85 / (50×5296) ≈ 18.4 → 第 18 次 flush 后触发
| 事件 | M_meta 估算 | 操作 |
|---|---|---|
| flush 1 | 0.26 MB | 继续 |
| flush 10 | 2.65 MB | 继续 |
| flush 18 | 4.77 MB ≥ thresh | rotate:close 释放 4.77 MB 元数据,重开新文件 |
| flush 1(新文件) | 0.26 MB | 继续 |
文件轮转的本质:每 18 次 flush 必须 close 一次,强制释放积压的不可压缩元数据。
超大 Tablet 的短暂 M_data 超限:若单个 Tablet 的行数超过 F,写入时 M_data 会短暂超出 data_budget,随后的 do_flush() 会立即释放。chunk_group_size_threshold_ = data_budget_ 作为双重保险在此场景下触发保护(见 §3.3)。若需严格保证不超限,可在 write_table 前检查 Tablet 行数并返回错误。
STRING 列内存估算误差:使用固定 32 字节估算变长字符串,通过 SAFETY_FACTOR=0.85 和保守 M_INIT_BYTES 缓解。
Schema 同构假设:假设所有设备有相同的 FIELD 列结构。异构 schema 取最大 FIELD 列数作为保守估计。
chunk_group_size_threshold_ 是进程全局配置:多个实例并发运行时会互相覆盖。当前设计仅支持单写入器场景。
单线程写入:不考虑并发。
M_INIT_BYTES=900KB:是否随注册的时间序列数量显著变化。b = n_field_cols × 104 + 96:104 bytes/chunk_meta 和 96 bytes/chunk_group_meta 为近似值,需根据实际序列化大小校准。find_all_device_boundaries().size() - 1:确认等于本次 Tablet 中唯一设备数。原始方向:用公式 M_data = s × F、M_meta = K × D × b 预测内存,提前计算 F/K/s/b 等参数,通过行数计数驱动 flush 和 rotate。
转变原因:TsFile 已有两个精确的内存测量接口,无需估算:
M_data 的直接测量:TsFileWriter::calculate_mem_size_for_all_group() 已存在,遍历所有 ChunkWriter 求和:
// ChunkWriter / ValueChunkWriter / TimeChunkWriter 均实现: int64_t estimate_max_series_mem_size() { return chunk_data_.total_size() // 已完成页(编码后字节) + page_writer_.estimate_max_mem_size() // 当前页已编码 + encoder pending 上界 + page_header_overhead + statistic_sizeof; }
测量的是编码后的实际大小,不是原始数据大小。高效编码(delta、RLE)时返回值远小于 s × F,天然适应实际编码效率。
M_meta 的直接测量:所有元数据(ChunkMeta、ChunkGroupMeta、Statistic)均从 TsFileIOWriter::meta_allocator_(类型 PageArena)分配。只需给 PageArena 新增一个方法:
// page_arena.h 新增 int64_t get_total_used_bytes() const { int64_t total = 0; Page* p = dummy_head_.next_; while (p) { total += p->cur_alloc_ - (char*)(p + 1); p = p->next_; } return total; } // tsfile_io_writer.h 新增 int64_t get_meta_size() const { return meta_allocator_.get_total_used_bytes(); }
M_meta 是不经过数据编码的结构体,PageArena 的已用字节数即为精确的 M_meta 大小。
| M_data | M_meta | |
|---|---|---|
| 存储位置 | ChunkWriter 内部 ByteStream | meta_allocator_(PageArena) |
| 测量方法 | calculate_mem_size_for_all_group() | meta_allocator_.get_total_used_bytes() |
| 精度 | 编码后实际大小 + encoder pending 上界(保守) | 精确 |
| 编码影响 | 已体现,反映实际编码效率 | 无关,元数据不经数据编码 |
| 需新增代码 | 已有,直接暴露为 get_data_size() | PageArena 新增一个方法 + IOWriter getter |
有了直接监控,原来需要计算的 F、K、s、b、D 等参数大部分消失。“最优策略”退化为唯一一个决策:如何在 M_avail 中分配 data_budget 和 meta_budget。
M_avail = memory_limit - M_init // 流式(总量未知):50/50 是数学最优(最大化单文件容纳数据量) data_budget = M_avail / 2 meta_budget = M_avail / 2 // 总量已知:EOQ 给出最优分配比(同样只是两个数值)
运行时控制变为纯粹的阈值比较:
write_table(tablet):
writer->write_table(tablet)
if writer->get_data_size() >= data_budget:
writer->flush()
if writer->get_meta_size() >= meta_budget:
rotate_file()
去掉的东西:accumulated_rows 计数、observed_max_devices 追踪、find_all_device_boundaries 调用、b_per_device 计算、SAFETY_FACTOR(仅覆盖 M_init 估算误差,可提高到 0.95)。
监控接口和自动管理层相互独立,服务不同用户:
TsFileWriter(已有 + 小幅扩展) ├── get_data_size() ← calculate_mem_size_for_all_group() 直接暴露 └── get_meta_size() ← meta_allocator_.get_total_used_bytes() ConstrainedTsFileWriter(建立在上面两个接口之上) └── write_table() ← 内部用两个阈值自动判断 flush / rotate