// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/http_stream.h"
#include <cstddef>
#include <future>
#include <sstream>
// iequal() is provided by util/string_util.h (included below)
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/http.h>
#include <rapidjson/prettywriter.h>
#include <thrift/protocol/TDebugProtocol.h>
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/utils.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "http/http_channel.h"
#include "http/http_common.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/utils.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/storage_engine.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/group_commit_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/byte_buffer.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
#include "util/uid_util.h"
namespace doris {
using namespace ErrorCode;
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total, MetricUnit::REQUESTS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms, MetricUnit::MILLISECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing, MetricUnit::REQUESTS);
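// Illustrative client usage (a sketch, not an authoritative API reference:
// the endpoint path and port below are assumptions; the "sql" header matches
// HTTP_SQL as read by this handler):
//
//   curl --location-trusted -u user:passwd \
//     -H "sql: insert into db.tbl select * from http_stream(\"format\" = \"csv\")" \
//     -T data.csv http://<be_host>:<webserver_port>/api/_http_stream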
HttpStreamAction::HttpStreamAction(ExecEnv* exec_env) : _exec_env(exec_env) {
_http_stream_entity =
DorisMetrics::instance()->metric_registry()->register_entity("http_stream");
INT_COUNTER_METRIC_REGISTER(_http_stream_entity, http_stream_requests_total);
INT_COUNTER_METRIC_REGISTER(_http_stream_entity, http_stream_duration_ms);
INT_GAUGE_METRIC_REGISTER(_http_stream_entity, http_stream_current_processing);
}
HttpStreamAction::~HttpStreamAction() {
DorisMetrics::instance()->metric_registry()->deregister_entity(_http_stream_entity);
}
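// Invoked by the HTTP server once the entire request body has been consumed
// by on_chunk_data(). Finalizes the load via _handle(), replies to the client
// with the JSON result, optionally records the load, and updates metrics.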
void HttpStreamAction::handle(HttpRequest* req) {
std::shared_ptr<StreamLoadContext> ctx =
std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
    // status may already have been set to a failure by an earlier callback
if (ctx->status.ok()) {
ctx->status = _handle(req, ctx);
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
}
}
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
}
    auto str = std::string(ctx->to_json());
    // add new line at end
    str = str + '\n';
    HttpChannel::send_reply(req, str);
    if (!ctx->status.ok()) {
        return;
    }
    if (config::enable_stream_load_record) {
        str = ctx->prepare_stream_load_record(str);
        _save_stream_load_record(ctx, str);
    }
// update statistics
http_stream_requests_total->increment(1);
http_stream_duration_ms->increment(ctx->load_cost_millis);
}
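// Finishes the load: verifies the received byte count against Content-Length,
// closes the body sink, waits for the plan fragment to finish, then either
// pre-commits (two-phase commit) or commits and publishes the transaction.
// Group commit loads skip the commit here.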
Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
        LOG(WARNING) << "received body size does not match body bytes, body_bytes="
                     << ctx->body_bytes << ", receive_bytes=" << ctx->receive_bytes
                     << ", id=" << ctx->id;
        return Status::Error<ErrorCode::NETWORK_ERROR>(
                "received body size does not match body bytes");
}
RETURN_IF_ERROR(ctx->body_sink->finish());
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
return Status::OK();
}
if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
    } else {
        // If the put succeeded, we need to commit this load
int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
}
return Status::OK();
}
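// Called once the HTTP request headers have been parsed. Creates the
// StreamLoadContext for this request and validates group commit settings;
// returning -1 aborts the request after replying with the error.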
int HttpStreamAction::on_header(HttpRequest* req) {
http_stream_current_processing->increment(1);
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
req->set_handler_ctx(ctx);
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
Status st = _handle_group_commit(req, ctx);
LOG(INFO) << "new income streaming load request." << ctx->brief()
<< " sql : " << req->header(HTTP_SQL) << ", group_commit=" << ctx->group_commit;
if (st.ok()) {
st = _on_header(req, ctx);
}
if (!st.ok()) {
ctx->status = std::move(st);
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
auto str = ctx->to_json();
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
return -1;
}
return 0;
}
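// Parses Basic auth, validates Content-Length against streaming_load_max_mb,
// and wires up the StreamLoadPipe that on_chunk_data() will append to.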
Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
// auth information
if (!parse_basic_auth(*http_req, &ctx->auth)) {
LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
return Status::NotAuthorized("no valid Basic authorization");
}
    // TODO(zs): need to request the FE to obtain information such as format
// check content length
ctx->body_bytes = 0;
size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
try {
ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
} catch (const std::exception& e) {
return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
http_req->header(HttpHeaders::CONTENT_LENGTH), e.what());
}
// csv max body size
if (ctx->body_bytes > csv_max_body_bytes) {
LOG(WARNING) << "body exceed max size." << ctx->brief();
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
"are sure this load is reasonable",
ctx->body_bytes, csv_max_body_bytes);
}
}
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
ctx->body_bytes /* total_length */);
ctx->body_sink = pipe;
ctx->pipe = pipe;
RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
    // Here, the transaction is set from the FE's NativeInsertStmt.
    // TODO(zs): how to support two_phase_commit
return Status::OK();
}
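// Invoked for each chunk of the request body. Copies data from libevent's
// input buffer into the stream load pipe; the first chunk(s), up to
// config::stream_tvf_buffer_size bytes, are also cached in schema_buffer so
// process_put() can ask the FE to infer column information before the whole
// body arrives.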
void HttpStreamAction::on_chunk_data(HttpRequest* req) {
std::shared_ptr<StreamLoadContext> ctx =
std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
if (ctx == nullptr || !ctx->status.ok()) {
return;
}
if (!req->header(HTTP_WAL_ID_KY).empty()) {
ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY));
}
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);
    // SCOPED_ATTACH_TASK will be called along the path:
    //   HttpStreamAction::on_chunk_data -> process_put
    //   -> StreamLoadExecutor::execute_plan_fragment -> exec_plan_fragment
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->stream_load_pipe_tracker());
int64_t start_read_data_time = MonotonicNanos();
Status st = ctx->allocate_schema_buffer();
if (!st.ok()) {
ctx->status = st;
return;
}
while (evbuffer_get_length(evbuf) > 0) {
ByteBufferPtr bb;
st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
st = ctx->body_sink->append(bb);
        // schema_buffer caches the first chunk of data (up to
        // config::stream_tvf_buffer_size, ~1MB) for parsing column information;
        // decide whether this chunk should still be cached
if (ctx->is_read_schema) {
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
}
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
}
    // if the whole body was read without reaching the schema buffer limit, the
    // schema request has not been sent yet, so send it now
    if (ctx->is_read_schema) {
        LOG(INFO) << "all data read without reaching the schema buffer limit, "
                  << "requesting column information from fe now";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
}
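// Called when the request's handler context is released. Cancels the body
// sink so the pipe reader unblocks, then removes the context from the stream
// load manager, releasing its resources.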
void HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) {
std::shared_ptr<StreamLoadContext> ctx = std::static_pointer_cast<StreamLoadContext>(param);
if (ctx == nullptr) {
return;
}
// sender is gone, make receiver know it
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel("sender is gone");
}
// remove stream load context from stream load manager and the resource will be released
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
http_stream_current_processing->increment(-1);
}
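// Builds a TStreamLoadPutRequest from the request headers (or from a stored
// SQL string when http_req is null) and asks the master FE to plan the load;
// the resulting plan fragment is then executed locally.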
Status HttpStreamAction::process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx) {
TStreamLoadPutRequest request;
if (http_req != nullptr) {
request.__set_load_sql(http_req->header(HTTP_SQL));
if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
}
} else {
request.__set_token(ctx->auth.token);
request.__set_load_sql(ctx->sql_str);
ctx->auth.token = "";
}
set_request_auth(&request, ctx->auth);
request.__set_loadId(ctx->id.to_thrift());
request.__set_label(ctx->label);
if (ctx->group_commit) {
if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
} else {
// used for wait_internal_group_commit_finish
request.__set_group_commit_mode("sync_mode");
}
}
if (_exec_env->cluster_info()->backend_id != 0) {
request.__set_backend_id(_exec_env->cluster_info()->backend_id);
} else {
LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
}
if (ctx->wal_id > 0) {
request.__set_partial_update(false);
}
// plan this load
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, ctx](FrontendServiceConnection& client) {
client->streamLoadPut(ctx->put_result, request);
}));
ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
Status plan_status(Status::create(ctx->put_result.status));
if (!plan_status.ok()) {
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
return plan_status;
}
if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) {
return Status::NotSupported("http stream 2pc is unsupported for mow table");
}
ctx->db = ctx->put_result.pipeline_params.db_name;
ctx->table = ctx->put_result.pipeline_params.table_name;
ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id;
ctx->label = ctx->put_result.pipeline_params.import_label;
ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id);
if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
        // FIXME: find a way to avoid chunked stream loads writing large WALs
size_t content_length = 0;
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
try {
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
} catch (const std::exception& e) {
return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
http_req->header(HttpHeaders::CONTENT_LENGTH),
e.what());
}
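            // compressed CSV may expand after decompression, so reserve extra
            // WAL space with a fixed 3x factor (a heuristic, not an exact bound)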
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
}
ctx->put_result.pipeline_params.__set_content_length(content_length);
}
TPipelineFragmentParamsList mocked;
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked);
}
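// Persists the load result JSON into the RocksDB-backed stream load recorder,
// keyed by finish time (start_millis + load_cost_millis) and label.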
void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str) {
std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
if (stream_load_recorder != nullptr) {
std::string key =
std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
auto st = stream_load_recorder->put(key, str);
        if (st.ok()) {
            LOG(INFO) << "put stream_load_record to RocksDB successfully. label: " << ctx->label
                      << ", key: " << key;
        }
    } else {
        LOG(WARNING) << "failed to put stream_load_record: stream_load_recorder is null.";
}
}
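// Decides whether this load should go through group commit based on the
// group_commit header, Content-Length / chunked transfer encoding, and
// features that are incompatible with group commit (partial columns,
// partitions, two-phase commit). In async_mode, also checks WAL disk space.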
Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
std::shared_ptr<StreamLoadContext> ctx) {
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InvalidArgument(
"group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
}
int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
if (content_length < 0) {
std::stringstream ss;
ss << "This http load content length <0 (" << content_length
<< "), please check your content length.";
LOG(WARNING) << ss.str();
return Status::InvalidArgument(ss.str());
}
// allow chunked stream load in flink
auto is_chunk =
!req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
req->header(HttpHeaders::TRANSFER_ENCODING).find("chunked") != std::string::npos;
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") ||
(content_length == 0 && !is_chunk)) {
        // group commit is off (off_mode or empty header), or there is no body
ctx->group_commit = false;
return Status::OK();
}
if (is_chunk) {
ctx->label = "";
}
auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InvalidArgument("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
if (!load_size_smaller_than_wal_limit(content_length)) {
                std::stringstream ss;
                ss << "there is no WAL space left for async-mode group commit http load; "
                      "this http load's size is "
                   << content_length << ". WAL dir info: "
                   << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
LOG(WARNING) << ss.str();
return Status::Error<EXCEEDED_LIMIT>(ss.str());
}
}
}
return Status::OK();
}
} // namespace doris