blob: e68977ea64efd40c19a0bf52cd5729616dff8d78 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/load_stream_map_pool.h"
#include "util/debug_points.h"
namespace doris {
#include "common/compile_check_begin.h"
class TExpr;
LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamMapPool* pool)
: _load_id(load_id),
_src_id(src_id),
_num_streams(num_streams),
_use_cnt(num_use),
_num_incremental_streams(0),
_pool(pool),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_use > 0) << "use num should be greater than 0";
}
std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<LoadStreamStubs> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
return streams;
}
if (incremental) {
_num_incremental_streams.fetch_add(1);
}
streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, _src_id,
_tablet_schema_for_index,
_enable_unique_mow_for_index, incremental);
_streams_for_node[dst_id] = streams;
return streams;
}
std::shared_ptr<LoadStreamStubs> LoadStreamMap::at(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
return _streams_for_node.at(dst_id);
}
bool LoadStreamMap::contains(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
return _streams_for_node.contains(dst_id);
}
void LoadStreamMap::for_each(std::function<void(int64_t, LoadStreamStubs&)> fn) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
snapshot = _streams_for_node;
}
for (auto& [dst_id, streams] : snapshot) {
fn(dst_id, *streams);
}
}
Status LoadStreamMap::for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
snapshot = _streams_for_node;
}
Status status = Status::OK();
for (auto& [dst_id, streams] : snapshot) {
auto st = fn(dst_id, *streams);
if (!st.ok() && status.ok()) {
status = st;
}
}
return status;
}
void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
const std::vector<PTabletID>& tablets_to_commit) {
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
auto& tablets = _tablets_to_commit[dst_id];
for (const auto& tablet : tablets_to_commit) {
tablets.emplace(tablet.tablet_id(), tablet);
}
}
bool LoadStreamMap::release() {
int num_use = --_use_cnt;
if (num_use == 0) {
LOG(INFO) << "releasing streams, load_id=" << _load_id;
_pool->erase(_load_id);
return true;
}
LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" << num_use;
return false;
}
void LoadStreamMap::close_load(bool incremental) {
for (auto& [dst_id, streams] : _streams_for_node) {
if (streams->is_incremental() != incremental) {
continue;
}
std::vector<PTabletID> tablets_to_commit;
const auto& tablets = _tablets_to_commit[dst_id];
tablets_to_commit.reserve(tablets.size());
for (const auto& [tablet_id, tablet] : tablets) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
}
auto st = streams->close_load(tablets_to_commit, _num_incremental_streams.load());
if (!st.ok()) {
LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental")
<< " streams failed: " << st << ", load_id=" << _load_id;
}
}
}
LoadStreamMapPool::LoadStreamMapPool() = default;
LoadStreamMapPool::~LoadStreamMapPool() = default;
std::shared_ptr<LoadStreamMap> LoadStreamMapPool::get_or_create(UniqueId load_id, int64_t src_id,
int num_streams, int num_use) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
if (streams != nullptr) {
return streams;
}
streams = std::make_shared<LoadStreamMap>(load_id, src_id, num_streams, num_use, this);
_pool[load_id] = streams;
return streams;
}
void LoadStreamMapPool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(load_id);
}
} // namespace doris