blob: f5d4489ea3d2c6b92339fdc8acdf60af702f991c [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <memory>
#include <string>
#include <thread>
#include <vector>
namespace doris {
// Named counter of in-flight operations that can be updated from many threads.
// The count lives in a std::atomic and every access uses relaxed ordering, so
// the value is a statistic only: it does not order or publish any other data.
class ConcurrencyCounter {
public:
    // RAII scope guard: adds one to the counter when constructed and takes it
    // back when destroyed. A null counter turns the guard into a no-op.
    class Guard {
    public:
        explicit Guard(ConcurrencyCounter* counter) : _counter(counter) {
            if (_counter != nullptr) {
                _counter->increment();
            }
        }

        ~Guard() {
            if (_counter == nullptr) {
                return;
            }
            _counter->decrement();
        }

        // A copied guard would decrement twice for a single increment.
        Guard(const Guard&) = delete;
        Guard& operator=(const Guard&) = delete;

    private:
        ConcurrencyCounter* _counter;
    };

    // Creates a counter labelled `name`; the count starts at zero.
    explicit ConcurrencyCounter(std::string name) : _name(std::move(name)) {}

    // Adds one to the count.
    void increment() {
        _count.fetch_add(1, std::memory_order_relaxed);
    }

    // Subtracts one from the count. No floor is applied, so unbalanced calls
    // can drive the value negative.
    void decrement() {
        _count.fetch_sub(1, std::memory_order_relaxed);
    }

    // Snapshot of the current count.
    int64_t value() const {
        return _count.load(std::memory_order_relaxed);
    }

    // Label supplied at construction.
    const std::string& name() const {
        return _name;
    }

private:
    std::string _name;
    std::atomic<int64_t> _count {0};
};
// Singleton manager for all concurrency counters.
// All counters are declared here, in order. The constructor, destructor and
// every member function are defined out of line (not in this header).
class ConcurrencyStatsManager {
public:
    // Returns the single shared manager instance.
    static ConcurrencyStatsManager& instance();

    // Start the background thread for periodic logging
    void start();

    // Stop the background thread
    void stop();

    // Manually dump all counters to log
    void dump_to_log();

    // Access to individual counters (defined in order of read path from top to bottom).
    // Each pointer defaults to nullptr so it never holds an indeterminate value,
    // whatever the out-of-line constructor does; ConcurrencyCounter::Guard
    // treats a null counter as a no-op.
    // NOTE(review): presumably the constructor points these at live counters — confirm in the .cpp.
    ConcurrencyCounter* vscanner_get_block = nullptr;
    ConcurrencyCounter* segment_iterator_next_batch = nullptr;
    ConcurrencyCounter* column_reader_read_page = nullptr;
    ConcurrencyCounter* page_io_decompress = nullptr;
    ConcurrencyCounter* page_io_pre_decode = nullptr;
    ConcurrencyCounter* page_io_insert_page_cache = nullptr;
    ConcurrencyCounter* cached_remote_reader_read_at = nullptr;
    ConcurrencyCounter* cached_remote_reader_get_or_set = nullptr;
    ConcurrencyCounter* cached_remote_reader_get_or_set_wait_lock = nullptr;
    ConcurrencyCounter* cached_remote_reader_write_back = nullptr;
    ConcurrencyCounter* cached_remote_reader_blocking = nullptr;
    ConcurrencyCounter* cached_remote_reader_local_read = nullptr;
    ConcurrencyCounter* s3_file_reader_read = nullptr;

private:
    // Construction and destruction are private; instance() is the only public
    // way to obtain the manager.
    ConcurrencyStatsManager();
    ~ConcurrencyStatsManager();

    // Non-copyable.
    ConcurrencyStatsManager(const ConcurrencyStatsManager&) = delete;
    ConcurrencyStatsManager& operator=(const ConcurrencyStatsManager&) = delete;

    // Body of the background dump thread.
    void _dump_thread_func();

    // All counters in the order they should be printed
    std::vector<ConcurrencyCounter*> _counters;

    // Explicit initial state: before C++20 a default-constructed std::atomic
    // holds an indeterminate value.
    std::atomic<bool> _running {false};

    // Handle of the background thread; null until one is created.
    std::unique_ptr<std::thread> _dump_thread;
};
// Scoped counting helpers.
// SCOPED_CONCURRENCY_COUNT(counter) declares a doris::ConcurrencyCounter::Guard
// local named _concurrency_guard_<line>, which holds the counter incremented
// until the end of the enclosing scope. The extra HELPER layer makes the
// preprocessor expand __LINE__ to its number before it is pasted into the
// variable name; two uses on the same source line would produce the same name.
#define SCOPED_CONCURRENCY_COUNT_IMPL(counter, suffix) \
    doris::ConcurrencyCounter::Guard _concurrency_guard_##suffix(counter)
#define SCOPED_CONCURRENCY_COUNT_HELPER(counter, line_no) \
    SCOPED_CONCURRENCY_COUNT_IMPL(counter, line_no)
#define SCOPED_CONCURRENCY_COUNT(counter) \
    SCOPED_CONCURRENCY_COUNT_HELPER(counter, __LINE__)
} // namespace doris