// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <future>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/status.h"
#include "olap/wal/wal_manager.h"
#include "runtime/exec_env.h"
#include "util/threadpool.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/sink/writer/vwal_writer.h"
namespace doris {

class ExecEnv;
class TUniqueId;
class RuntimeState;

namespace pipeline {
class Dependency;
} // namespace pipeline
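
// A queued block together with its byte size, captured once at enqueue time
// and used for the queue's memory accounting.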
struct BlockData {
    BlockData(const std::shared_ptr<vectorized::Block>& block)
            : block(block), block_bytes(block->bytes()) {}

    std::shared_ptr<vectorized::Block> block;
    size_t block_bytes;
};
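
// Buffers blocks from one or more concurrent loads that share a single group
// commit fragment instance. Producers append blocks with add_block() and the
// group commit scan side drains them with get_block(); a commit is triggered
// once either the time interval or the data size threshold is reached.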
class LoadBlockQueue {
public:
    LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
                   int64_t schema_version, int64_t index_size,
                   std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
                   bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms,
                   int64_t group_commit_data_bytes)
            : load_instance_id(load_instance_id),
              label(label),
              txn_id(txn_id),
              schema_version(schema_version),
              index_size(index_size),
              wait_internal_group_commit_finish(wait_internal_group_commit_finish),
              _group_commit_interval_ms(group_commit_interval_ms),
              _start_time(std::chrono::steady_clock::now()),
              _last_print_time(_start_time),
              _group_commit_data_bytes(group_commit_data_bytes),
              _all_block_queues_bytes(all_block_queues_bytes) {}

    // Append a block from the load identified by `load_id`; when `write_wal` is
    // true the block is also persisted to the write-ahead log.
    Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
                     bool write_wal, UniqueId& load_id);
    // Fetch the next queued block for the scan side; `find_block` reports whether
    // a block was returned and `eos` whether the queue is exhausted.
    Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
                     bool* eos, std::shared_ptr<pipeline::Dependency> get_block_dep);
    bool contain_load_id(const UniqueId& load_id);
    Status add_load_id(const UniqueId& load_id,
                       const std::shared_ptr<pipeline::Dependency> put_block_dep);
    Status remove_load_id(const UniqueId& load_id);
    void cancel(const Status& st);
    bool need_commit() { return _need_commit; }

    Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
                      WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
                      int be_exe_version);
    Status close_wal();
    bool has_enough_wal_disk_space(size_t estimated_wal_bytes);

    void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep);
    void append_read_dependency(std::shared_ptr<pipeline::Dependency> read_dep);
    int64_t get_group_commit_interval_ms() { return _group_commit_interval_ms; }

    std::string debug_string() const {
        fmt::memory_buffer debug_string_buffer;
        fmt::format_to(
                debug_string_buffer,
                "load_instance_id={}, label={}, txn_id={}, "
                "wait_internal_group_commit_finish={}, data_size_condition={}, "
                "group_commit_load_count={}, process_finish={}, _need_commit={}, schema_version={}",
                load_instance_id.to_string(), label, txn_id, wait_internal_group_commit_finish,
                data_size_condition, group_commit_load_count.load(), process_finish.load(),
                _need_commit, schema_version);
        return fmt::to_string(debug_string_buffer);
    }

    UniqueId load_instance_id;
    std::string label;
    int64_t txn_id;
    int64_t schema_version;
    int64_t index_size;
    bool wait_internal_group_commit_finish = false;
    bool data_size_condition = false;
    // number of loads in this group commit
    std::atomic_size_t group_commit_load_count = 0;
    // execution status of this internal group commit
    std::mutex mutex;
    std::atomic<bool> process_finish = false;
    Status status = Status::OK();
    std::vector<std::shared_ptr<pipeline::Dependency>> dependencies;

private:
    void _cancel_without_lock(const Status& st);
    std::string _get_load_ids();

    // the set of load ids of all blocks in this queue
    std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> _load_ids_to_write_dep;
    std::vector<std::shared_ptr<pipeline::Dependency>> _read_deps;
    std::list<BlockData> _block_queue;

    // wal
    std::string _wal_base_path;
    std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;

    // commit
    bool _need_commit = false;
    // commit by time interval; can be changed by
    // 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
    int64_t _group_commit_interval_ms;
    std::chrono::steady_clock::time_point _start_time;
    std::chrono::steady_clock::time_point _last_print_time;
    // commit by data size
    int64_t _group_commit_data_bytes;
    int64_t _data_bytes = 0;
    // memory back pressure: total memory consumption of all tables' load block queues
    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;

    std::condition_variable _get_cond;
};
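
// Per-table group commit state: lazily creates the internal group commit load
// plan for the table and routes incoming loads to the matching LoadBlockQueue,
// keyed by fragment instance id.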
class GroupCommitTable {
public:
    GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, int64_t db_id,
                     int64_t table_id, std::shared_ptr<std::atomic_size_t> all_block_queue_bytes)
            : _exec_env(exec_env),
              _thread_pool(thread_pool),
              _all_block_queues_bytes(all_block_queue_bytes),
              _db_id(db_id),
              _table_id(table_id) {}

    // Find (or create) the load block queue that the load identified by
    // `load_id` should write its first block to.
    Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
                                      int64_t index_size, const UniqueId& load_id,
                                      std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                      int be_exe_version,
                                      std::shared_ptr<MemTrackerLimiter> mem_tracker,
                                      std::shared_ptr<pipeline::Dependency> create_plan_dep,
                                      std::shared_ptr<pipeline::Dependency> put_block_dep);
    Status get_load_block_queue(const TUniqueId& instance_id,
                                std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                std::shared_ptr<pipeline::Dependency> get_block_dep);
    void remove_load_id(const UniqueId& load_id);

private:
    Status _create_group_commit_load(int be_exe_version,
                                     std::shared_ptr<MemTrackerLimiter> mem_tracker);
    Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label,
                               int64_t txn_id, const TPipelineFragmentParams& pipeline_params);
    Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const std::string& label,
                                     int64_t txn_id, const TUniqueId& instance_id, Status& status,
                                     RuntimeState* state);

    ExecEnv* _exec_env = nullptr;
    ThreadPool* _thread_pool = nullptr;
    // memory consumption of all tables' load block queues, used for memory back pressure
    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
    int64_t _db_id;
    int64_t _table_id;
    std::mutex _lock;
    // fragment_instance_id -> load_block_queue
    std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
    bool _is_creating_plan_fragment = false;
    // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version, index_size>
    std::unordered_map<UniqueId,
                       std::tuple<std::shared_ptr<pipeline::Dependency>,
                                  std::shared_ptr<pipeline::Dependency>, int64_t, int64_t>>
            _create_plan_deps;
    std::string _create_plan_failed_reason;
};
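
// Process-wide entry point for group commit loads. Owns one GroupCommitTable
// per table id, plus the thread pool and the shared byte counter used for
// memory back pressure across all tables' block queues.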
class GroupCommitMgr {
public:
    GroupCommitMgr(ExecEnv* exec_env);
    virtual ~GroupCommitMgr();

    void stop();
    // used when initializing a group_commit_scan_node
    Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
                                std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                std::shared_ptr<pipeline::Dependency> get_block_dep);
    Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
                                      int64_t index_size, const UniqueId& load_id,
                                      std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                      int be_exe_version,
                                      std::shared_ptr<MemTrackerLimiter> mem_tracker,
                                      std::shared_ptr<pipeline::Dependency> create_plan_dep,
                                      std::shared_ptr<pipeline::Dependency> put_block_dep);
    void remove_load_id(int64_t table_id, const UniqueId& load_id);

    std::promise<Status> debug_promise;
    std::future<Status> debug_future = debug_promise.get_future();

private:
    ExecEnv* _exec_env = nullptr;
    std::unique_ptr<doris::ThreadPool> _thread_pool;
    // memory consumption of all tables' load block queues, used for memory back pressure
    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
    std::mutex _lock;
    // TODO: remove a table's entry when it is no longer used
    std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
};
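
// A minimal usage sketch of the producer-side flow (hypothetical caller code:
// it assumes ExecEnv exposes a group_commit_mgr() accessor and that `state`,
// the ids, the dependencies, and `mem_tracker` are set up by the caller):
//
//   std::shared_ptr<LoadBlockQueue> queue;
//   RETURN_IF_ERROR(exec_env->group_commit_mgr()->get_first_block_load_queue(
//           db_id, table_id, base_schema_version, index_size, load_id, queue,
//           be_exe_version, mem_tracker, create_plan_dep, put_block_dep));
//   // append blocks, optionally persisting each one to the WAL first
//   RETURN_IF_ERROR(queue->add_block(state, block, /*write_wal=*/true, load_id));
//   // signal that this load will send no more blocks
//   RETURN_IF_ERROR(queue->remove_load_id(load_id));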
} // namespace doris