blob: 3b5a12a12ed7a5b2bda3653dbb7d91965c2f84af [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-priority-queue.hpp
// and modified by Doris
#pragma once
#include <unistd.h>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include "common/config.h"
#include "util/stopwatch.hpp"
namespace doris {
#include "common/compile_check_begin.h"
// Fixed capacity FIFO queue, where both blocking_get and blocking_put operations block
// if the queue is empty or full, respectively.
template <typename T>
class BlockingPriorityQueue {
public:
BlockingPriorityQueue(uint32_t max_elements)
: _shutdown(false),
_max_element(max_elements),
_upgrade_counter(0),
_total_get_wait_time(0),
_total_put_wait_time(0),
_get_waiting(0),
_put_waiting(0) {}
// Get an element from the queue, waiting indefinitely (or until timeout) for one to become available.
// Returns false if we were shut down prior to getting the element, and there
// are no more elements available.
// -- timeout_ms: 0 means wait indefinitely
bool blocking_get(T* out, uint32_t timeout_ms = 0) {
MonotonicStopWatch timer;
timer.start();
std::unique_lock unique_lock(_lock);
bool wait_successful = false;
if (timeout_ms > 0) {
while (!(_shutdown || !_queue.empty())) {
++_get_waiting;
if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
std::cv_status::timeout) {
// timeout
wait_successful = _shutdown || !_queue.empty();
break;
}
}
} else {
while (!(_shutdown || !_queue.empty())) {
++_get_waiting;
_get_cv.wait(unique_lock);
}
wait_successful = true;
}
_total_get_wait_time += timer.elapsed_time();
if (wait_successful) {
if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
std::priority_queue<T> tmp_queue;
while (!_queue.empty()) {
T v = _queue.top();
_queue.pop();
++v;
tmp_queue.push(v);
}
swap(_queue, tmp_queue);
_upgrade_counter = 0;
}
if (!_queue.empty()) {
*out = _queue.top();
_queue.pop();
++_upgrade_counter;
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
return true;
} else {
assert(_shutdown);
return false;
}
} else {
//time out
assert(!_shutdown);
return false;
}
}
bool non_blocking_get(T* out) {
MonotonicStopWatch timer;
timer.start();
std::unique_lock unique_lock(_lock);
if (!_queue.empty()) {
// 定期提高队列中残留的任务优先级
// 保证优先级较低的大查询不至于完全饿死
if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
std::priority_queue<T> tmp_queue;
while (!_queue.empty()) {
T v = _queue.top();
_queue.pop();
++v;
tmp_queue.push(v);
}
swap(_queue, tmp_queue);
_upgrade_counter = 0;
}
*out = _queue.top();
_queue.pop();
++_upgrade_counter;
_total_get_wait_time += timer.elapsed_time();
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
return true;
}
return false;
}
// Puts an element into the queue, waiting indefinitely until there is space.
// If the queue is shut down, returns false.
bool blocking_put(const T& val) {
MonotonicStopWatch timer;
timer.start();
std::unique_lock unique_lock(_lock);
while (!(_shutdown || _queue.size() < _max_element)) {
++_put_waiting;
_put_cv.wait(unique_lock);
}
_total_put_wait_time += timer.elapsed_time();
if (_shutdown) {
return false;
}
_queue.push(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
return true;
}
// Return false if queue full or has been shutdown.
bool try_put(const T& val) {
std::unique_lock unique_lock(_lock);
if (_queue.size() < _max_element && !_shutdown) {
_queue.push(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
return true;
}
return false;
}
// Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
void shutdown() {
{
std::lock_guard l(_lock);
_shutdown = true;
}
_get_cv.notify_all();
_put_cv.notify_all();
}
uint32_t get_size() const {
std::lock_guard l(_lock);
return static_cast<uint32_t>(_queue.size());
}
uint32_t get_capacity() const { return _max_element; }
// Returns the total amount of time threads have blocked in blocking_get.
uint64_t total_get_wait_time() const { return _total_get_wait_time; }
// Returns the total amount of time threads have blocked in blocking_put.
uint64_t total_put_wait_time() const { return _total_put_wait_time; }
private:
bool _shutdown;
const int _max_element;
std::condition_variable _get_cv; // 'get' callers wait on this
std::condition_variable _put_cv; // 'put' callers wait on this
// _lock guards access to _queue, total_get_wait_time, and total_put_wait_time
mutable std::mutex _lock;
std::priority_queue<T> _queue;
int _upgrade_counter;
std::atomic<uint64_t> _total_get_wait_time;
std::atomic<uint64_t> _total_put_wait_time;
size_t _get_waiting;
size_t _put_waiting;
};
#include "common/compile_check_end.h"
} // namespace doris