blob: 42ac15ea51ac4350fa98a61f7403e58e2ac3862b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/load_stream.h"
#include <brpc/stream.h>
#include <bthread/bthread.h>
#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
#include <olap/rowset/rowset_factory.h>
#include <olap/rowset/rowset_meta.h>
#include <olap/storage_engine.h>
#include <olap/tablet_manager.h>
#include <runtime/exec_env.h>
#include <memory>
#include <sstream>
#include "bvar/bvar.h"
#include "cloud/config.h"
#include "common/signal_handler.h"
#include "exec/tablet_info.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/load_channel.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/load_stream_writer.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#define UNKNOWN_ID_FOR_TEST 0x7c00
namespace doris {
#include "common/compile_check_begin.h"
bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count");
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");
TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
: _id(id),
_next_segid(0),
_load_id(load_id),
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_token(_flush_token);
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
ostr << "load_id=" << print_id(tablet_stream._load_id) << ", txn_id=" << tablet_stream._txn_id
<< ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
return ostr;
}
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
int64_t partition_id) {
WriteRequest req {
.tablet_id = _id,
.txn_id = _txn_id,
.index_id = index_id,
.partition_id = partition_id,
.load_id = _load_id,
.table_schema_param = schema,
// TODO(plat1ko): write_file_cache
.storage_vault_id {},
};
_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
_status.update(Status::Uninitialized("fault injection"));
return _status.status();
});
_status.update(_load_stream_writer->init());
if (!_status.ok()) {
LOG(INFO) << "failed to init rowset builder due to " << *this;
}
return _status.status();
}
Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
if (!_status.ok()) {
return _status.status();
}
// dispatch add_segment request
if (header.opcode() == PStreamHeader::ADD_SEGMENT) {
return add_segment(header, data);
}
SCOPED_TIMER(_append_data_timer);
int64_t src_id = header.src_id();
uint32_t segid = header.segment_id();
// Ensure there are enough space and mapping are built.
SegIdMapping* mapping = nullptr;
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
_segids_mapping[src_id] = std::make_unique<SegIdMapping>();
}
mapping = _segids_mapping[src_id].get();
}
if (segid + 1 > mapping->size()) {
// TODO: Each sender lock is enough.
std::lock_guard lock_guard(_lock);
ssize_t origin_size = mapping->size();
if (segid + 1 > origin_size) {
mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max());
for (size_t index = origin_size; index <= segid; index++) {
mapping->at(index) = _next_segid;
_next_segid++;
VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to "
<< " segid=" << _next_segid - 1 << ", " << *this;
}
}
}
// Each sender sends data in one segment sequential, so we also do not
// need a lock here.
bool eos = header.segment_eos();
FileType file_type = header.file_type();
uint32_t new_segid = mapping->at(segid);
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable {
signal::set_signal_task_id(_load_id);
g_load_stream_flush_running_threads << -1;
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
if (!st.ok() && !config::is_cloud_mode()) {
auto res = ExecEnv::get_tablet(_id);
TabletSharedPtr tablet =
res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
if (tablet) {
tablet->report_error(st);
}
}
if (eos && st.ok()) {
DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
{ file_type = static_cast<FileType>(-1); });
if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
st = _load_stream_writer->close_writer(new_segid, file_type);
} else {
st = Status::InternalError(
"appent data failed, file type error, file type = {}, "
"segment_id={}",
file_type, new_segid);
}
}
DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
_status.update(st);
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
auto load_stream_max_wait_flush_token_time_ms =
config::load_stream_max_wait_flush_token_time_ms;
DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", {
load_stream_flush_token_max_tasks = 0;
load_stream_max_wait_flush_token_time_ms = 1000;
});
MonotonicStopWatch timer;
timer.start();
while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
_status.update(
Status::Error<true>("wait flush token back pressure time is more than "
"load_stream_max_wait_flush_token_time {}",
load_stream_max_wait_flush_token_time_ms));
return _status.status();
}
bthread_usleep(2 * 1000); // 2ms
}
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
Status st = Status::OK();
DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (st.ok()) {
st = _flush_token->submit_func(flush_func);
}
if (!st.ok()) {
_status.update(st);
}
return _status.status();
}
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
if (!_status.ok()) {
return _status.status();
}
SCOPED_TIMER(_add_segment_timer);
DCHECK(header.has_segment_statistics());
SegmentStatistics stat(header.segment_statistics());
int64_t src_id = header.src_id();
uint32_t segid = header.segment_id();
uint32_t new_segid;
DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
_status.update(Status::InternalError(
"add segment failed, no segment written by this src be yet, src_id={}, "
"segment_id={}",
src_id, segid));
return _status.status();
}
DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
{ segid = static_cast<uint32_t>(_segids_mapping[src_id]->size()); });
if (segid >= _segids_mapping[src_id]->size()) {
_status.update(Status::InternalError(
"add segment failed, segment is never written, src_id={}, segment_id={}",
src_id, segid));
return _status.status();
}
new_segid = _segids_mapping[src_id]->at(segid);
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
auto add_segment_func = [this, new_segid, stat]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat);
DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
_status.update(st);
LOG(INFO) << "add segment failed " << *this;
}
};
Status st = Status::OK();
DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (st.ok()) {
st = _flush_token->submit_func(add_segment_func);
}
if (!st.ok()) {
_status.update(st);
}
return _status.status();
}
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto st = Status::OK();
auto func = [this, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(_load_id);
st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
if (!ret) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
cv.wait(lock);
return st;
}
void TabletStream::pre_close() {
if (!_status.ok()) {
// cancel all pending tasks, wait all running tasks to finish
_flush_token->shutdown();
return;
}
SCOPED_TIMER(_close_wait_timer);
_status.update(_run_in_heavy_work_pool([this]() {
_flush_token->wait();
return Status::OK();
}));
// it is necessary to check status after wait_func,
// for create_rowset could fail during add_segment when loading to MOW table,
// in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
if (!_status.ok()) {
return;
}
DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
if (_check_num_segments && (_next_segid.load() != _num_segments)) {
_status.update(Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id)));
return;
}
_status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
}
Status TabletStream::close() {
if (!_status.ok()) {
return _status.status();
}
SCOPED_TIMER(_close_wait_timer);
_status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }));
return _status.status();
}
IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
std::shared_ptr<OlapTableSchemaParam> schema,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
: _id(id),
_load_id(load_id),
_txn_id(txn_id),
_schema(schema),
_load_stream_mgr(load_stream_mgr) {
_profile = profile->create_child(fmt::format("IndexStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
SCOPED_TIMER(_append_data_timer);
int64_t tablet_id = header.tablet_id();
TabletStreamSharedPtr tablet_stream;
{
std::lock_guard lock_guard(_lock);
auto it = _tablet_streams_map.find(tablet_id);
if (it == _tablet_streams_map.end()) {
_init_tablet_stream(tablet_stream, tablet_id, header.partition_id());
} else {
tablet_stream = it->second;
}
}
return tablet_stream->append_data(header, data);
}
void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id) {
tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
_profile);
_tablet_streams_map[tablet_id] = tablet_stream;
auto st = tablet_stream->init(_schema, _id, partition_id);
if (!st.ok()) {
LOG(WARNING) << "tablet stream init failed " << *tablet_stream;
}
}
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
// open all need commit tablets
for (const auto& tablet : tablets_to_commit) {
if (_id != tablet.index_id()) {
continue;
}
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end()) {
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
} else {
tablet_stream = it->second;
}
if (tablet.has_num_segments()) {
tablet_stream->add_num_segments(tablet.num_segments());
} else {
// for compatibility reasons (sink from old version BE)
tablet_stream->disable_num_segments_check();
}
}
for (auto& [_, tablet_stream] : _tablet_streams_map) {
tablet_stream->pre_close();
}
for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
success_tablet_ids->push_back(tablet_stream->id());
} else {
LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
failed_tablets->emplace_back(tablet_stream->id(), st);
}
}
}
// TODO: Profile is temporary disabled, because:
// 1. It's not being processed by the upstream for now
// 2. There are some problems in _profile->to_thrift()
LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr,
bool enable_profile)
: _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
g_load_stream_cnt << 1;
_profile = std::make_unique<RuntimeProfile>("LoadStream");
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
#ifndef BE_TEST
std::shared_ptr<QueryContext> query_context =
ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
if (query_context != nullptr) {
_resource_ctx = query_context->resource_ctx();
} else {
_resource_ctx = ResourceContext::create_shared();
_resource_ctx->task_controller()->set_task_id(load_tid);
std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
_resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
}
#else
_resource_ctx = ResourceContext::create_shared();
_resource_ctx->task_controller()->set_task_id(load_tid);
std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
_resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
#endif
}
LoadStream::~LoadStream() {
g_load_stream_cnt << -1;
LOG(INFO) << "load stream is deconstructed " << *this;
}
Status LoadStream::init(const POpenLoadStreamRequest* request) {
_txn_id = request->txn_id();
_total_streams = static_cast<int32_t>(request->total_streams());
_is_incremental = (_total_streams == 0);
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
for (auto& index : request->schema().indexes()) {
_index_streams_map[index.id()] = std::make_shared<IndexStream>(
_load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get());
}
LOG(INFO) << "succeed to init load stream " << *this;
return Status::OK();
}
bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
// we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack
_open_streams[src_id]--;
if (_open_streams[src_id] == 0) {
_open_streams.erase(src_id);
}
_close_load_cnt++;
LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
<< _total_streams - _close_load_cnt << " senders, " << *this;
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());
if (_close_load_cnt < _total_streams) {
// do not return commit info if there is remaining streams.
return false;
}
for (auto& [_, index_stream] : _index_streams_map) {
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
}
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablets->size();
return true;
}
void LoadStream::_report_result(StreamId stream, const Status& status,
const std::vector<int64_t>& success_tablet_ids,
const FailedTablets& failed_tablets, bool eos) {
LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
<< ", failed tablet num " << failed_tablets.size();
butil::IOBuf buf;
PLoadStreamResponse response;
response.set_eos(eos);
status.to_protobuf(response.mutable_status());
for (auto& id : success_tablet_ids) {
response.add_success_tablet_ids(id);
}
for (auto& [id, st] : failed_tablets) {
auto pb = response.add_failed_tablets();
pb->set_id(id);
st.to_protobuf(pb->mutable_status());
}
if (_enable_profile && _close_load_cnt == _total_streams) {
TRuntimeProfileTree tprofile;
ThriftSerializer ser(false, 4096);
uint8_t* profile_buf = nullptr;
uint32_t len = 0;
std::unique_lock<bthread::Mutex> l(_lock);
_profile->to_thrift(&tprofile);
auto st = ser.serialize(&tprofile, &len, &profile_buf);
if (st.ok()) {
response.set_load_stream_profile(profile_buf, len);
} else {
LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this;
}
}
buf.append(response.SerializeAsString());
auto wst = _write_stream(stream, buf);
if (!wst.ok()) {
LOG(WARNING) << " report result failed with " << wst << ", " << *this;
}
}
void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
butil::IOBuf buf;
PLoadStreamResponse response;
Status st = Status::OK();
for (const auto& req : hdr.tablets()) {
BaseTabletSPtr tablet;
if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) {
tablet = std::move(res).value();
} else {
st = std::move(res).error();
break;
}
auto* resp = response.add_tablet_schemas();
resp->set_index_id(req.index_id());
resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
}
st.to_protobuf(response.mutable_status());
buf.append(response.SerializeAsString());
auto wst = _write_stream(stream, buf);
if (!wst.ok()) {
LOG(WARNING) << " report result failed with " << wst << ", " << *this;
}
}
Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
for (;;) {
int ret = 0;
DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; });
if (ret == 0) {
ret = brpc::StreamWrite(stream, buf);
}
switch (ret) {
case 0:
return Status::OK();
case EAGAIN: {
const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
int wait_ret = brpc::StreamWait(stream, &time);
if (wait_ret != 0) {
return Status::InternalError("StreamWait failed, err={}", wait_ret);
}
break;
}
default:
return Status::InternalError("StreamWrite failed, err={}", ret);
}
}
return Status::OK();
}
void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) {
butil::IOBufAsZeroCopyInputStream wrapper(*message);
hdr.ParseFromZeroCopyStream(&wrapper);
VLOG_DEBUG << "header parse result: " << hdr.DebugString();
}
Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) {
SCOPED_TIMER(_append_data_timer);
IndexStreamSharedPtr index_stream;
int64_t index_id = header.index_id();
DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
{ index_id = UNKNOWN_ID_FOR_TEST; });
auto it = _index_streams_map.find(index_id);
if (it == _index_streams_map.end()) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id);
} else {
index_stream = it->second;
}
return index_stream->append_data(header, data);
}
int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
VLOG_DEBUG << "on_received_messages " << id << " " << size;
for (size_t i = 0; i < size; ++i) {
while (messages[i]->size() > 0) {
// step 1: parse header
size_t hdr_len = 0;
messages[i]->cutn((void*)&hdr_len, sizeof(size_t));
butil::IOBuf hdr_buf;
PStreamHeader hdr;
messages[i]->cutn(&hdr_buf, hdr_len);
_parse_header(&hdr_buf, hdr);
// step 2: cut data
size_t data_len = 0;
messages[i]->cutn((void*)&data_len, sizeof(size_t));
butil::IOBuf data_buf;
PStreamHeader data;
messages[i]->cutn(&data_buf, data_len);
// step 3: dispatch
_dispatch(id, hdr, &data_buf);
}
}
return 0;
}
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
<< " with tablet " << hdr.tablet_id();
SCOPED_ATTACH_TASK(_resource_ctx);
// CLOSE_LOAD message should not be fault injected,
// otherwise the message will be ignored and causing close wait timeout
if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) {
DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
PUniqueId* load_id = t_hdr.mutable_load_id();
load_id->set_hi(UNKNOWN_ID_FOR_TEST);
load_id->set_lo(UNKNOWN_ID_FOR_TEST);
});
DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST);
});
}
if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
_report_failure(id, st, hdr);
return;
}
{
std::lock_guard lock_guard(_lock);
if (!_open_streams.contains(hdr.src_id())) {
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}",
hdr.src_id());
_report_failure(id, st, hdr);
return;
}
}
switch (hdr.opcode()) {
case PStreamHeader::ADD_SEGMENT: // ADD_SEGMENT will be dispatched inside TabletStream
case PStreamHeader::APPEND_DATA: {
auto st = _append_data(hdr, data);
if (!st.ok()) {
_report_failure(id, st, hdr);
}
} break;
case PStreamHeader::CLOSE_LOAD: {
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
bool all_closed =
close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
std::lock_guard<bthread::Mutex> lock_guard(_lock);
// if incremental stream, we need to wait for all non-incremental streams to be closed
// before closing incremental streams. We need a fencing mechanism to avoid use after closing
// across different be.
if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0) {
_closing_stream_ids.push_back(id);
} else {
brpc::StreamClose(id);
}
if (all_closed) {
for (auto& closing_id : _closing_stream_ids) {
brpc::StreamClose(closing_id);
}
_closing_stream_ids.clear();
}
} break;
case PStreamHeader::GET_SCHEMA: {
_report_schema(id, hdr);
} break;
default:
LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
DCHECK(false);
}
}
void LoadStream::on_idle_timeout(StreamId id) {
LOG(WARNING) << "closing load stream on idle timeout, " << *this;
brpc::StreamClose(id);
}
void LoadStream::on_closed(StreamId id) {
// `this` may be freed by other threads after increasing `_close_rpc_cnt`,
// format string first to prevent use-after-free
std::stringstream ss;
ss << *this;
auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1;
LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", "
<< ss.str();
if (remaining_streams == 0) {
_load_stream_mgr->clear_load(_load_id);
}
}
inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) {
ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id;
return ostr;
}
#include "common/compile_check_end.h"
} // namespace doris