blob: d8c6a2ff8acc0e70205aaa337b28e9798accf81d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/cache/peer_file_cache_reader.h"
#include <brpc/controller.h>
#include <bvar/latency_recorder.h>
#include <bvar/reducer.h>
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
#include <algorithm>
#include <chrono>
#include <cstring>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/brpc_client_cache.h"
#include "util/bvar_helper.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/network_util.h"
#include "util/runtime_profile.h"
namespace doris::io {
// bvar metrics for reads served from a peer backend's file cache. All of them are
// exported under the "peer_cache_reader" prefix.
// Incremented on peer fetch failures (e.g. DNS resolution, brpc stub creation,
// empty or incomplete response data).
bvar::Adder<uint64_t> peer_cache_reader_failed_counter("peer_cache_reader", "failed_counter");
// Incremented when fetch_blocks fills the whole requested slice.
bvar::Adder<uint64_t> peer_cache_reader_succ_counter("peer_cache_reader", "succ_counter");
// Records the number of bytes copied per fetch; the recorder's sample count
// doubles as a request rate.
bvar::LatencyRecorder peer_bytes_per_read("peer_cache_reader", "bytes_per_read"); // also QPS
// Total PeerFileCacheReader instances ever constructed.
bvar::Adder<uint64_t> peer_cache_reader_total("peer_cache_reader", "total_num");
// Live PeerFileCacheReader instances: +1 in the constructor, -1 in the destructor.
bvar::Adder<uint64_t> peer_cache_being_read("peer_cache_reader", "file_being_read");
// Number of fetch_peer_data RPCs issued.
bvar::Adder<uint64_t> peer_cache_reader_read_counter("peer_cache_reader", "read_at");
// Microseconds from just before the peer RPC is prepared until fetch_blocks returns.
bvar::LatencyRecorder peer_cache_reader_latency("peer_cache_reader", "peer_latency");
// Per-second rate of peer_cache_reader_read_counter.
bvar::PerSecond<bvar::Adder<uint64_t>> peer_get_request_qps("peer_cache_reader", "peer_get_request",
&peer_cache_reader_read_counter);
// Total bytes copied out of peer responses into caller buffers.
bvar::Adder<uint64_t> peer_bytes_read_total("peer_cache_reader", "bytes_read");
// Per-second rate of peer_bytes_read_total.
// NOTE(review): the C++ identifier is misspelled ("througthput"); the exported
// bvar name is spelled correctly. Left unchanged in case other TUs reference it.
bvar::PerSecond<bvar::Adder<uint64_t>> peer_read_througthput("peer_cache_reader",
"peer_read_throughput",
&peer_bytes_read_total);
// Creates a reader that fetches cached blocks of `file_path` from the peer backend
// at `host`:`port`.
//
// `host` is taken by value and moved into the member, so callers passing a
// temporary pay for at most one allocation.
// Updates the reader counters: total readers constructed and currently-live readers.
PeerFileCacheReader::PeerFileCacheReader(const io::Path& file_path, bool is_doris_table,
                                         std::string host, int port)
        : _path(file_path),
          _is_doris_table(is_doris_table),
          _host(std::move(host)),
          _port(port) {
    peer_cache_reader_total << 1;
    peer_cache_being_read << 1;
}
// Balances the "+1" done in the constructor on the live-reader gauge.
PeerFileCacheReader::~PeerFileCacheReader() {
    // The gauge is a bvar::Adder<uint64_t>; adding -1 converted to uint64_t wraps
    // modulo 2^64 and therefore decrements it.
    peer_cache_being_read << static_cast<uint64_t>(-1);
}
namespace {

// Upper bound for a single fetch_peer_data RPC, in milliseconds.
constexpr int kPeerFetchTimeoutMs = 5000;

// Copies the part of one peer-returned block that overlaps the request window
// [window_off, window_off + dst_size) into `dst` and returns the number of bytes
// copied. Block bytes lying before or after the window are skipped, so a
// malformed or unexpected peer response can never cause a write outside `dst`.
size_t copy_block_overlap(char* dst, size_t dst_size, size_t window_off, int64_t block_off,
                          const char* src, size_t src_size) {
    if (block_off < 0) {
        return 0; // not a valid file offset; contributes nothing
    }
    const auto block_begin = static_cast<size_t>(block_off);
    // Where the block's first in-window byte lands in `dst` ...
    const size_t dst_pos = block_begin > window_off ? block_begin - window_off : 0;
    // ... and how many leading block bytes precede the window and must be skipped.
    const size_t src_pos = block_begin < window_off ? window_off - block_begin : 0;
    if (dst_pos >= dst_size || src_pos >= src_size) {
        return 0; // block does not overlap the window
    }
    const size_t n = std::min(dst_size - dst_pos, src_size - src_pos);
    std::memcpy(dst + dst_pos, src + src_pos, n);
    return n;
}

} // namespace

// Fetches `blocks` from the peer backend at _host:_port over brpc and copies the
// returned bytes that fall inside [off, off + s.size) into `s`.
//
// @param blocks     cache blocks to request from the peer; an empty list is a no-op.
// @param off        file offset that corresponds to s.data[0].
// @param s          destination buffer; it must be filled completely for success.
// @param bytes_read out: 0 when `blocks` is empty, otherwise s.size on success.
//                   Also dereferenced on entry for debug logging.
// @param file_size  total file size, forwarded to the peer in the request.
// @param ctx        not referenced directly in this body.
// @return OK on success; NotSupported for non-doris tables; InternalError on DNS
//         failure or empty/incomplete peer data; RpcError when no stub can be
//         created or the RPC fails; otherwise the error status returned by the peer.
Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off,
                                         Slice s, size_t* bytes_read, size_t file_size,
                                         const IOContext* ctx) {
    VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
               << " bytes_read=" << *bytes_read;
    if (blocks.empty()) {
        *bytes_read = 0;
        return Status::OK();
    }
    if (!_is_doris_table) {
        return Status::NotSupported<false>("peer cache fetch only supports doris table segments");
    }

    PFetchPeerDataRequest req;
    req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
    req.set_path(_path.filename().native());
    req.set_file_size(static_cast<int64_t>(file_size));
    for (const auto& blk : blocks) {
        auto* cb = req.add_cache_req();
        cb->set_block_offset(static_cast<int64_t>(blk->range().left));
        cb->set_block_size(static_cast<int64_t>(blk->range().size()));
    }

    // Resolve the peer hostname to an IP when a DNS cache is available; without
    // one, the configured host string is used as-is.
    std::string realhost = _host;
    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
    if (dns_cache == nullptr) {
        LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
    } else if (!is_valid_ip(realhost)) {
        Status status = dns_cache->get(_host, &realhost);
        if (!status.ok()) {
            peer_cache_reader_failed_counter << 1;
            LOG(WARNING) << "failed to get ip from host " << _host << ": " << status.to_string();
            return Status::InternalError<false>("failed to get ip from host {}", _host);
        }
    }
    const std::string brpc_addr = get_host_port(realhost, _port);
    std::shared_ptr<PBackendService_Stub> brpc_stub =
            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
                    brpc_addr);
    if (!brpc_stub) {
        peer_cache_reader_failed_counter << 1;
        LOG(WARNING) << "failed to get brpc stub " << brpc_addr;
        return Status::RpcError<false>("Address {} is wrong", brpc_addr);
    }

    LIMIT_REMOTE_SCAN_IO(bytes_read);
    // steady_clock is monotonic, so wall-clock adjustments cannot skew the latency.
    const auto begin = std::chrono::steady_clock::now();
    Defer defer_latency {[&]() {
        peer_cache_reader_latency << std::chrono::duration_cast<std::chrono::microseconds>(
                                             std::chrono::steady_clock::now() - begin)
                                             .count();
    }};

    brpc::Controller cntl;
    cntl.set_timeout_ms(kPeerFetchTimeoutMs);
    PFetchPeerDataResponse resp;
    peer_cache_reader_read_counter << 1;
    brpc_stub->fetch_peer_data(&cntl, &req, &resp, nullptr);
    if (cntl.Failed()) {
        peer_cache_reader_failed_counter << 1;
        LOG_EVERY_N(WARNING, 1000) << "peer cache rpc to " << brpc_addr
                                   << " failed: " << cntl.ErrorText();
        // Pass the error text as an argument, never as the format string.
        return Status::RpcError<false>("{}", cntl.ErrorText());
    }
    if (resp.has_status()) {
        Status peer_st = Status::create<false>(resp.status());
        if (!peer_st.ok()) {
            peer_cache_reader_failed_counter << 1;
            LOG_EVERY_N(WARNING, 1000) << "peer cache read failed, status=" << peer_st.msg();
            return peer_st;
        }
    }

    size_t filled = 0;
    for (const auto& data : resp.datas()) {
        const auto& payload = data.data();
        if (payload.empty()) {
            peer_cache_reader_failed_counter << 1;
            LOG(WARNING) << "peer cache read empty data, block_offset=" << data.block_offset();
            return Status::InternalError<false>("peer cache read empty data");
        }
        const size_t copied = copy_block_overlap(s.data, s.size, off, data.block_offset(),
                                                 payload.data(), payload.size());
        VLOG_DEBUG << "peer cache read data=" << data.block_offset() << " size=" << payload.size()
                   << " copied=" << copied;
        filled += copied;
    }
    VLOG_DEBUG << "peer cache read filled=" << filled;
    peer_bytes_read_total << filled;
    peer_bytes_per_read << filled;
    if (filled != s.size) {
        peer_cache_reader_failed_counter << 1;
        return Status::InternalError<false>("peer cache read incomplete: need={}, got={}", s.size,
                                            filled);
    }
    peer_cache_reader_succ_counter << 1;
    *bytes_read = filled;
    return Status::OK();
}
} // namespace doris::io