// blob: f6a6a26398069afa04fc4f45b574daccc414bec5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <map>
#include <vector>
#include "olap/olap_common.h"
#include "olap/utils.h"
#include "runtime/thread_context.h"
namespace doris {
// For unique key merge on write table, we should update delete bitmap
// of destination rowset when compaction finished.
// Through the row id correspondence between the source rowset and the
// destination rowset, we can quickly update the delete bitmap of the
// destination rowset.
class RowIdConversion {
public:
RowIdConversion() = default;
~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); }
// resize segment rowid map to its rows num
Status init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
for (size_t i = 0; i < num_rows.size(); i++) {
constexpr size_t RESERVED_MEMORY = 10 * 1024 * 1024; // 10M
if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(RESERVED_MEMORY)) {
return Status::MemoryLimitExceeded(fmt::format(
"RowIdConversion init_segment_map failed, process memory exceed limit or "
"sys available memory less than low water mark , {}, "
"consuming "
"tracker:<{}>, peak used {}, current used {}.",
doris::GlobalMemoryArbitrator::process_mem_log_str(),
doris::thread_context()
->thread_mem_tracker_mgr->limiter_mem_tracker()
->label(),
doris::thread_context()
->thread_mem_tracker_mgr->limiter_mem_tracker()
->peak_consumption(),
doris::thread_context()
->thread_mem_tracker_mgr->limiter_mem_tracker()
->consumption()));
}
uint32_t id = _segments_rowid_map.size();
_segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> {src_rowset_id, i}, id);
_id_to_segment_map.emplace_back(src_rowset_id, i);
std::vector<std::pair<uint32_t, uint32_t>> vec(
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX));
//NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map could be used by indexCompaction.
// indexCompaction is a thridparty code, it's too complex to modify it.
// refer compact_column.
track_mem_usage(vec.capacity());
_segments_rowid_map.emplace_back(std::move(vec));
}
return Status::OK();
}
// set dst rowset id
void set_dst_rowset_id(const RowsetId& dst_rowset_id) { _dst_rowst_id = dst_rowset_id; }
const RowsetId get_dst_rowset_id() { return _dst_rowst_id; }
// add row id to the map
void add(const std::vector<RowLocation>& rss_row_ids,
const std::vector<uint32_t>& dst_segments_num_row) {
for (auto& item : rss_row_ids) {
if (item.row_id == -1) {
continue;
}
uint32_t id = _segment_to_id_map.at(
std::pair<RowsetId, uint32_t> {item.rowset_id, item.segment_id});
if (_cur_dst_segment_id < dst_segments_num_row.size() &&
_cur_dst_segment_rowid >= dst_segments_num_row[_cur_dst_segment_id]) {
_cur_dst_segment_id++;
_cur_dst_segment_rowid = 0;
}
_segments_rowid_map[id][item.row_id] =
std::pair<uint32_t, uint32_t> {_cur_dst_segment_id, _cur_dst_segment_rowid++};
}
}
// get destination RowLocation
// return non-zero if the src RowLocation does not exist
int get(const RowLocation& src, RowLocation* dst) const {
auto iter = _segment_to_id_map.find({src.rowset_id, src.segment_id});
if (iter == _segment_to_id_map.end()) {
return -1;
}
const auto& rowid_map = _segments_rowid_map[iter->second];
if (src.row_id >= rowid_map.size()) {
return -1;
}
auto& [dst_segment_id, dst_rowid] = rowid_map[src.row_id];
if (dst_segment_id == UINT32_MAX && dst_rowid == UINT32_MAX) {
return -1;
}
dst->rowset_id = _dst_rowst_id;
dst->segment_id = dst_segment_id;
dst->row_id = dst_rowid;
return 0;
}
const std::vector<std::vector<std::pair<uint32_t, uint32_t>>>& get_rowid_conversion_map()
const {
return _segments_rowid_map;
}
const std::map<std::pair<RowsetId, uint32_t>, uint32_t>& get_src_segment_to_id_map() {
return _segment_to_id_map;
}
std::pair<RowsetId, uint32_t> get_segment_by_id(uint32_t id) const {
DCHECK_GT(_id_to_segment_map.size(), id);
return _id_to_segment_map.at(id);
}
uint32_t get_id_by_segment(const std::pair<RowsetId, uint32_t>& segment) const {
return _segment_to_id_map.at(segment);
}
private:
void track_mem_usage(size_t delta_std_pair_cap) {
_std_pair_cap += delta_std_pair_cap;
size_t new_size =
_std_pair_cap * sizeof(std::pair<uint32_t, uint32_t>) +
_segments_rowid_map.capacity() * sizeof(std::vector<std::pair<uint32_t, uint32_t>>);
CONSUME_THREAD_MEM_TRACKER(new_size - _seg_rowid_map_mem_used);
_seg_rowid_map_mem_used = new_size;
}
private:
// the first level vector: index indicates src segment.
// the second level vector: index indicates row id of source segment,
// value indicates row id of destination segment.
// <UINT32_MAX, UINT32_MAX> indicates current row not exist.
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> _segments_rowid_map;
size_t _seg_rowid_map_mem_used {0};
size_t _std_pair_cap {0};
// Map source segment to 0 to n
std::map<std::pair<RowsetId, uint32_t>, uint32_t> _segment_to_id_map;
// Map 0 to n to source segment
std::vector<std::pair<RowsetId, uint32_t>> _id_to_segment_map;
// dst rowset id
RowsetId _dst_rowst_id;
// current dst segment id
std::uint32_t _cur_dst_segment_id = 0;
// current rowid of dst segment
std::uint32_t _cur_dst_segment_rowid = 0;
};
} // namespace doris