blob: e982acea923bc9f5c809afe730f740151440719c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_stream_load_executor.h"
#include <bvar/bvar.h>
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/debug_points.h"
namespace doris {
// Running count of stream load commit attempts that failed and were counted as a retry;
// CloudStreamLoadExecutor::commit_txn adds 1 to it inside its retry loop.
bvar::Adder<uint64_t> stream_load_commit_retry_counter("stream_load_commit_retry_counter");
// Windowed view over the counter above, built with prefix "stream_load_commit_retry_counter",
// name "1m" and a window size of 60.
// NOTE(review): the window unit is presumably seconds (matching the "1m" name) -- confirm
// against the bvar::Window documentation.
bvar::Window<bvar::Adder<uint64_t>> stream_load_commit_retry_counter_minute(
"stream_load_commit_retry_counter", "1m", &stream_load_commit_retry_counter, 60);
// Describes which piece of information in a StreamLoadContext identifies the
// transaction an operation should act on.
enum class TxnOpParamType : int {
    // Neither a positive txn id nor a non-empty label is available.
    ILLEGAL = 0,
    // The context carries a positive txn id.
    WITH_TXN_ID = 1,
    // No positive txn id, but the context carries a non-empty label.
    WITH_LABEL = 2,
};
// Constructs the cloud executor; exec_env is passed straight through to the
// StreamLoadExecutor base class and nothing else is initialized here.
CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env)
: StreamLoadExecutor(exec_env) {}
// Compiler-generated destructor, defined out of line in this translation unit.
CloudStreamLoadExecutor::~CloudStreamLoadExecutor() = default;
// Pre-commits the transaction described by ctx through the cloud meta manager.
//
// On success ctx->need_rollback is cleared. On failure a warning containing the
// status and ctx->brief() is logged and need_rollback is left as it was.
// The status produced by the meta manager is returned in both cases.
Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
    auto&& meta_mgr = _exec_env->storage_engine().to_cloud().meta_mgr();
    auto precommit_status = meta_mgr.precommit_txn(*ctx);
    if (precommit_status.ok()) {
        // The txn is pre-committed, so the caller must not roll it back.
        ctx->need_rollback = false;
    } else {
        LOG(WARNING) << "Failed to precommit txn: " << precommit_status << ", " << ctx->brief();
    }
    return precommit_status;
}
// Executes the second phase ("commit" or "abort") of a two-phase-commit stream load.
//
// The transaction is identified by ctx->txn_id when it is positive, otherwise by
// ctx->label when it is non-empty; if neither is available the input is illegal.
//
// ctx->txn_operation == "commit":
//   - config::enable_stream_load_commit_txn_on_be is off, or only a label is known:
//     delegate to StreamLoadExecutor::operate_txn_2pc.
//   - a txn id is known: commit directly through the cloud meta manager.
//   - illegal input: InternalError.
// ctx->txn_operation == "abort":
//   - a txn id is known: abort directly through the cloud meta manager.
//   - only a label is known: delegate to StreamLoadExecutor::rollback_txn and report OK.
//   - illegal input: InternalError.
// Any other operation yields InternalError.
//
// Returns the resulting status; every non-OK result is also logged as a warning.
Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
    std::stringstream ss;
    ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label
       << " txn_2pc_op=" << ctx->txn_operation;
    std::string op_info = ss.str();
    VLOG_DEBUG << "operate_txn_2pc " << op_info;

    // A positive txn id takes precedence over the label.
    TxnOpParamType topt = ctx->txn_id > 0         ? TxnOpParamType::WITH_TXN_ID
                          : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL
                                                : TxnOpParamType::ILLEGAL;

    // Every branch below assigns st; this initial value only survives if one is missed.
    Status st = Status::InternalError<false>("impossible branch reached, " + op_info);

    if (ctx->txn_operation == "commit") {
        if (!config::enable_stream_load_commit_txn_on_be) {
            VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info;
            st = StreamLoadExecutor::operate_txn_2pc(ctx);
        } else if (topt == TxnOpParamType::WITH_TXN_ID) {
            VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
            // NOTE(review): the `true` argument presumably marks this as a 2PC commit --
            // confirm against CloudMetaMgr::commit_txn.
            st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
        } else if (topt == TxnOpParamType::WITH_LABEL) {
            VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info;
            st = StreamLoadExecutor::operate_txn_2pc(ctx);
        } else {
            st = Status::InternalError<false>(
                    "failed to 2pc commit txn, with TxnOpParamType::illegal input, " + op_info);
        }
    } else if (ctx->txn_operation == "abort") {
        if (topt == TxnOpParamType::WITH_TXN_ID) {
            LOG(INFO) << "2pc abort stream load txn directly: " << op_info;
            st = _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx);
            WARN_IF_ERROR(st, "failed to rollback txn " + op_info);
        } else if (topt == TxnOpParamType::WITH_LABEL) { // maybe a label send to FE to abort
            VLOG_DEBUG << "2pc abort stream load txn with FE support: " << op_info;
            // The base rollback yields no status here, so the abort is reported as OK.
            StreamLoadExecutor::rollback_txn(ctx);
            st = Status::OK();
        } else {
            st = Status::InternalError<false>("failed abort txn, with illegal input, " + op_info);
        }
    } else {
        std::string msg =
                "failed to operate_txn_2pc, unrecognized operation: " + ctx->txn_operation;
        LOG(WARNING) << msg << " " << op_info;
        st = Status::InternalError<false>(msg + " " + op_info);
    }

    // Explicitly terminated like the other WARN_IF_ERROR uses in this file, so the statement
    // does not depend on the macro expansion ending in its own semicolon.
    WARN_IF_ERROR(st, "failed to operate_txn_2pc " + op_info);
    return st;
}
// Commits the transaction of a stream load.
//
// Loads on MoW tables, routine loads, and all loads while
// config::enable_stream_load_commit_txn_on_be is off are committed through
// StreamLoadExecutor::commit_txn. That call is repeated while it keeps failing with
// DELETE_BITMAP_LOCK_ERROR, for at most config::mow_stream_load_commit_retry_times
// attempts (and always at least once); each such failure is logged and counted in
// stream_load_commit_retry_counter. The status of the last attempt is returned.
//
// All other loads are committed directly through the cloud meta manager; on success
// ctx->need_rollback is cleared, on failure a warning is logged.
Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
    DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);
    // forward to fe to execute commit transaction for MoW table
    if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be ||
        ctx->load_type == TLoadType::ROUTINE_LOAD) {
        Status st;
        int retry_times = 0;
        // do/while instead of while: the commit must be attempted at least once even when
        // mow_stream_load_commit_retry_times is configured as 0 or a negative value.
        // Otherwise the untouched default-constructed status would be returned without
        // any commit having been issued.
        do {
            st = StreamLoadExecutor::commit_txn(ctx);
            // DELETE_BITMAP_LOCK_ERROR will be retried
            if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
                break;
            }
            LOG_WARNING("Failed to commit txn")
                    .tag("txn_id", ctx->txn_id)
                    .tag("retry_times", retry_times)
                    .error(st);
            retry_times++;
            stream_load_commit_retry_counter << 1;
        } while (retry_times < config::mow_stream_load_commit_retry_times);
        return st;
    }

    auto st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, false);
    if (!st.ok()) {
        LOG(WARNING) << "Failed to commit txn: " << st << ", " << ctx->brief();
        return st;
    }
    // Committed: the caller must not roll this txn back.
    ctx->need_rollback = false;
    return st;
}
// Rolls back the transaction of a stream load. Nothing is returned to the caller.
//
// A load that carries a positive txn id and is not a routine load is aborted directly
// through the cloud meta manager; a failure there is only logged as a warning.
// Every other case is handed to StreamLoadExecutor::rollback_txn.
void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
    std::stringstream desc;
    desc << "db_id=" << ctx->db_id;
    desc << " txn_id=" << ctx->txn_id;
    desc << " label=" << ctx->label;
    const std::string op_info = desc.str();
    LOG(INFO) << "rollback stream load txn " << op_info;

    const bool has_txn_id = ctx->txn_id > 0;
    if (!has_txn_id || ctx->load_type == TLoadType::ROUTINE_LOAD) {
        // Possibly only a label is known, so let the base implementation handle the abort.
        // The outcome of that abort is not checked here.
        VLOG_DEBUG << "abort stream load txn with FE support: " << op_info;
        StreamLoadExecutor::rollback_txn(ctx);
        return;
    }

    VLOG_DEBUG << "abort stream load txn directly: " << op_info;
    WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx),
                  "failed to rollback txn " + op_info);
}
} // namespace doris