// blob: f5b00394a2768f51a4d7bb85bae840132073f645 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <bthread/countdown_event.h>

#include <array>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "gen_cpp/BackendService.h"
#include "util/threadpool.h"
namespace doris {
// Kind of source a download job refers to.
// NOTE(review): the per-enumerator meanings below are inferred from the names
// and from JobMeta's `be_ip` / `brpc_port` fields -- confirm against the .cpp.
enum class DownloadType {
    BE = 0, // presumably: download from another backend (BE) node
    S3 = 1, // presumably: download from S3 object storage
};
// Description of one download job: which kind of source to download from,
// the address fields for that source, and the tablets concerned.
struct JobMeta {
    // Default construction yields a fully initialized object: the scalar
    // members below carry default member initializers, so reading or copying a
    // default-constructed JobMeta never touches indeterminate values.
    JobMeta() = default;

    // Builds a JobMeta from a TJobMeta; defined out of line.
    // NOTE(review): this constructor is not `explicit`, so a TJobMeta converts
    // to JobMeta implicitly. Left as-is because callers may rely on that.
    JobMeta(const TJobMeta& meta);

    // Defaults to the first enumerator so the field is never uninitialized,
    // even if a constructor body does not assign it on every path.
    DownloadType download_type {DownloadType::BE};
    std::string be_ip;
    // 0 until a real port is assigned.
    int32_t brpc_port {0};
    std::vector<int64_t> tablet_ids;
};
// Manager for:
//   - table warm up
//   - cluster warm up
//   - the balance peer address cache
class CloudWarmUpManager {
public:
    explicit CloudWarmUpManager(CloudStorageEngine& engine);
    ~CloudWarmUpManager();

    // Sets the job id if the id is zero.
    Status check_and_set_job_id(int64_t job_id);

    // Sets the batch id, which is used to record download progress.
    // NOTE(review): the semantics of the optional `retry` out-parameter are
    // defined in the implementation file -- confirm there.
    Status check_and_set_batch_id(int64_t job_id, int64_t batch_id, bool* retry = nullptr);

    // Adds the download jobs described by `job_metas`.
    void add_job(const std::vector<TJobMeta>& job_metas);

#ifdef BE_TEST
    // Declared only when building with BE_TEST.
    void consumer_job();
#endif

    // Returns the job state as
    // tuple<cur_job_id, cur_batch_id, pending_job_metas_size, _finish_job_size>.
    std::tuple<int64_t, int64_t, int64_t, int64_t> get_current_job_state();

    // Cancels the job.
    Status clear_job(int64_t job_id);

    Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);

    // Warms up one rowset.
    //
    // When `sync_wait_timeout_ms` <= 0 the warm-up RPC is sent and the call
    // returns right away, without waiting for the warm-up to complete.
    // When `sync_wait_timeout_ms` > 0 the call waits until the warm-up has
    // finished or the given timeout (in milliseconds) has elapsed.
    //
    // @param rs_meta              Metadata of the rowset to be warmed up.
    // @param sync_wait_timeout_ms Milliseconds to wait for the warm-up to
    //                             complete; a non-positive value means no waiting.
    void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);

    void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

    // ---- Balance warm up cache management ----
    // NOTE(review): the per-method notes below are inferred from the names and
    // signatures only -- confirm against the implementation.

    // Presumably stores the (host, brpc_port) pair associated with `tablet_id`.
    void record_balanced_tablet(int64_t tablet_id, const std::string& host, int32_t brpc_port);
    // Presumably yields the stored (host, brpc_port) for `tablet_id`, or
    // std::nullopt when nothing is stored.
    std::optional<std::pair<std::string, int32_t>> get_balanced_tablet_info(int64_t tablet_id);
    void remove_balanced_tablet(int64_t tablet_id);
    void remove_balanced_tablets(const std::vector<int64_t>& tablet_ids);
    bool is_balanced_tablet_expired(const std::chrono::system_clock::time_point& ctime) const;
    // Returns, by value, a map of tablet_id -> (host, brpc_port).
    std::unordered_map<int64_t, std::pair<std::string, int32_t>> get_all_balanced_tablets() const;

private:
    void schedule_remove_balanced_tablet(int64_t tablet_id);
    // Static with an opaque `void*` argument.
    // NOTE(review): looks like a timer/task callback -- confirm what `arg` carries.
    static void clean_up_expired_mappings(void* arg);

    void handle_jobs();

    Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>& replicas,
                              int64_t sync_wait_timeout_ms, bool skip_existence_check);

    // `cache_hit` is an output parameter (taken by non-const reference).
    std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, bool bypass_cache,
                                               bool& cache_hit);

    void _warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms);
    void _recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

    void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
                               int64_t expiration_time,
                               std::shared_ptr<bthread::CountdownEvent> wait, bool is_index = false,
                               std::function<void(Status)> done_cb = nullptr);

    // NOTE(review): which members `_mtx` / `_cond` guard is not visible in this
    // header -- see the implementation.
    std::mutex _mtx;
    std::condition_variable _cond;
    int64_t _cur_job_id {0};
    int64_t _cur_batch_id {-1};
    std::deque<std::shared_ptr<JobMeta>> _pending_job_metas;
    std::vector<std::shared_ptr<JobMeta>> _finish_job;
    std::thread _download_thread;
    bool _closed {false};

    // The attribute is there so that unit-test builds compile.
    [[maybe_unused]] CloudStorageEngine& _engine;

    // (timestamp, replica info)
    using CacheEntry = std::pair<std::chrono::steady_clock::time_point, TReplicaInfo>;
    // tablet_id -> entry
    using Cache = std::unordered_map<int64_t, CacheEntry>;
    // job_id -> cache
    std::unordered_map<int64_t, Cache> _tablet_replica_cache;

    std::unique_ptr<ThreadPool> _thread_pool;
    std::unique_ptr<ThreadPoolToken> _thread_pool_token;

    // Sharded locking for better performance: each shard pairs a map keyed by
    // int64_t with its own mutex, and a tablet id selects its shard via
    // get_shard_index().
    static constexpr size_t SHARD_COUNT = 10240;
    struct Shard {
        // `mutable` so the mutex can be locked through a const Shard.
        mutable std::mutex mtx;
        std::unordered_map<int64_t, JobMeta> tablets;
    };
    std::array<Shard, SHARD_COUNT> _balanced_tablets_shards;

    // ---- Shard helpers ----

    // Maps `tablet_id` to a shard slot in [0, SHARD_COUNT).
    size_t get_shard_index(int64_t tablet_id) const {
        const size_t hashed = std::hash<int64_t> {}(tablet_id);
        return hashed % SHARD_COUNT;
    }

    // Returns the shard that `tablet_id` maps to.
    Shard& get_shard(int64_t tablet_id) {
        const size_t slot = get_shard_index(tablet_id);
        return _balanced_tablets_shards[slot];
    }
};
} // namespace doris