| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "load/stream_load/stream_load_executor.h" |
| |
| #include <bvar/bvar.h> |
| #include <bvar/latency_recorder.h> |
| #include <gen_cpp/FrontendService.h> |
| #include <gen_cpp/FrontendService_types.h> |
| #include <gen_cpp/HeartbeatService_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <glog/logging.h> |
| #include <stdint.h> |
| |
| #include <functional> |
| #include <future> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/config.h" |
| #include "common/metrics/doris_metrics.h" |
| #include "common/status.h" |
| #include "common/utils.h" |
| #include "load/message_body_sink.h" |
| #include "load/stream_load/new_load_stream_mgr.h" |
| #include "load/stream_load/stream_load_context.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/fragment_mgr.h" |
| #include "runtime/runtime_state.h" |
| #include "thrift/protocol/TDebugProtocol.h" |
| #include "util/client_cache.h" |
| #include "util/debug_points.h" |
| #include "util/thrift_rpc_helper.h" |
| #include "util/time.h" |
| #include "util/uid_util.h" |
| |
| namespace doris { |
| using namespace ErrorCode; |
| |
// Test-only stand-ins. When this file is built with BE_TEST, the functions below read
// these globals instead of calling the FE: the three *_result objects replace the
// begin / (pre)commit / rollback RPC responses, and k_stream_load_plan_status is the
// value published through ctx->load_status_promise by execute_plan_fragment.
| #ifdef BE_TEST |
| TLoadTxnBeginResult k_stream_load_begin_result; |
| TLoadTxnCommitResult k_stream_load_commit_result; |
| TLoadTxnRollbackResult k_stream_load_rollback_result; |
| Status k_stream_load_plan_status; |
| #endif |
| |
// Latency recorders for the FE transaction RPCs issued from this file. Each one is fed
// `duration_ns / 1000` by the function performing the call: begin_txn, pre_commit_txn
// and operate_txn_2pc respectively. Note that commit_txn itself records no sample; the
// "commit_txn" recorder is only written by operate_txn_2pc.
| bvar::LatencyRecorder g_stream_load_begin_txn_latency("stream_load", "begin_txn"); |
| bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", "precommit_txn"); |
| bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", "commit_txn"); |
| |
| Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx, |
| const TPipelineFragmentParamsList& parent) { |
| return execute_plan_fragment(ctx, parent, [](std::shared_ptr<StreamLoadContext> ctx) {}); |
| } |
| |
| Status StreamLoadExecutor::execute_plan_fragment( |
| std::shared_ptr<StreamLoadContext> ctx, const TPipelineFragmentParamsList& parent, |
| const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>& cb) { |
| // submit this params |
| #ifndef BE_TEST |
| ctx->put_result.pipeline_params.query_options.__set_enable_strict_cast(false); |
| ctx->start_write_data_nanos = MonotonicNanos(); |
| LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", txn_id=" << ctx->txn_id |
| << ", query_id=" << ctx->id; |
| Status st; |
| std::shared_ptr<bool> is_prepare_success = std::make_shared<bool>(false); |
| |
| auto exec_fragment = [ctx, cb, this, is_prepare_success](RuntimeState* state, Status* status) { |
| if (ctx->group_commit) { |
| ctx->label = state->import_label(); |
| ctx->txn_id = state->wal_id(); |
| } |
| ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); |
| ctx->commit_infos = state->tablet_commit_infos(); |
| ctx->number_total_rows = state->num_rows_load_total(); |
| ctx->number_loaded_rows = state->num_rows_load_success(); |
| ctx->number_filtered_rows = state->num_rows_load_filtered(); |
| ctx->number_unselected_rows = state->num_rows_load_unselected(); |
| ctx->loaded_bytes = state->num_bytes_load_total(); |
| int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows; |
| ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); |
| if (status->ok() && !ctx->group_commit && num_selected_rows > 0 && |
| (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) { |
| // NOTE: Do not modify the error message here, for historical reasons, |
| // some users may rely on this error message. |
| if (ctx->need_commit_self) { |
| *status = |
| Status::DataQualityError("too many filtered rows, url: {}", ctx->error_url); |
| } else { |
| *status = Status::DataQualityError("too many filtered rows"); |
| } |
| } |
| |
| if (status->ok()) { |
| DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes); |
| DorisMetrics::instance()->stream_load_rows_total->increment(ctx->number_loaded_rows); |
| } else { |
| LOG(WARNING) << "fragment execute failed" |
| << ", err_msg=" << status->to_string() << ", " << ctx->brief(); |
| ctx->number_loaded_rows = 0; |
| ctx->first_error_msg = state->get_first_error_msg(); |
| // cancel body_sink, make sender known it |
| if (ctx->body_sink != nullptr) { |
| ctx->body_sink->cancel(status->to_string()); |
| } |
| |
| switch (ctx->load_src_type) { |
| // reset the stream load ctx's kafka commit offset |
| case TLoadSourceType::KAFKA: |
| ctx->kafka_info->reset_offset(); |
| break; |
| default: |
| break; |
| } |
| } |
| ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos; |
| ctx->load_status_promise.set_value(*status); |
| |
| if (!status->ok() && ctx->body_sink != nullptr) { |
| // In some cases, the load execution is exited early. |
| // For example, when max_filter_ratio is 0 and illegal data is encountered |
| // during stream loading, the entire load process is terminated early. |
| // However, the http connection may still be sending data to stream_load_pipe |
| // and waiting for it to be consumed. |
| // Therefore, we need to actively cancel to end the pipe. |
| ctx->body_sink->cancel(status->to_string()); |
| } |
| |
| if (ctx->need_commit_self && ctx->body_sink != nullptr) { |
| if (ctx->body_sink->cancelled() || !status->ok()) { |
| ctx->status = *status; |
| this->rollback_txn(ctx.get()); |
| } else { |
| static_cast<void>(this->commit_txn(ctx.get())); |
| } |
| } |
| |
| if (*is_prepare_success) { |
| // if prepare failed, on_header will send reply |
| cb(ctx); |
| } |
| }; |
| st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params, |
| QuerySource::STREAM_LOAD, exec_fragment, |
| parent, is_prepare_success); |
| |
| if (!st.ok()) { |
| // no need to check unref's return value |
| return st; |
| } |
| #else |
| ctx->load_status_promise.set_value(k_stream_load_plan_status); |
| #endif |
| return Status::OK(); |
| } |
| |
| Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { |
| DorisMetrics::instance()->stream_load_txn_begin_request_total->increment(1); |
| |
| TLoadTxnBeginRequest request; |
| set_request_auth(&request, ctx->auth); |
| request.__set_db(ctx->db); |
| request.__set_tbl(ctx->table); |
| request.__set_label(ctx->label); |
| // set timestamp |
| request.__set_timestamp(GetCurrentTimeMicros()); |
| if (ctx->timeout_second != -1) { |
| request.__set_timeout(ctx->timeout_second); |
| } |
| request.__set_request_id(ctx->id.to_thrift()); |
| request.__set_backend_id(_exec_env->cluster_info()->backend_id); |
| if (ctx->group_commit_mode.empty()) { |
| request.__set_use_table_group_commit_mode(true); |
| } |
| |
| TLoadTxnBeginResult result; |
| Status status; |
| int64_t duration_ns = 0; |
| auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; |
| TNetworkAddress master_addr = master_addr_provider(); |
| if (master_addr.hostname.empty() || master_addr.port == 0) { |
| status = Status::Error<SERVICE_UNAVAILABLE>("Have not get FE Master heartbeat yet"); |
| } else { |
| SCOPED_RAW_TIMER(&duration_ns); |
| #ifndef BE_TEST |
| RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr_provider, [&request, &result](FrontendServiceConnection& client) { |
| client->loadTxnBegin(result, request); |
| })); |
| #else |
| result = k_stream_load_begin_result; |
| #endif |
| status = Status::create<false>(result.status); |
| } |
| g_stream_load_begin_txn_latency << duration_ns / 1000; |
| if (!status.ok()) { |
| LOG(WARNING) << "begin transaction failed, errmsg=" << status << ctx->brief(); |
| if (result.__isset.job_status) { |
| ctx->existing_job_status = result.job_status; |
| } |
| return status; |
| } |
| if (ctx->group_commit_mode.empty() && result.__isset.table_group_commit_mode) { |
| auto table_group_commit_mode = result.table_group_commit_mode; |
| if (iequal(table_group_commit_mode, "async_mode") || |
| iequal(table_group_commit_mode, "sync_mode")) { |
| ctx->group_commit = true; |
| ctx->group_commit_mode = table_group_commit_mode; |
| return Status::OK(); |
| } |
| } |
| ctx->txn_id = result.txnId; |
| if (result.__isset.db_id) { |
| ctx->db_id = result.db_id; |
| } |
| ctx->need_rollback = true; |
| |
| return Status::OK(); |
| } |
| |
| Status StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { |
| TLoadTxnCommitRequest request; |
| get_commit_request(ctx, request); |
| |
| TLoadTxnCommitResult result; |
| int64_t duration_ns = 0; |
| { |
| SCOPED_RAW_TIMER(&duration_ns); |
| #ifndef BE_TEST |
| auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; |
| RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr_provider, |
| [&request, &result](FrontendServiceConnection& client) { |
| client->loadTxnPreCommit(result, request); |
| }, |
| config::txn_commit_rpc_timeout_ms)); |
| #else |
| result = k_stream_load_commit_result; |
| #endif |
| } |
| g_stream_load_precommit_txn_latency << duration_ns / 1000; |
| // Return if this transaction is precommitted successful; otherwise, we need try |
| // to |
| // rollback this transaction |
| Status status(Status::create(result.status)); |
| if (!status.ok()) { |
| LOG(WARNING) << "precommit transaction failed, errmsg=" << status << ctx->brief(); |
| if (status.is<PUBLISH_TIMEOUT>()) { |
| ctx->need_rollback = false; |
| } |
| ctx->status = status; |
| return status; |
| } |
| // precommit success, set need_rollback to false |
| ctx->need_rollback = false; |
| return Status::OK(); |
| } |
| |
| Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { |
| TLoadTxn2PCRequest request; |
| set_request_auth(&request, ctx->auth); |
| request.__set_db(ctx->db); |
| request.__set_operation(ctx->txn_operation); |
| request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); |
| request.__set_label(ctx->label); |
| if (ctx->txn_id != doris::StreamLoadContext::default_txn_id) { |
| request.__set_txnId(ctx->txn_id); |
| } |
| |
| TLoadTxn2PCResult result; |
| int64_t duration_ns = 0; |
| { |
| SCOPED_RAW_TIMER(&duration_ns); |
| auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; |
| RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr_provider, |
| [&request, &result](FrontendServiceConnection& client) { |
| client->loadTxn2PC(result, request); |
| }, |
| config::txn_commit_rpc_timeout_ms)); |
| } |
| g_stream_load_commit_txn_latency << duration_ns / 1000; |
| Status status(Status::create(result.status)); |
| if (!status.ok()) { |
| LOG(WARNING) << "2PC commit transaction failed, errmsg=" << status; |
| return status; |
| } |
| return Status::OK(); |
| } |
| |
| void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, |
| TLoadTxnCommitRequest& request) { |
| set_request_auth(&request, ctx->auth); |
| request.__set_db(ctx->db); |
| if (ctx->db_id > 0) { |
| request.__set_db_id(ctx->db_id); |
| } |
| request.__set_tbl(ctx->table); |
| request.__set_txnId(ctx->txn_id); |
| request.__set_sync(true); |
| request.__set_commitInfos(ctx->commit_infos); |
| request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); |
| request.__set_tbls(ctx->table_list); |
| |
| VLOG_DEBUG << "commit txn request:" << apache::thrift::ThriftDebugString(request); |
| |
| // set attachment if has |
| TTxnCommitAttachment attachment; |
| if (collect_load_stat(ctx, &attachment)) { |
| request.__set_txnCommitAttachment(attachment); |
| } |
| } |
| |
| Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { |
| DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK); |
| |
| DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1); |
| |
| TLoadTxnCommitRequest request; |
| get_commit_request(ctx, request); |
| |
| TLoadTxnCommitResult result; |
| #ifndef BE_TEST |
| auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; |
| RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr_provider, |
| [&request, &result](FrontendServiceConnection& client) { |
| client->loadTxnCommit(result, request); |
| }, |
| config::txn_commit_rpc_timeout_ms)); |
| #else |
| result = k_stream_load_commit_result; |
| #endif |
| // Return if this transaction is committed successful; otherwise, we need try |
| // to |
| // rollback this transaction |
| Status status(Status::create(result.status)); |
| if (!status.ok()) { |
| LOG(WARNING) << "commit transaction failed, errmsg=" << status << ", " << ctx->brief(); |
| if (status.is<PUBLISH_TIMEOUT>()) { |
| ctx->need_rollback = false; |
| } |
| ctx->status = status; |
| return status; |
| } |
| // commit success, set need_rollback to false |
| ctx->need_rollback = false; |
| return Status::OK(); |
| } |
| |
| void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { |
| DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1); |
| |
| TLoadTxnRollbackRequest request; |
| set_request_auth(&request, ctx->auth); |
| request.__set_db(ctx->db); |
| if (ctx->db_id > 0) { |
| request.__set_db_id(ctx->db_id); |
| } |
| request.__set_tbl(ctx->table); |
| request.__set_txnId(ctx->txn_id); |
| request.__set_reason(ctx->status.to_string()); |
| request.__set_tbls(ctx->table_list); |
| request.__set_label(ctx->label); |
| |
| // set attachment if has |
| TTxnCommitAttachment attachment; |
| if (collect_load_stat(ctx, &attachment)) { |
| request.__set_txnCommitAttachment(attachment); |
| } |
| |
| TLoadTxnRollbackResult result; |
| #ifndef BE_TEST |
| auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; |
| auto rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr_provider, [&request, &result](FrontendServiceConnection& client) { |
| client->loadTxnRollback(result, request); |
| }); |
| if (!rpc_st.ok()) { |
| LOG(WARNING) << "transaction rollback failed. errmsg=" << rpc_st << ctx->brief(); |
| } |
| #else |
| result = k_stream_load_rollback_result; |
| #endif |
| } |
| |
| bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attach) { |
| if (ctx->load_type != TLoadType::ROUTINE_LOAD && ctx->load_type != TLoadType::MINI_LOAD) { |
| // currently, only routine load and mini load need to be set attachment |
| return false; |
| } |
| switch (ctx->load_type) { |
| case TLoadType::MINI_LOAD: { |
| throw Exception(Status::FatalError("mini load is not supported any more")); |
| } |
| case TLoadType::ROUTINE_LOAD: { |
| attach->loadType = TLoadType::ROUTINE_LOAD; |
| |
| TRLTaskTxnCommitAttachment rl_attach; |
| rl_attach.jobId = ctx->job_id; |
| rl_attach.id = ctx->id.to_thrift(); |
| rl_attach.__set_loadedRows(ctx->number_loaded_rows); |
| rl_attach.__set_filteredRows(ctx->number_filtered_rows); |
| rl_attach.__set_unselectedRows(ctx->number_unselected_rows); |
| rl_attach.__set_receivedBytes(ctx->receive_bytes); |
| rl_attach.__set_loadedBytes(ctx->loaded_bytes); |
| rl_attach.__set_loadCostMs(ctx->load_cost_millis); |
| |
| attach->rlTaskTxnCommitAttachment = rl_attach; |
| attach->__isset.rlTaskTxnCommitAttachment = true; |
| break; |
| } |
| default: |
| // unknown load type, should not happened |
| return false; |
| } |
| |
| switch (ctx->load_src_type) { |
| case TLoadSourceType::KAFKA: { |
| TRLTaskTxnCommitAttachment& rl_attach = attach->rlTaskTxnCommitAttachment; |
| rl_attach.loadSourceType = TLoadSourceType::KAFKA; |
| |
| TKafkaRLTaskProgress kafka_progress; |
| kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset; |
| |
| rl_attach.kafkaRLTaskProgress = kafka_progress; |
| rl_attach.__isset.kafkaRLTaskProgress = true; |
| if (!ctx->error_url.empty()) { |
| rl_attach.__set_errorLogUrl(ctx->error_url); |
| } |
| return true; |
| } |
| default: |
| return true; |
| } |
| return false; |
| } |
| |
| } // namespace doris |