blob: 0f590ec61a5aab208387d3423640b942c9db2bbf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/load_stream_stub.h"
#include <sstream>
#include <utility>
#include "common/cast_set.h"
#include "olap/rowset/rowset_writer.h"
#include "runtime/query_context.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/network_util.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
namespace doris {
#include "common/compile_check_begin.h"
// brpc stream callback: decodes each PLoadStreamResponse received on the stream and folds it into
// the owning LoadStreamStub. This covers the EOS flag, successful / failed tablet ids and tablet
// schemas. A one-line summary is logged per message.
//
// The stub is only referenced weakly; if it is already gone the messages are dropped.
// A message that cannot be parsed is logged and skipped, instead of applying a partially
// decoded response to the stub. Always returns 0.
int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
                                                 size_t size) {
    auto stub = _stub.lock();
    if (!stub) {
        LOG(WARNING) << "stub is not exist when on_received_messages, " << *this
                     << ", stream_id=" << id;
        return 0;
    }
    for (size_t i = 0; i < size; i++) {
        butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
        PLoadStreamResponse response;
        if (!response.ParseFromZeroCopyStream(&wrapper)) {
            // never act on a half-decoded reply (it could e.g. carry a bogus eos / tablet list)
            LOG(WARNING) << "failed to parse load stream response, " << *this
                         << ", stream_id=" << id;
            continue;
        }
        if (response.eos()) {
            stub->_is_eos.store(true);
        }
        Status st = Status::create<false>(response.status());
        std::stringstream ss;
        ss << "on_received_messages, " << *this << ", stream_id=" << id;
        if (response.success_tablet_ids_size() > 0) {
            ss << ", success tablet ids:";
            for (auto tablet_id : response.success_tablet_ids()) {
                ss << " " << tablet_id;
            }
            std::lock_guard<bthread::Mutex> lock(stub->_success_tablets_mutex);
            for (auto tablet_id : response.success_tablet_ids()) {
                stub->_success_tablets.push_back(tablet_id);
            }
        }
        if (response.failed_tablets_size() > 0) {
            ss << ", failed tablet ids:";
            // iterate by reference: each element is a protobuf message, copying it is wasteful
            for (const auto& pb : response.failed_tablets()) {
                ss << " " << pb.id() << ":" << Status::create(pb.status());
            }
            std::lock_guard<bthread::Mutex> lock(stub->_failed_tablets_mutex);
            for (const auto& pb : response.failed_tablets()) {
                stub->_failed_tablets.emplace(pb.id(), Status::create(pb.status()));
            }
        }
        if (response.tablet_schemas_size() > 0) {
            ss << ", tablet schema num: " << response.tablet_schemas_size();
            std::lock_guard<bthread::Mutex> lock(stub->_schema_mutex);
            for (const auto& schema : response.tablet_schemas()) {
                auto tablet_schema = std::make_unique<TabletSchema>();
                tablet_schema->init_from_pb(schema.tablet_schema());
                stub->_tablet_schema_for_index->emplace(schema.index_id(),
                                                        std::move(tablet_schema));
                stub->_enable_unique_mow_for_index->emplace(
                        schema.index_id(), schema.enable_unique_key_merge_on_write());
            }
            // wake up threads blocked waiting for a new schema
            stub->_schema_cv.notify_all();
        }
        ss << ", status: " << st;
        LOG(INFO) << ss.str();
        if (response.has_load_stream_profile()) {
            TRuntimeProfileTree tprofile;
            const uint8_t* buf =
                    reinterpret_cast<const uint8_t*>(response.load_stream_profile().data());
            uint32_t len = cast_set<uint32_t>(response.load_stream_profile().size());
            auto status = deserialize_thrift_msg(buf, &len, false, &tprofile);
            if (status.ok()) {
                // TODO
                //_sink->_state->load_channel_profile()->update(tprofile);
            } else {
                LOG(WARNING) << "load stream TRuntimeProfileTree deserialize failed, errmsg="
                             << status;
            }
        }
    }
    return 0;
}
// brpc callback invoked when the stream is closed. Marks the owning stub (when it still exists)
// as closed. The handler was heap-allocated in LoadStreamStub::open() and releases itself here
// on every exit path.
void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
    Defer self_delete {[this]() { delete this; }};
    LOG(INFO) << "on_closed, " << *this << ", stream_id=" << id;
    if (auto owner = _stub.lock()) {
        owner->_is_closed.store(true);
        return;
    }
    LOG(WARNING) << "stub is not exist when on_closed, " << *this;
}
// Formats a reply handler for log messages: "LoadStreamReplyHandler load_id=..., dst_id=...".
// Intentionally not `inline`. This is the single out-of-line definition of a function declared
// elsewhere (it reads the handler's private members). An `inline` definition is only guaranteed
// to be emitted in the translation unit that contains it.
std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) {
    ostr << "LoadStreamReplyHandler load_id=" << UniqueId(handler._load_id)
         << ", dst_id=" << handler._dst_id;
    return ostr;
}
// Records the load identity and the index-level schema / merge-on-write maps (which the caller
// may share with other stubs). The brpc stream itself is only created later, by open().
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
                               std::shared_ptr<IndexToTabletSchema> schema_map,
                               std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental)
        : _load_id(load_id),
          _src_id(src_id),
          // the shared_ptr parameters are by-value sink arguments: move them into the members
          // instead of copying, which saves an atomic refcount increment/decrement pair each
          _tablet_schema_for_index(std::move(schema_map)),
          _enable_unique_mow_for_index(std::move(mow_map)),
          _is_incremental(incremental) {}
// Closes the brpc stream if it was opened and has not been reported closed yet; streams that
// never opened, or that are already closed, need no cleanup here.
LoadStreamStub::~LoadStreamStub() {
    const bool still_open = _is_open.load() && !_is_closed.load();
    if (!still_open) {
        return;
    }
    const int close_ret = brpc::StreamClose(_stream_id);
    LOG(INFO) << *this << " is deconstructed, close " << (close_ret == 0 ? "success" : "failed");
}
// open_load_stream
// Creates the brpc stream to the backend described by `node_info` and issues the
// open_load_stream RPC on it.
//
// Only the first call does any work; later calls return the status recorded by the first one.
// Tablet schemas returned in the open response are cached in the index maps. `total_streams`
// must be > 0 unless the stub is incremental (which always sends 0).
//
// Every failure after the brpc stream has been created closes that stream again (as the RPC
// failure path already did). The destructor only closes streams that were successfully opened,
// so without this the stream and its self-deleting reply handler would leak. Every failure is
// also recorded in `_status` so later calls observe it.
Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                            const NodeInfo& node_info, int64_t txn_id,
                            const OlapTableSchemaParam& schema,
                            const std::vector<PTabletID>& tablets_for_schema, int total_streams,
                            int64_t idle_timeout_ms, bool enable_profile) {
    std::unique_lock<bthread::Mutex> lock(_open_mutex);
    if (_is_init.load()) {
        return _status;
    }
    _is_init.store(true);
    _dst_id = node_info.id;
    brpc::StreamOptions opt;
    opt.max_buf_size = cast_set<int>(config::load_stream_max_buf_size);
    opt.idle_timeout_ms = idle_timeout_ms;
    opt.messages_in_batch = config::load_stream_messages_in_batch;
    // the handler only holds a weak reference to this stub and deletes itself in on_closed()
    opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, shared_from_this());
    brpc::Controller cntl;
    if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
        // stream creation failed: the handler is still ours to free
        delete opt.handler;
        _status = Status::Error<true>(ret, "Failed to create stream");
        return _status;
    }
    cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
    POpenLoadStreamRequest request;
    *request.mutable_load_id() = _load_id;
    request.set_src_id(_src_id);
    request.set_txn_id(txn_id);
    request.set_enable_profile(enable_profile);
    if (_is_incremental) {
        request.set_total_streams(0);
    } else if (total_streams > 0) {
        request.set_total_streams(total_streams);
    } else {
        // the stream already exists: close it so it does not leak
        brpc::StreamClose(_stream_id);
        _status = Status::InternalError("total_streams should be greator than 0");
        return _status;
    }
    request.set_idle_timeout_ms(idle_timeout_ms);
    schema.to_protobuf(request.mutable_schema());
    for (const auto& tablet : tablets_for_schema) {
        *request.add_tablets() = tablet;
    }
    POpenLoadStreamResponse response;
    // NOTE(review): no connection group is set here; presumably `client_cache` is already
    // configured for streaming connections — confirm at the call site.
    const auto& stub = client_cache->get_client(node_info.host, node_info.brpc_port);
    if (stub == nullptr) {
        // the stream already exists: close it so it does not leak, and remember the failure
        brpc::StreamClose(_stream_id);
        _status = Status::InternalError("failed to init brpc client to {}:{}", node_info.host,
                                        node_info.brpc_port);
        return _status;
    }
    stub->open_load_stream(&cntl, &request, &response, nullptr);
    for (const auto& resp : response.tablet_schemas()) {
        auto tablet_schema = std::make_unique<TabletSchema>();
        tablet_schema->init_from_pb(resp.tablet_schema());
        _tablet_schema_for_index->emplace(resp.index_id(), std::move(tablet_schema));
        _enable_unique_mow_for_index->emplace(resp.index_id(),
                                              resp.enable_unique_key_merge_on_write());
    }
    if (cntl.Failed()) {
        brpc::StreamClose(_stream_id);
        _status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
                                        cntl.ErrorText());
        return _status;
    }
    LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
              << ", " << *this;
    _is_open.store(true);
    _status = Status::OK();
    return _status;
}
// APPEND_DATA
// Sends an APPEND_DATA message carrying `data` for the given segment of `tablet_id`.
// On a stream that never opened, the tablet is recorded as failed with the current stream
// status, and that status is returned.
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
                                   int32_t segment_id, uint64_t offset, std::span<const Slice> data,
                                   bool segment_eos, FileType file_type) {
    if (!_is_open.load()) {
        add_failed_tablet(tablet_id, _status);
        return _status;
    }
    DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
    PStreamHeader header;
    // who is sending, for which load
    *header.mutable_load_id() = _load_id;
    header.set_src_id(_src_id);
    header.set_opcode(doris::PStreamHeader::APPEND_DATA);
    // where the bytes go
    header.set_partition_id(partition_id);
    header.set_index_id(index_id);
    header.set_tablet_id(tablet_id);
    header.set_segment_id(segment_id);
    // how the payload is to be interpreted
    header.set_file_type(file_type);
    header.set_offset(offset);
    header.set_segment_eos(segment_eos);
    add_write_tablets(tablet_id);
    return _encode_and_send(header, data);
}
// ADD_SEGMENT
// Sends an ADD_SEGMENT message with the statistics of a finished segment and, optionally, the
// schema it was flushed with. On a stream that never opened, the tablet is recorded as failed
// with the current stream status, and that status is returned.
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
                                   int32_t segment_id, const SegmentStatistics& segment_stat,
                                   TabletSchemaSPtr flush_schema) {
    if (!_is_open.load()) {
        add_failed_tablet(tablet_id, _status);
        return _status;
    }
    DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
    PStreamHeader header;
    *header.mutable_load_id() = _load_id;
    header.set_src_id(_src_id);
    header.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
    header.set_partition_id(partition_id);
    header.set_index_id(index_id);
    header.set_tablet_id(tablet_id);
    header.set_segment_id(segment_id);
    segment_stat.to_pb(header.mutable_segment_statistics());
    // the flush schema is optional: only attach it when one was supplied
    if (flush_schema) {
        flush_schema->to_schema_pb(header.mutable_flush_schema());
    }
    add_write_tablets(tablet_id);
    return _encode_and_send(header);
}
// CLOSE_LOAD
// Sends CLOSE_LOAD (with the tablets to commit) on this stream. On success the stub enters the
// closing state that close_finish_check() waits on. On failure the error is stored in
// `_status` and returned.
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
    if (!_is_open.load()) {
        return _status;
    }
    PStreamHeader header;
    header.set_src_id(_src_id);
    *header.mutable_load_id() = _load_id;
    header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
    for (const PTabletID& tablet : tablets_to_commit) {
        header.add_tablets()->CopyFrom(tablet);
    }
    _status = _encode_and_send(header);
    if (_status.ok()) {
        _is_closing.store(true);
        return Status::OK();
    }
    LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status;
    return _status;
}
// GET_SCHEMA
// Sends a GET_SCHEMA request for `tablets` (sent synchronously by _encode_and_send) and logs
// which tablets were asked for. Replies are handled in on_received_messages().
Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
    if (!_is_open.load()) {
        return _status;
    }
    PStreamHeader header;
    header.set_src_id(_src_id);
    *header.mutable_load_id() = _load_id;
    header.set_opcode(doris::PStreamHeader::GET_SCHEMA);
    std::ostringstream log_line;
    log_line << "fetching tablet schema from stream " << _stream_id
             << ", load id: " << print_id(_load_id) << ", tablet id:";
    if (tablets.empty()) {
        log_line << " none";
    }
    for (const PTabletID& tablet : tablets) {
        header.add_tablets()->CopyFrom(tablet);
        log_line << " " << tablet.tablet_id();
    }
    LOG(INFO) << log_line.str();
    return _encode_and_send(header);
}
// Makes sure the tablet schema for `index_id` is available, requesting it from the backend
// when it is not already cached. The wait polls until the schema arrives, the stub is cancelled
// (cancel status returned), or `timeout_ms` elapses (TimedOut returned).
Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
                                       int64_t timeout_ms) {
    if (!_is_open.load()) {
        return _status;
    }
    auto has_schema = [&]() { return _tablet_schema_for_index->contains(index_id); };
    // fast path: the schema may already be cached, e.g. from the open response
    if (has_schema()) {
        return Status::OK();
    }
    PTabletID wanted;
    wanted.set_partition_id(partition_id);
    wanted.set_index_id(index_id);
    wanted.set_tablet_id(tablet_id);
    RETURN_IF_ERROR(get_schema({wanted}));
    MonotonicStopWatch stopwatch;
    stopwatch.start();
    // elapsed_time() is divided down to milliseconds before comparing with timeout_ms
    while (!has_schema() && stopwatch.elapsed_time() / 1000 / 1000 < timeout_ms) {
        RETURN_IF_ERROR(check_cancel());
        static_cast<void>(wait_for_new_schema(100));
    }
    return has_schema() ? Status::OK()
                        : Status::TimedOut("timeout to get tablet schema for index {}", index_id);
}
// Non-blocking check of whether this stream has finished closing; `*is_closed` reports it.
// - A stream that never opened counts as closed.
// - A cancelled query counts as closed and returns the query's status.
// - A stream that has not entered the closing state returns `_status` (not closed).
// - A closed stream must have seen EOS, otherwise an error is returned.
Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) {
    DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
    DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
                    { return Status::InternalError("close failed"); });
    if (!_is_open.load()) {
        // we don't need to close wait on non-open streams
        *is_closed = true;
        return Status::OK();
    }
    if (state->get_query_ctx()->is_cancelled()) {
        *is_closed = true;
        return state->get_query_ctx()->exec_status();
    }
    const bool closing = _is_closing.load();
    if (closing && _is_closed.load()) {
        *is_closed = true;
        RETURN_IF_ERROR(check_cancel());
        if (!_is_eos.load()) {
            return Status::InternalError("Stream closed without EOS, {}", to_string());
        }
        return Status::OK();
    }
    *is_closed = false;
    return closing ? Status::OK() : _status;
}
// Cancels the stub. It closes the brpc stream (when it was opened), records `reason` for
// check_cancel() under the cancel mutex, and marks the stub as closed.
void LoadStreamStub::cancel(Status reason) {
    LOG(WARNING) << *this << " is cancelled because of " << reason;
    const bool opened = _is_open.load();
    if (opened) {
        brpc::StreamClose(_stream_id);
    }
    {
        std::unique_lock<bthread::Mutex> cancel_lock(_cancel_mutex);
        _cancel_st = reason;
        _is_cancelled.store(true);
    }
    _is_closed.store(true);
}
// Serializes one stream message and hands it to _send_with_buffer().
//
// Layout: [size_t header_len][PStreamHeader bytes][size_t data_len][data slices...].
// CLOSE_LOAD and GET_SCHEMA messages are flushed synchronously; other messages may be batched.
Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
    butil::IOBuf buf;
    size_t header_len = header.ByteSizeLong();
    buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
    buf.append(header.SerializeAsString());
    // The init value decides the accumulator type of transform_reduce. It must be size_t: a
    // plain `0` would sum in int and silently truncate payloads larger than INT_MAX bytes.
    size_t data_len = std::transform_reduce(data.begin(), data.end(), size_t {0}, std::plus(),
                                            [](const Slice& s) { return s.get_size(); });
    buf.append(reinterpret_cast<uint8_t*>(&data_len), sizeof(data_len));
    for (const auto& slice : data) {
        buf.append(slice.get_data(), slice.get_size());
    }
    bool eos = header.opcode() == doris::PStreamHeader::CLOSE_LOAD;
    bool get_schema = header.opcode() == doris::PStreamHeader::GET_SCHEMA;
    add_bytes_written(buf.size());
    return _send_with_buffer(buf, eos || get_schema);
}
// Appends one encoded message to the pending batch in `_buffer`. The whole batch is flushed
// through _send_with_retry() when `sync` is set or the batch has reached
// config::brpc_streaming_client_batch_bytes; otherwise the message is only buffered and OK is
// returned. If a flush fails, every message of the flushed batch is handed to _handle_failure()
// and the send error is returned.
Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {
    butil::IOBuf output;
    std::unique_lock<decltype(_buffer_mutex)> buffer_lock(_buffer_mutex);
    _buffer.append(buf);
    // keep batching small asynchronous messages
    if (!sync && _buffer.size() < config::brpc_streaming_client_batch_bytes) {
        return Status::OK();
    }
    // take the whole pending batch; `output` was empty, so `_buffer` is left empty
    output.swap(_buffer);
    // acquire send lock while holding buffer lock, to ensure the message order
    std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
    // other threads may append to the (now empty) buffer while this batch is being sent
    buffer_lock.unlock();
    VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync;
    auto st = _send_with_retry(output);
    if (!st.ok()) {
        _handle_failure(output, st);
    }
    return st;
}
// Reacts to a batch that could not be sent, message by message:
// - APPEND_DATA / ADD_SEGMENT mark their tablet as failed with `st`.
// - CLOSE_LOAD closes the brpc stream.
// - GET_SCHEMA is only logged; wait_for_schema() will time out on its own.
// `buf` holds messages framed by _encode_and_send():
// [size_t header_len][PStreamHeader][size_t data_len][data].
void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
    auto warn_get_schema_not_sent = [this](const PStreamHeader& msg_hdr) {
        std::ostringstream tablet_ids;
        for (const auto& tablet : msg_hdr.tablets()) {
            tablet_ids << " " << tablet.tablet_id();
        }
        LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << tablet_ids.str()
                     << ", " << *this;
    };
    while (!buf.empty()) {
        // header: length prefix, then the serialized PStreamHeader
        size_t hdr_len = 0;
        buf.cutn(static_cast<void*>(&hdr_len), sizeof(hdr_len));
        butil::IOBuf hdr_bytes;
        buf.cutn(&hdr_bytes, hdr_len);
        PStreamHeader hdr;
        butil::IOBufAsZeroCopyInputStream hdr_stream(hdr_bytes);
        hdr.ParseFromZeroCopyStream(&hdr_stream);
        // payload: not needed to report the failure, just drop it
        size_t data_len = 0;
        buf.cutn(static_cast<void*>(&data_len), sizeof(data_len));
        buf.pop_front(data_len);
        switch (hdr.opcode()) {
        case PStreamHeader::ADD_SEGMENT:
        case PStreamHeader::APPEND_DATA: {
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
                add_failed_tablet(hdr.tablet_id(), st);
                return;
            });
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
                add_failed_tablet(hdr.tablet_id(), st);
                return;
            });
            add_failed_tablet(hdr.tablet_id(), st);
            break;
        }
        case PStreamHeader::CLOSE_LOAD: {
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
                brpc::StreamClose(_stream_id);
                return;
            });
            brpc::StreamClose(_stream_id);
            break;
        }
        case PStreamHeader::GET_SCHEMA: {
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
                warn_get_schema_not_sent(hdr);
                return;
            });
            warn_get_schema_not_sent(hdr);
            break;
        }
        default:
            LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
            DCHECK(false);
        }
    }
}
// Writes `buf` to the brpc stream. While the stream reports EAGAIN, waits for it to become
// writable again (bounded by config::load_stream_eagain_wait_seconds) and retries. Cancellation
// is checked before every attempt; any other write or wait error is returned.
Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
    while (true) {
        RETURN_IF_ERROR(check_cancel());
        DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.delay_before_send", {
            int64_t delay_ms = dp->param<int64_t>("delay_ms", 1000);
            bthread_usleep(delay_ms * 1000);
        });
        brpc::StreamWriteOptions write_opts;
        write_opts.write_in_background = config::enable_brpc_stream_write_background;
        int ret = brpc::StreamWrite(_stream_id, buf, &write_opts);
        DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; });
        if (ret == 0) {
            return Status::OK();
        }
        if (ret != EAGAIN) {
            return Status::InternalError("StreamWrite failed, err={}, {}", ret, to_string());
        }
        const timespec deadline = butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
        if (int wait_ret = brpc::StreamWait(_stream_id, &deadline); wait_ret != 0) {
            return Status::InternalError("StreamWait failed, err={}, {}", wait_ret, to_string());
        }
    }
}
// Returns the same text that operator<< writes for this stub, for use in status messages.
std::string LoadStreamStub::to_string() {
    std::stringstream out;
    out << *this;
    return out.str();
}
// Formats a stub for log messages: load id, source / destination backend ids and stream id.
// Intentionally not `inline`. This is the single out-of-line definition of a function declared
// elsewhere (it reads the stub's private members). An `inline` definition is only guaranteed
// to be emitted in the translation unit that contains it.
std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub) {
    ostr << "LoadStreamStub load_id=" << print_id(stub._load_id) << ", src_id=" << stub._src_id
         << ", dst_id=" << stub._dst_id << ", stream_id=" << stub._stream_id;
    return ostr;
}
// Opens every stream of this group. `tablets_for_schema` is sent on each stream until one opens
// successfully; the remaining streams are opened with an empty tablet list. The group only counts
// as open when all streams succeed. Otherwise all streams are cancelled and the last failure is
// returned.
Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                             const NodeInfo& node_info, int64_t txn_id,
                             const OlapTableSchemaParam& schema,
                             const std::vector<PTabletID>& tablets_for_schema, int total_streams,
                             int64_t idle_timeout_ms, bool enable_profile) {
    const std::vector<PTabletID> no_tablets;
    bool schema_requested = false;
    Status result = Status::OK();
    for (auto& stream : _streams) {
        const auto& tablets = schema_requested ? no_tablets : tablets_for_schema;
        Status st = stream->open(client_cache, node_info, txn_id, schema, tablets, total_streams,
                                 idle_timeout_ms, enable_profile);
        if (!st.ok()) {
            LOG(WARNING) << "open stream failed: " << st << "; stream: " << *stream;
            result = st;
            // keep going: a later stream may still fetch the schema
            continue;
        }
        schema_requested = true;
    }
    const bool all_opened = result.ok();
    _open_success.store(all_opened);
    if (!all_opened) {
        cancel(result);
    }
    return result;
}
// Sends CLOSE_LOAD on every stream of the group. Only the first stream carries
// `tablets_to_commit`; the others send an empty list. Returns an error only when the group was
// never opened successfully.
//
// NOTE(review): per-stream failures are only logged here, not returned. LoadStreamStub::close_load()
// stores the error in that stream's `_status` without marking it closing, and
// close_finish_check() then returns that status for the stream. (The previous version declared a
// `status` local that was never assigned; it was dead code and has been removed.)
Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit) {
    if (!_open_success.load()) {
        return Status::InternalError("streams not open");
    }
    bool first = true;
    for (auto& stream : _streams) {
        Status st;
        if (first) {
            st = stream->close_load(tablets_to_commit);
            first = false;
        } else {
            st = stream->close_load({});
        }
        if (!st.ok()) {
            LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream;
        }
    }
    return Status::OK();
}
} // namespace doris