blob: f2c2b764644c33b6b8c96d1afb53a661fae7bfc5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memtable_flush_executor.h"
#include <gen_cpp/olap_file.pb.h>
#include <algorithm>
#include <cstddef>
#include <ostream>
#include "common/config.h"
#include "common/logging.h"
#include "common/signal_handler.h"
#include "olap/memtable.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/storage_engine.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/pretty_printer.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
namespace doris {
using namespace ErrorCode;
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
class MemtableFlushTask final : public Runnable {
ENABLE_FACTORY_CREATOR(MemtableFlushTask);
public:
MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
int32_t segment_id, int64_t submit_task_time)
: _flush_token(flush_token),
_memtable(memtable),
_segment_id(segment_id),
_submit_task_time(submit_task_time) {
g_flush_task_num << 1;
}
~MemtableFlushTask() override { g_flush_task_num << -1; }
void run() override {
auto token = _flush_token.lock();
if (token) {
token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
} else {
LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
}
}
private:
std::weak_ptr<FlushToken> _flush_token;
std::shared_ptr<MemTable> _memtable;
int32_t _segment_id;
int64_t _submit_task_time;
};
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
<< ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
<< ", flush submit count=" << stat.flush_submit_count
<< ", running flush count=" << stat.flush_running_count
<< ", finish flush count=" << stat.flush_finish_count
<< ", flush bytes: " << stat.flush_size_bytes
<< ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
return os;
}
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
{
std::shared_lock rdlk(_flush_status_lock);
DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
_flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
});
if (!_flush_status.ok()) {
return _flush_status;
}
}
if (mem_table == nullptr || mem_table->empty()) {
return Status::OK();
}
int64_t submit_task_time = MonotonicNanos();
auto task = MemtableFlushTask::create_shared(
shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time);
// NOTE: we should guarantee WorkloadGroup is not deconstructed when submit memtable flush task.
// because currently WorkloadGroup's can only be destroyed when all queries in the group is finished,
// but not consider whether load channel is finish.
std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
ThreadPool* wg_thread_pool = nullptr;
if (wg_sptr) {
wg_thread_pool = wg_sptr->get_memtable_flush_pool();
}
Status ret = wg_thread_pool ? wg_thread_pool->submit(std::move(task))
: _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no need to notify _cond here
_stats.flush_submit_count++;
}
return ret;
}
// NOTE: FlushToken's submit/cancel/wait run in one thread,
// so we don't need to make them mutually exclusive, std::atomic is enough.
void FlushToken::_wait_submit_task_finish() {
std::unique_lock<std::mutex> lock(_mutex);
_submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; });
}
void FlushToken::_wait_running_task_finish() {
std::unique_lock<std::mutex> lock(_mutex);
_running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
}
void FlushToken::cancel() {
_shutdown_flush_token();
_wait_running_task_finish();
}
Status FlushToken::wait() {
_wait_submit_task_finish();
{
std::shared_lock rdlk(_flush_status_lock);
if (!_flush_status.ok()) {
return _flush_status;
}
}
return Status::OK();
}
Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
int64_t size) {
auto* thread_context = doris::thread_context();
auto* memtable_flush_executor =
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
Status st;
int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
do {
// only try to reserve process memory
st = thread_context->thread_mem_tracker_mgr->try_reserve(
size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
if (st.ok()) {
memtable_flush_executor->inc_flushing_task();
break;
}
if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) {
st = Status::Cancelled("flush memtable already cancelled");
break;
}
// Make sure at least one memtable is flushing even reserve memory failed.
if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
// If there are already any flushing task, Wait for some time and retry.
LOG_EVERY_T(INFO, 60) << fmt::format(
"Failed to reserve memory {} for flush memtable, retry after 100ms",
PrettyPrinter::print_bytes(size));
std::this_thread::sleep_for(std::chrono::seconds(1));
max_waiting_time -= 1;
} else {
st = Status::OK();
break;
}
} while (max_waiting_time > 0);
return st;
}
Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
<< ", memsize: " << PrettyPrinter::print_bytes(memtable->memory_usage())
<< ", rows: " << memtable->stat().raw_rows;
memtable->update_mem_type(MemType::FLUSH);
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
SCOPED_ATTACH_TASK(memtable->resource_ctx());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
DEFER_RELEASE_RESERVED();
auto reserve_size = memtable->get_flush_reserve_memory_size();
if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
reserve_size > 0) {
RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
}
Defer defer {[&]() {
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
}};
std::unique_ptr<vectorized::Block> block;
RETURN_IF_ERROR(memtable->to_block(&block));
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
memtable->set_flush_success();
}
_memtable_stat += memtable->stat();
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id()
<< ", flushsize: " << PrettyPrinter::print_bytes(*flush_size);
return Status::OK();
}
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time) {
signal::set_signal_task_id(_rowset_writer->load_id());
signal::tablet_id = memtable_ptr->tablet_id();
Defer defer {[&]() {
std::lock_guard<std::mutex> lock(_mutex);
_stats.flush_submit_count--;
if (_stats.flush_submit_count == 0) {
_submit_task_finish_cond.notify_one();
}
_stats.flush_running_count--;
if (_stats.flush_running_count == 0) {
_running_task_finish_cond.notify_one();
}
}};
DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown",
{ std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
if (_is_shutdown()) {
return;
}
DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown",
{ std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
_stats.flush_running_count++;
// double check if shutdown to avoid wait running task finish count not accurate
if (_is_shutdown()) {
return;
}
DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown",
{ std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
_stats.flush_wait_time_ns += flush_wait_time_ns;
// If previous flush has failed, return directly
{
std::shared_lock rdlk(_flush_status_lock);
if (!_flush_status.ok()) {
return;
}
}
MonotonicStopWatch timer;
timer.start();
size_t memory_usage = memtable_ptr->memory_usage();
int64_t flush_size;
Status s = _do_flush_memtable(memtable_ptr.get(), segment_id, &flush_size);
{
std::shared_lock rdlk(_flush_status_lock);
if (!_flush_status.ok()) {
return;
}
}
if (!s.ok()) {
std::lock_guard wrlk(_flush_status_lock);
LOG(WARNING) << "Flush memtable failed with res = " << s
<< ", load_id: " << print_id(_rowset_writer->load_id());
_flush_status = s;
return;
}
VLOG_CRITICAL << "flush memtable wait time: "
<< PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS)
<< ", flush memtable cost: "
<< PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
<< ", submit count: " << _stats.flush_submit_count
<< ", running count: " << _stats.flush_running_count
<< ", finish count: " << _stats.flush_finish_count
<< ", mem size: " << PrettyPrinter::print_bytes(memory_usage)
<< ", disk size: " << PrettyPrinter::print_bytes(flush_size);
_stats.flush_time_ns += timer.elapsed_time();
_stats.flush_finish_count++;
_stats.flush_size_bytes += memtable_ptr->memory_usage();
_stats.flush_disk_size_bytes += flush_size;
}
void MemTableFlushExecutor::init(int num_disk) {
num_disk = std::max(1, num_disk);
int num_cpus = std::thread::hardware_concurrency();
int min_threads = std::max(1, config::flush_thread_num_per_store);
int max_threads = num_cpus == 0 ? num_disk * min_threads
: std::min(num_disk * min_threads,
num_cpus * config::max_flush_thread_num_per_cpu);
static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_flush_pool));
min_threads = std::max(1, config::high_priority_flush_thread_num_per_store);
max_threads = num_cpus == 0 ? num_disk * min_threads
: std::min(num_disk * min_threads,
num_cpus * config::max_flush_thread_num_per_cpu);
static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_high_prio_flush_pool));
}
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
bool is_high_priority,
std::shared_ptr<WorkloadGroup> wg_sptr) {
switch (rowset_writer->type()) {
case ALPHA_ROWSET:
// alpha rowset do not support flush in CONCURRENT. and not support alpha rowset now.
return Status::InternalError<false>("not support alpha rowset load now.");
case BETA_ROWSET: {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
flush_token = FlushToken::create_shared(pool, wg_sptr);
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
default:
return Status::InternalError<false>("unknown rowset type.");
}
}
} // namespace doris