blob: 3b4a1f1a185678c764bde22ec18a7ce94e8ed8d2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/storage_policy.h"
#include <glog/logging.h>
#include <algorithm>
#include <cstdlib>
#include <mutex>
#include <unordered_map>
#include "gen_cpp/cloud.pb.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset_meta.h"
#include "util/hash_util.hpp"
namespace doris {
// Holds the table of known storage policies together with the mutex that the
// accessor functions in this file take around accesses to it.
struct StoragePolicyMgr {
std::mutex mtx;
// storage_policy_id -> storage policy
std::unordered_map<int64_t, StoragePolicyPtr> map;
};
// Single instance with internal linkage, shared by the storage policy accessors in this file.
static StoragePolicyMgr s_storage_policy_mgr;
// Resolves a storage policy id to the StorageResource its policy refers to.
// Returns a NotFound error when either the policy itself or the resource named by
// the policy's `resource_id` is not registered.
Result<StorageResource> get_resource_by_storage_policy_id(int64_t storage_policy_id) {
    const auto policy = get_storage_policy(storage_policy_id);
    if (policy == nullptr) {
        return ResultError(Status::NotFound<false>(
                "could not find storage_policy, storage_policy_id={}", storage_policy_id));
    }

    auto found = get_storage_resource(policy->resource_id);
    if (!found) {
        return ResultError(Status::NotFound<false>("could not find resource, resource_id={}",
                                                   policy->resource_id));
    }
    // `found` holds (resource, version); only the resource is handed back.
    return found->first;
}
// Looks up the storage policy registered under `id`.
// Returns nullptr when no policy with that id is registered.
StoragePolicyPtr get_storage_policy(int64_t id) {
    std::lock_guard guard(s_storage_policy_mgr.mtx);
    auto& policies = s_storage_policy_mgr.map;
    const auto pos = policies.find(id);
    if (pos == policies.end()) {
        return nullptr;
    }
    return pos->second;
}
// Registers `policy` under `id`, replacing any policy previously stored for that id.
void put_storage_policy(int64_t id, StoragePolicyPtr policy) {
    std::lock_guard guard(s_storage_policy_mgr.mtx);
    auto& policies = s_storage_policy_mgr.map;
    policies.insert_or_assign(id, std::move(policy));
}
// Removes the storage policy registered under `id`; does nothing if there is none.
void delete_storage_policy(int64_t id) {
    std::lock_guard guard(s_storage_policy_mgr.mtx);
    auto& policies = s_storage_policy_mgr.map;
    policies.erase(id);
}
// Returns a snapshot of all registered storage policies as
// (storage_policy_id, policy version) pairs, in unspecified order.
std::vector<std::pair<int64_t, int64_t>> get_storage_policy_ids() {
    std::vector<std::pair<int64_t, int64_t>> res;
    // The lock must be taken before the map is touched at all: reading `size()`
    // without it races with concurrent put/delete calls.
    std::lock_guard lock(s_storage_policy_mgr.mtx);
    res.reserve(s_storage_policy_mgr.map.size());
    for (const auto& [id, policy] : s_storage_policy_mgr.map) {
        res.emplace_back(id, policy->version);
    }
    return res;
}
// Holds the table of known remote storage resources together with the mutex that
// the accessor functions in this file take around accesses to it.
struct StorageResourceMgr {
std::mutex mtx;
// resource_id -> (storage_resource, resource_version)
std::unordered_map<std::string, std::pair<StorageResource, int64_t>> map;
};
// Single instance with internal linkage, shared by the storage resource accessors in this file.
static StorageResourceMgr s_storage_resource_mgr;
// Returns the remote file system of the storage resource registered under
// `resource_id`, or nullptr when no such resource is registered.
io::RemoteFileSystemSPtr get_filesystem(const std::string& resource_id) {
    std::lock_guard guard(s_storage_resource_mgr.mtx);
    auto& resources = s_storage_resource_mgr.map;
    const auto pos = resources.find(resource_id);
    if (pos == resources.end()) {
        return nullptr;
    }
    // Mapped value is (StorageResource, version); hand back the resource's fs.
    return pos->second.first.fs;
}
std::optional<std::pair<StorageResource, int64_t>> get_storage_resource(int64_t resource_id) {
return get_storage_resource(std::to_string(resource_id));
}
// Looks up the storage resource registered under `resource_id`.
// Returns a copy of the stored (resource, version) pair, or std::nullopt when the
// id is not registered.
std::optional<std::pair<StorageResource, int64_t>> get_storage_resource(
        const std::string& resource_id) {
    std::lock_guard guard(s_storage_resource_mgr.mtx);
    auto& resources = s_storage_resource_mgr.map;
    const auto pos = resources.find(resource_id);
    if (pos == resources.end()) {
        return std::nullopt;
    }
    return pos->second;
}
// Registers `resource` with its `version` under `resource_id`, replacing any entry
// previously stored for that id.
void put_storage_resource(std::string resource_id, StorageResource resource, int64_t version) {
    std::lock_guard lock(s_storage_resource_mgr.mtx);
    // `resource_id` is taken by value, so move it into the map instead of copying it
    // again; insert_or_assign also avoids default-constructing the mapped pair
    // before overwriting it.
    s_storage_resource_mgr.map.insert_or_assign(std::move(resource_id),
                                                std::make_pair(std::move(resource), version));
}
void put_storage_resource(int64_t resource_id, StorageResource resource, int64_t version) {
auto id_str = std::to_string(resource_id);
put_storage_resource(id_str, std::move(resource), version);
}
// Removes the storage resource registered under the decimal string form of
// `resource_id`; does nothing if there is none.
void delete_storage_resource(int64_t resource_id) {
    // Build the key before taking the lock so the critical section stays minimal.
    const std::string key = std::to_string(resource_id);
    std::lock_guard guard(s_storage_resource_mgr.mtx);
    auto& resources = s_storage_resource_mgr.map;
    resources.erase(key);
}
// Returns a snapshot of all registered storage resources as
// (resource_id, resource version) pairs, in unspecified order.
std::vector<std::pair<std::string, int64_t>> get_storage_resource_ids() {
    std::vector<std::pair<std::string, int64_t>> res;
    // The lock must be taken before the map is touched at all: reading `size()`
    // without it races with concurrent put/delete calls.
    std::lock_guard lock(s_storage_resource_mgr.mtx);
    res.reserve(s_storage_resource_mgr.map.size());
    for (const auto& [id, resource] : s_storage_resource_mgr.map) {
        // `resource` is (StorageResource, version); only the version is reported.
        res.emplace_back(id, resource.second);
    }
    return res;
}
namespace {
// Reports a path-format version this binary does not understand by throwing an
// Exception that wraps a FatalError status. Never returns normally, which lets the
// callers use it as the terminal branch of a value-returning function.
[[noreturn]] void exit_at_unknown_path_version(std::string_view resource_id, int64_t path_version) {
    auto fatal = Status::FatalError(
            "unknown path version, please upgrade BE or drop this storage "
            "vault. resource_id={} path_version={}",
            resource_id, path_version);
    throw Exception(std::move(fatal));
}
} // namespace
// Builds a storage resource on top of `fs_` using the layout described by `path_format`.
//   path_version 0: no shard function is installed.
//   path_version 1: installs `shard_fn`, which maps a tablet id to
//                   murmur_hash64A(tablet_id) % shard_num.
//   anything else : rejected via exit_at_unknown_path_version (throws).
StorageResource::StorageResource(io::RemoteFileSystemSPtr fs_,
                                 const cloud::StorageVaultPB_PathFormat& path_format)
        : fs(std::move(fs_)), path_version(path_format.path_version()) {
    if (path_version == 0) {
        return;
    }
    if (path_version != 1) {
        exit_at_unknown_path_version(fs->id(), path_version);
    }
    // NOTE(review): a shard_num of 0 would make the modulo below undefined; presumably
    // the path format is validated upstream — confirm.
    shard_fn = [shard_num = path_format.shard_num()](int64_t tablet_id) {
        const auto hashed = HashUtil::murmur_hash64A(static_cast<void*>(&tablet_id),
                                                     sizeof(tablet_id), HashUtil::MURMUR_SEED);
        return hashed % shard_num;
    };
}
std::string StorageResource::remote_segment_path(int64_t tablet_id, std::string_view rowset_id,
int64_t seg_id) const {
switch (path_version) {
case 0:
return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, seg_id);
case 1:
return fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(tablet_id), tablet_id,
rowset_id, seg_id);
default:
exit_at_unknown_path_version(fs->id(), path_version);
}
}
// Builds the remote path of segment `seg_id` of `rowset`, taking the tablet id and
// rowset id from the rowset meta.
//   path_version 0: {DATA_PREFIX}/{tablet_id}/{rowset_id}_{seg_id}.dat
//   path_version 1: {DATA_PREFIX}/{shard}/{tablet_id}/{rowset_id}/{seg_id}.dat
// Any other path version throws via exit_at_unknown_path_version.
std::string StorageResource::remote_segment_path(const RowsetMeta& rowset, int64_t seg_id) const {
    // Reject unsupported layouts up front, before touching the rowset meta.
    if (path_version != 0 && path_version != 1) {
        exit_at_unknown_path_version(fs->id(), path_version);
    }

    const auto tablet_id = rowset.tablet_id();
    const auto rowset_id = rowset.rowset_id().to_string();
    if (path_version == 0) {
        return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, seg_id);
    }
    return fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(tablet_id), tablet_id,
                       rowset_id, seg_id);
}
// Builds the remote path of a V1 (one file per index) inverted index file.
//   rowset            - rowset meta supplying the tablet id and rowset id
//   seg_id            - segment the index belongs to
//   index_id          - id of the index
//   index_path_suffix - optional suffix; when non-empty it is appended as "@<suffix>"
//                       right before the ".idx" extension
//   path_version 0: {DATA_PREFIX}/{tablet_id}/{rowset_id}_{seg_id}_{index_id}{suffix}.idx
//   path_version 1: {DATA_PREFIX}/{shard}/{tablet_id}/{rowset_id}/{seg_id}_{index_id}{suffix}.idx
// Any other path version throws via exit_at_unknown_path_version.
std::string StorageResource::remote_idx_v1_path(const RowsetMeta& rowset, int64_t seg_id,
                                                int64_t index_id,
                                                std::string_view index_path_suffix) const {
    // A string_view is not guaranteed to be NUL-terminated, so append the view itself
    // (respecting its length) rather than treating data() as a C string.
    std::string suffix;
    if (!index_path_suffix.empty()) {
        suffix.reserve(index_path_suffix.size() + 1);
        suffix.push_back('@');
        suffix.append(index_path_suffix);
    }
    switch (path_version) {
    case 0:
        return fmt::format("{}/{}/{}_{}_{}{}.idx", DATA_PREFIX, rowset.tablet_id(),
                           rowset.rowset_id().to_string(), seg_id, index_id, suffix);
    case 1:
        return fmt::format("{}/{}/{}/{}/{}_{}{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()),
                           rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id, index_id,
                           suffix);
    default:
        exit_at_unknown_path_version(fs->id(), path_version);
    }
}
// Builds the remote path of the V2 inverted index file of segment `seg_id` of `rowset`.
//   path_version 0: {DATA_PREFIX}/{tablet_id}/{rowset_id}_{seg_id}.idx
//   path_version 1: {DATA_PREFIX}/{shard}/{tablet_id}/{rowset_id}/{seg_id}.idx
// Any other path version throws via exit_at_unknown_path_version.
std::string StorageResource::remote_idx_v2_path(const RowsetMeta& rowset, int64_t seg_id) const {
    // Reject unsupported layouts up front, before touching the rowset meta.
    if (path_version != 0 && path_version != 1) {
        exit_at_unknown_path_version(fs->id(), path_version);
    }

    const auto tablet_id = rowset.tablet_id();
    const auto rowset_id = rowset.rowset_id().to_string();
    if (path_version == 0) {
        return fmt::format("{}/{}/{}_{}.idx", DATA_PREFIX, tablet_id, rowset_id, seg_id);
    }
    return fmt::format("{}/{}/{}/{}/{}.idx", DATA_PREFIX, shard_fn(tablet_id), tablet_id,
                       rowset_id, seg_id);
}
std::string StorageResource::remote_tablet_path(int64_t tablet_id) const {
switch (path_version) {
case 0:
return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
case 1:
return fmt::format("{}/{}/{}", DATA_PREFIX, shard_fn(tablet_id), tablet_id);
default:
exit_at_unknown_path_version(fs->id(), path_version);
}
}
// Builds the remote path of a cooldown tablet meta file: the tablet's remote
// directory, a '/', then the file name produced by cooldown_tablet_meta_filename
// for (`replica_id`, `cooldown_term`).
std::string StorageResource::cooldown_tablet_meta_path(int64_t tablet_id, int64_t replica_id,
                                                       int64_t cooldown_term) const {
    std::string meta_path = remote_tablet_path(tablet_id);
    meta_path += '/';
    meta_path += cooldown_tablet_meta_filename(replica_id, cooldown_term);
    return meta_path;
}
} // namespace doris