blob: 9e368d81438ee74c24a6a5836f1485c695b62e3e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/group_commit_mgr.h"
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <chrono>
#include "client_cache.h"
#include "cloud/config.h"
#include "common/compiler_util.h"
#include "common/config.h"
#include "common/status.h"
#include "pipeline/dependency.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "util/debug_points.h"
#include "util/thrift_rpc_helper.h"
namespace doris {
std::string LoadBlockQueue::_get_load_ids() {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
return ss.str();
}
Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block, bool write_wal,
UniqueId& load_id) {
DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed",
{ return Status::InternalError("LoadBlockQueue.add_block.failed"); });
std::unique_lock l(mutex);
RETURN_IF_ERROR(status);
if (UNLIKELY(runtime_state->is_cancelled())) {
return runtime_state->cancel_reason();
}
RETURN_IF_ERROR(status);
LOG(INFO) << "query_id: " << print_id(runtime_state->query_id())
<< ", add block rows=" << block->rows() << ", use group_commit label=" << label;
DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block", DBUG_BLOCK);
if (block->rows() > 0) {
if (!config::group_commit_wait_replay_wal_finish) {
_block_queue.emplace_back(block);
_data_bytes += block->bytes();
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
<< "Cur block rows=" << block->rows() << ", bytes=" << block->bytes()
<< ". all block queues bytes from " << before_block_queues_bytes << " to "
<< _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
<< ". txn_id=" << txn_id << ", label=" << label
<< ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
}
if (write_wal || config::group_commit_wait_replay_wal_finish) {
auto st = _v_wal_writer->write_wal(block.get());
if (!st.ok()) {
_cancel_without_lock(st);
return st;
}
}
if (!runtime_state->is_cancelled() && status.ok() &&
_all_block_queues_bytes->load(std::memory_order_relaxed) >=
config::group_commit_queue_mem_limit) {
DCHECK(_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end());
_load_ids_to_write_dep[load_id]->block();
VLOG_DEBUG << "block add_block for load_id=" << load_id
<< ", memory=" << _all_block_queues_bytes->load(std::memory_order_relaxed)
<< ". inner load_id=" << load_instance_id << ", label=" << label;
}
}
if (!_need_commit) {
if (_data_bytes >= _group_commit_data_bytes) {
VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
<< ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
_need_commit = true;
data_size_condition = true;
}
if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() -
_start_time)
.count() >= _group_commit_interval_ms) {
VLOG_DEBUG << "group commit meets commit condition for time interval, label=" << label
<< ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
_need_commit = true;
}
}
for (auto read_dep : _read_deps) {
read_dep->set_ready();
VLOG_DEBUG << "set ready for inner load_id=" << load_instance_id;
}
return Status::OK();
}
Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block, bool* eos,
std::shared_ptr<pipeline::Dependency> get_block_dep) {
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
if (runtime_state->is_cancelled() || !status.ok()) {
auto st = runtime_state->cancel_reason();
_cancel_without_lock(st);
return status;
}
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (!_need_commit && duration >= _group_commit_interval_ms) {
_need_commit = true;
}
if (_block_queue.empty()) {
if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
auto last_print_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _last_print_time)
.count();
if (last_print_duration >= 10000) {
_last_print_time = std::chrono::steady_clock::now();
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", duration=" << duration << ", load_ids=" << _get_load_ids();
}
}
VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", but queue is empty";
if (!_need_commit) {
get_block_dep->block();
VLOG_DEBUG << "block get_block for inner load_id=" << load_instance_id;
}
} else {
const BlockData block_data = _block_queue.front();
block->swap(*block_data.block);
*find_block = true;
_block_queue.pop_front();
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
<< "Cur block rows=" << block->rows() << ", bytes=" << block->bytes()
<< ". all block queues bytes from " << before_block_queues_bytes << " to "
<< _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
<< ". txn_id=" << txn_id << ", label=" << label
<< ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
}
if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) {
*eos = true;
} else {
*eos = false;
}
if (_all_block_queues_bytes->load(std::memory_order_relaxed) <
config::group_commit_queue_mem_limit) {
for (auto& id : _load_ids_to_write_dep) {
id.second->set_ready();
}
VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
<< ". inner load_id=" << load_instance_id << ", label=" << label;
}
return Status::OK();
}
Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(mutex);
if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) {
_load_ids_to_write_dep[load_id]->set_always_ready();
_load_ids_to_write_dep.erase(load_id);
for (auto read_dep : _read_deps) {
read_dep->set_ready();
}
VLOG_DEBUG << "set ready for load_id=" << load_id << ", inner load_id=" << load_instance_id;
return Status::OK();
}
return Status::NotFound<false>("load_id=" + load_id.to_string() +
" not in block queue, label=" + label);
}
bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) {
std::unique_lock l(mutex);
return _load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end();
}
Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::unique_lock l(mutex);
if (_need_commit) {
return Status::InternalError<false>("block queue is set need commit, id=" +
load_instance_id.to_string());
}
_load_ids_to_write_dep[load_id] = put_block_dep;
group_commit_load_count.fetch_add(1);
return Status::OK();
}
void LoadBlockQueue::cancel(const Status& st) {
DCHECK(!st.ok());
std::unique_lock l(mutex);
_cancel_without_lock(st);
}
void LoadBlockQueue::_cancel_without_lock(const Status& st) {
LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ", label=" << label
<< ", status=" << st.to_string();
status =
Status::Cancelled("cancel group_commit, label=" + label + ", status=" + st.to_string());
int before_block_queues_bytes = _all_block_queues_bytes->load();
while (!_block_queue.empty()) {
const BlockData& block_data = _block_queue.front().block;
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
_block_queue.pop_front();
}
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). "
<< "all block queues bytes from " << before_block_queues_bytes << " to "
<< _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
<< ", txn_id=" << txn_id << ", label=" << label
<< ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
for (auto& id : _load_ids_to_write_dep) {
id.second->set_always_ready();
}
for (auto read_dep : _read_deps) {
read_dep->set_ready();
}
VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
<< ", inner load_id=" << load_instance_id;
}
Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, int64_t base_schema_version, int64_t index_size, const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (inner_block_queue->contain_load_id(load_id)) {
load_block_queue = inner_block_queue;
return Status::OK();
}
}
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (!inner_block_queue->need_commit()) {
if (base_schema_version == inner_block_queue->schema_version &&
index_size == inner_block_queue->index_size) {
if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
load_block_queue = inner_block_queue;
return Status::OK();
}
} else {
return Status::DataQualityError<false>(
"schema version not match, maybe a schema change is in process. "
"Please retry this load manually.");
}
}
}
return Status::InternalError<false>("can not get a block queue for table_id: " +
std::to_string(_table_id) + _create_plan_failed_reason);
};
if (try_to_get_matched_queue().ok()) {
return Status::OK();
}
create_plan_dep->block();
_create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, put_block_dep,
base_schema_version, index_size));
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
RETURN_IF_ERROR(
_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] {
Defer defer {[&, dep = dep]() {
std::unique_lock l(_lock);
for (auto it : _create_plan_deps) {
std::get<0>(it.second)->set_ready();
}
_create_plan_deps.clear();
_is_creating_plan_fragment = false;
}};
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error: " << st.to_string();
_create_plan_failed_reason = ". create group commit load error: " +
st.to_string().substr(0, 300);
} else {
_create_plan_failed_reason = "";
}
}));
}
return try_to_get_matched_queue();
}
void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(_lock);
if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) {
_create_plan_deps.erase(load_id);
return;
}
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (inner_block_queue->remove_load_id(load_id).ok()) {
return;
}
}
}
Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
Status st = Status::OK();
TStreamLoadPutResult result;
std::string label;
int64_t txn_id;
TUniqueId instance_id;
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
UniqueId load_id = UniqueId::gen_uid();
TUniqueId tload_id;
tload_id.__set_hi(load_id.hi);
tload_id.__set_lo(load_id.lo);
std::regex reg("-");
label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_");
std::stringstream ss;
ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label
<< " select * from group_commit(\"table_id\"=\"" << _table_id << "\")";
TStreamLoadPutRequest request;
request.__set_load_sql(ss.str());
request.__set_loadId(tload_id);
request.__set_label(label);
request.__set_token("group_commit"); // this is a fake, fe not check it now
request.__set_max_filter_ratio(1.0);
request.__set_strictMode(false);
request.__set_partial_update(false);
// this is an internal interface, use admin to pass the auth check
request.__set_user("admin");
if (_exec_env->cluster_info()->backend_id != 0) {
request.__set_backend_id(_exec_env->cluster_info()->backend_id);
} else {
LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
}
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&result, &request](FrontendServiceConnection& client) {
client->streamLoadPut(result, request);
},
10000L);
if (!st.ok()) {
LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string();
return st;
}
st = Status::create<false>(result.status);
if (st.ok() && !result.__isset.pipeline_params) {
st = Status::InternalError("Non-pipeline is disabled!");
}
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
return st;
}
auto& pipeline_params = result.pipeline_params;
auto schema_version = pipeline_params.fragment.output_sink.olap_table_sink.schema.version;
auto index_size =
pipeline_params.fragment.output_sink.olap_table_sink.schema.indexes.size();
DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id);
txn_id = pipeline_params.txn_conf.txn_id;
DCHECK(pipeline_params.local_params.size() == 1);
instance_id = pipeline_params.local_params[0].fragment_instance_id;
VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id
<< ", schema version=" << schema_version << ", index size=" << index_size
<< ", label=" << label << ", txn_id=" << txn_id
<< ", instance_id=" << print_id(instance_id);
{
auto load_block_queue = std::make_shared<LoadBlockQueue>(
instance_id, label, txn_id, schema_version, index_size, _all_block_queues_bytes,
result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
result.group_commit_data_bytes);
RETURN_IF_ERROR(load_block_queue->create_wal(
_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
be_exe_version));
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
std::vector<UniqueId> success_load_ids;
for (const auto& [id, load_info] : _create_plan_deps) {
auto create_dep = std::get<0>(load_info);
auto put_dep = std::get<1>(load_info);
if (load_block_queue->schema_version == std::get<2>(load_info) &&
load_block_queue->index_size == std::get<3>(load_info)) {
if (load_block_queue->add_load_id(id, put_dep).ok()) {
create_dep->set_ready();
success_load_ids.emplace_back(id);
}
}
}
for (const auto& id : success_load_ids) {
_create_plan_deps.erase(id);
}
}
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, result.pipeline_params);
if (!st.ok()) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
auto finish_st = _finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id,
st, nullptr);
if (!finish_st.ok()) {
LOG(WARNING) << "finish group commit error, label=" << label
<< ", st=" << finish_st.to_string();
}
}
return st;
}
Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_id,
const std::string& label, int64_t txn_id,
const TUniqueId& instance_id, Status& status,
RuntimeState* state) {
Status st;
Status result_status;
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
{ status = Status::InternalError(""); });
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error",
{ status = Status::InternalError("load_error"); });
if (status.ok()) {
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error",
{ status = Status::InternalError(""); });
// commit txn
TLoadTxnCommitRequest request;
// deprecated and should be removed in 3.1, use token instead
request.__set_auth_code(0);
request.__set_token(_exec_env->cluster_info()->curr_auth_token);
request.__set_db_id(db_id);
request.__set_table_id(table_id);
request.__set_txnId(txn_id);
request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
request.__set_groupCommit(true);
request.__set_receiveBytes(state->num_bytes_load_total());
if (_exec_env->cluster_info()->backend_id != 0) {
request.__set_backendId(_exec_env->cluster_info()->backend_id);
} else {
LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
}
if (state) {
request.__set_commitInfos(state->tablet_commit_infos());
}
TLoadTxnCommitResult result;
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
int retry_times = 0;
while (retry_times < config::mow_stream_load_commit_retry_times) {
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnCommit(result, request);
},
config::txn_commit_rpc_timeout_ms);
result_status = Status::create(result.status);
// DELETE_BITMAP_LOCK_ERROR will be retried
if (result_status.ok() || !result_status.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
break;
}
LOG_WARNING("Failed to commit txn on group commit")
.tag("label", label)
.tag("txn_id", txn_id)
.tag("retry_times", retry_times)
.error(result_status);
retry_times++;
}
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error",
{ result_status = Status::InternalError("commit_success_and_rpc_error"); });
} else {
// abort txn
TLoadTxnRollbackRequest request;
// deprecated and should be removed in 3.1, use token instead
request.__set_auth_code(0);
request.__set_token(_exec_env->cluster_info()->curr_auth_token);
request.__set_db_id(db_id);
request.__set_txnId(txn_id);
request.__set_reason(status.to_string());
TLoadTxnRollbackResult result;
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnRollback(result, request);
});
result_status = Status::create<false>(result.status);
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", {
std ::string msg = "abort txn";
LOG(INFO) << "debug promise set: " << msg;
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
Status ::InternalError(msg));
});
}
std::shared_ptr<LoadBlockQueue> load_block_queue;
{
std::lock_guard<std::mutex> l(_lock);
auto it = _load_block_queues.find(instance_id);
if (it != _load_block_queues.end()) {
load_block_queue = it->second;
if (!status.ok()) {
load_block_queue->cancel(status);
}
//close wal
RETURN_IF_ERROR(load_block_queue->close_wal());
// notify sync mode loads
{
std::unique_lock l2(load_block_queue->mutex);
load_block_queue->process_finish = true;
for (auto dep : load_block_queue->dependencies) {
dep->set_always_ready();
}
}
}
_load_block_queues.erase(instance_id);
}
// status: exec_plan_fragment result
// st: commit txn rpc status
// result_status: commit txn result
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_st",
{ st = Status::InternalError(""); });
if (status.ok() && st.ok() &&
(result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
if (!config::group_commit_wait_replay_wal_finish) {
auto delete_st = _exec_env->wal_mgr()->delete_wal(table_id, txn_id);
if (!delete_st.ok()) {
LOG(WARNING) << "fail to delete wal " << txn_id << ", st=" << delete_st.to_string();
}
}
} else {
std::string wal_path;
RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path));
}
std::stringstream ss;
ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label
<< ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id)
<< ", exec_plan_fragment status=" << status.to_string()
<< ", commit/abort txn rpc status=" << st.to_string()
<< ", commit/abort txn status=" << result_status.to_string()
<< ", this group commit includes " << load_block_queue->group_commit_load_count << " loads"
<< ", flush because meet "
<< (load_block_queue->data_size_condition ? "data size " : "time ") << "condition"
<< ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
if (state) {
if (!state->get_error_log_file_path().empty()) {
ss << ", error_url=" << state->get_error_log_file_path();
}
ss << ", rows=" << state->num_rows_load_success();
}
LOG(INFO) << ss.str();
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", {
if (dp->param<int64_t>("table_id", -1) == table_id) {
std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string();
LOG(INFO) << "table_id" << std::to_string(table_id) << " set debug promise: " << msg;
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
Status ::InternalError(msg));
}
};);
return st;
}
Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id,
const std::string& label, int64_t txn_id,
const TPipelineFragmentParams& pipeline_params) {
auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* state, Status* status) {
DCHECK(state);
auto finish_st = _finish_group_commit_load(db_id, table_id, label, txn_id,
state->fragment_instance_id(), *status, state);
if (!finish_st.ok()) {
LOG(WARNING) << "finish group commit error, label=" << label
<< ", st=" << finish_st.to_string();
}
};
TPipelineFragmentParamsList mocked;
return _exec_env->fragment_mgr()->exec_plan_fragment(
pipeline_params, QuerySource::GROUP_COMMIT_LOAD, finish_cb, mocked);
}
Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
std::shared_ptr<pipeline::Dependency> get_block_dep) {
std::unique_lock l(_lock);
auto it = _load_block_queues.find(instance_id);
if (it == _load_block_queues.end()) {
return Status::InternalError("group commit load instance " + print_id(instance_id) +
" not found");
}
load_block_queue = it->second;
load_block_queue->append_read_dependency(get_block_dep);
return Status::OK();
}
GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) {
static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool")
.set_min_threads(1)
.set_max_threads(config::group_commit_insert_threads)
.build(&_thread_pool));
_all_block_queues_bytes = std::make_shared<std::atomic_size_t>(0);
}
GroupCommitMgr::~GroupCommitMgr() {
LOG(INFO) << "GroupCommitMgr is destoried";
}
void GroupCommitMgr::stop() {
_thread_pool->shutdown();
LOG(INFO) << "GroupCommitMgr is stopped";
}
Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id, int64_t base_schema_version, int64_t index_size,
const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
if (_table_map.find(table_id) == _table_map.end()) {
_table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
_exec_env, _thread_pool.get(), db_id, table_id,
_all_block_queues_bytes));
}
group_commit_table = _table_map[table_id];
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, index_size, load_id, load_block_queue, be_exe_version,
mem_tracker, create_plan_dep, put_block_dep));
return Status::OK();
}
Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
std::shared_ptr<pipeline::Dependency> get_block_dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard<std::mutex> l(_lock);
auto it = _table_map.find(table_id);
if (it == _table_map.end()) {
return Status::NotFound("table_id: " + std::to_string(table_id) +
", instance_id: " + print_id(instance_id) + " dose not exist");
}
group_commit_table = it->second;
}
return group_commit_table->get_load_block_queue(instance_id, load_block_queue, get_block_dep);
}
void GroupCommitMgr::remove_load_id(int64_t table_id, const UniqueId& load_id) {
std::lock_guard wlock(_lock);
if (_table_map.find(table_id) != _table_map.end()) {
_table_map.find(table_id)->second->remove_load_id(load_id);
}
}
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
std::string real_label = config::group_commit_wait_replay_wal_finish
? import_label + "_test_wait"
: import_label;
RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(
db_id, tb_id, wal_id, real_label, _wal_base_path, WAL_VERSION));
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
db_id, tb_id, wal_id, real_label, wal_manager, slot_desc, be_exe_version);
return _v_wal_writer->init();
}
Status LoadBlockQueue::close_wal() {
if (_v_wal_writer != nullptr) {
RETURN_IF_ERROR(_v_wal_writer->close());
}
return Status::OK();
}
void LoadBlockQueue::append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep) {
std::lock_guard<std::mutex> lock(mutex);
// If not finished, dependencies should be blocked.
if (!process_finish) {
finish_dep->block();
dependencies.push_back(finish_dep);
}
}
void LoadBlockQueue::append_read_dependency(std::shared_ptr<pipeline::Dependency> read_dep) {
std::lock_guard<std::mutex> lock(mutex);
_read_deps.push_back(read_dep);
}
bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) {
DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; });
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
size_t available_bytes = 0;
{
Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path, &available_bytes);
if (!st.ok()) {
LOG(WARNING) << "get wal dir available size failed, st=" << st.to_string();
}
}
if (estimated_wal_bytes < available_bytes) {
Status st =
wal_mgr->update_wal_dir_estimated_wal_bytes(_wal_base_path, estimated_wal_bytes, 0);
if (!st.ok()) {
LOG(WARNING) << "update wal dir estimated_wal_bytes failed, reason: " << st.to_string();
}
return true;
} else {
return false;
}
}
} // namespace doris