blob: 300e1daa606e518f7c5683790399b8e4a150586e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include "common/status.h"
namespace doris {
class ExecEnv;
class StorageEngine;
class CloudStorageEngine;
class Thread;
class ThreadPool;
class TReportRequest;
class TTabletInfo;
class TAgentTaskRequest;
class ClusterInfo;
class TaskWorkerPoolIf {
public:
virtual ~TaskWorkerPoolIf() = default;
virtual Status submit_task(const TAgentTaskRequest& task) = 0;
};
class TaskWorkerPool : public TaskWorkerPoolIf {
public:
TaskWorkerPool(std::string_view name, int worker_count,
std::function<void(const TAgentTaskRequest&)> callback);
~TaskWorkerPool() override;
void stop();
Status submit_task(const TAgentTaskRequest& task) override;
protected:
std::atomic_bool _stopped {false};
std::unique_ptr<ThreadPool> _thread_pool;
std::function<void(const TAgentTaskRequest&)> _callback;
};
class PublishVersionWorkerPool final : public TaskWorkerPool {
public:
PublishVersionWorkerPool(StorageEngine& engine);
~PublishVersionWorkerPool() override;
private:
void publish_version_callback(const TAgentTaskRequest& task);
StorageEngine& _engine;
};
class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
PriorTaskWorkerPool(const std::string& name, int normal_worker_count,
int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback);
~PriorTaskWorkerPool() override;
void stop();
Status submit_task(const TAgentTaskRequest& task) override;
Status submit_high_prior_and_cancel_low(TAgentTaskRequest& task);
private:
void normal_loop();
void high_prior_loop();
bool _stopped {false};
std::mutex _mtx;
std::condition_variable _normal_condv;
std::deque<std::unique_ptr<TAgentTaskRequest>> _normal_queue;
std::condition_variable _high_prior_condv;
std::deque<std::unique_ptr<TAgentTaskRequest>> _high_prior_queue;
std::vector<std::shared_ptr<Thread>> _workers;
std::function<void(const TAgentTaskRequest&)> _callback;
};
class ReportWorker {
public:
ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s,
std::function<void()> callback);
~ReportWorker();
std::string_view name() const { return _name; }
// Notify the worker to report immediately
void notify();
void stop();
private:
std::string _name;
std::shared_ptr<Thread> _thread;
std::mutex _mtx;
std::condition_variable _condv;
bool _stopped {false};
bool _signal {false};
};
void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void alter_cloud_index_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req);
void download_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req);
void download_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req);
void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void release_snapshot_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
void move_dir_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req);
void move_dir_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req);
void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void push_index_policy_callback(const TAgentTaskRequest& req);
void push_cooldown_conf_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void push_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
const TAgentTaskRequest& req);
void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void clean_udf_cache_callback(const TAgentTaskRequest& req);
void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req);
void report_task_callback(const ClusterInfo* cluster_info);
void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info);
void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info);
void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info);
void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info);
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
void report_index_policy_callback(const ClusterInfo* cluster_info);
} // namespace doris