blob: 66e089c22e9d5346506ccf4d3615fef430581d2c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_internal_service.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
namespace doris {
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
: PInternalService(exec_env), _engine(engine) {}
CloudInternalServiceImpl::~CloudInternalServiceImpl() = default;
void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* controller,
const doris::PAlterVaultSyncRequest* request,
PAlterVaultSyncResponse* response,
google::protobuf::Closure* done) {
LOG(INFO) << "alter be to sync vault info from Meta Service";
// If the vaults containing hdfs vault then it would try to create hdfs connection using jni
// which would acuiqre one thread local jniEnv. But bthread context can't guarantee that the brpc
// worker thread wouldn't do bthread switch between worker threads.
bool ret = _heavy_work_pool.try_offer([this, done]() {
brpc::ClosureGuard closure_guard(done);
_engine.sync_storage_vault();
});
if (!ret) {
brpc::ClosureGuard closure_guard(done);
LOG(WARNING) << "fail to offer alter_vault_sync request to the work pool, pool="
<< _heavy_work_pool.get_info();
}
}
FileCacheType cache_type_to_pb(io::FileCacheType type) {
switch (type) {
case io::FileCacheType::TTL:
return FileCacheType::TTL;
case io::FileCacheType::INDEX:
return FileCacheType::INDEX;
case io::FileCacheType::NORMAL:
return FileCacheType::NORMAL;
default:
DCHECK(false);
}
return FileCacheType::NORMAL;
}
void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
google::protobuf::RpcController* controller [[maybe_unused]],
const PGetFileCacheMetaRequest* request, PGetFileCacheMetaResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
if (!config::enable_file_cache) {
LOG_WARNING("try to access tablet file cache meta, but file cache not enabled");
return;
}
for (const auto& tablet_id : request->tablet_ids()) {
auto res = _engine.tablet_mgr().get_tablet(tablet_id);
if (!res.has_value()) {
LOG(ERROR) << "failed to get tablet: " << tablet_id
<< " err msg: " << res.error().msg();
return;
}
CloudTabletSPtr tablet = std::move(res.value());
auto rowsets = tablet->get_snapshot_rowset();
std::for_each(rowsets.cbegin(), rowsets.cend(), [&](const RowsetSharedPtr& rowset) {
std::string rowset_id = rowset->rowset_id().to_string();
for (int64_t segment_id = 0; segment_id < rowset->num_segments(); segment_id++) {
std::string file_name = fmt::format("{}_{}.dat", rowset_id, segment_id);
auto cache_key = io::BlockFileCache::hash(file_name);
auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
auto segments_meta = cache->get_hot_blocks_meta(cache_key);
for (const auto& tuple : segments_meta) {
FileCacheBlockMeta* meta = response->add_file_cache_block_metas();
meta->set_tablet_id(tablet_id);
meta->set_rowset_id(rowset_id);
meta->set_segment_id(segment_id);
meta->set_file_name(file_name);
meta->set_file_size(rowset->rowset_meta()->segment_file_size(segment_id));
meta->set_offset(std::get<0>(tuple));
meta->set_size(std::get<1>(tuple));
meta->set_cache_type(cache_type_to_pb(std::get<2>(tuple)));
meta->set_expiration_time(std::get<3>(tuple));
}
}
});
}
}
} // namespace doris