blob: ef92f5a945ee0a1b1b7a350e18c7a6d22509ce4d [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <memory>
#include "common/status.h"
#include "scan_task_queue.h"
#include "util/threadpool.h"
#include "vec/exec/scan/vscanner.h"
// Forward declarations of types that this header refers to.
namespace doris {
// Passed by pointer to ScannerScheduler::init().
class ExecEnv;
namespace vectorized {
// NOTE(review): "vec/exec/scan/vscanner.h" is already included above, so this
// forward declaration looks redundant -- confirm before removing.
class VScanner;
} // namespace vectorized
namespace taskgroup {
// Type of the queue held by ScannerScheduler::_task_group_local_scan_queue.
class ScanTaskTaskGroupQueue;
}
// Template used by ScannerScheduler::_pending_queues.
template <typename T>
class BlockingQueue;
} // namespace doris
namespace doris::vectorized {
class ScannerContext;
// Responsible for the scheduling and execution of all Scanners of a BE node.
// ScannerScheduler has two types of thread pools:
// 1. Scheduling thread pool
// Responsible for Scanner scheduling.
// A set of Scanners for a query will be encapsulated into a ScannerContext
// and submitted to the ScannerScheduler's scheduling queue.
// There are multiple scheduling queues in ScannerScheduler, and each scheduling queue
// is handled by a scheduling thread.
// The scheduling thread is scheduled in granularity of ScannerContext,
// that is, a group of Scanners in a ScannerContext are scheduled at a time.
//
// 2. Execution thread pool
// The scheduling thread will submit the Scanners selected from the ScannerContext
// to the execution thread pool to do the actual scan task.
// Each Scanner will act as a producer, read a group of blocks and put them into
// the corresponding block queue.
// The corresponding ScanNode will act as a consumer to consume blocks from the block queue.
class ScannerScheduler {
public:
    ScannerScheduler();
    ~ScannerScheduler();

    // Initializes the scheduler using the given execution environment.
    // Returns a Status describing the outcome.
    Status init(ExecEnv* env);

    // Submits a ScannerContext for scheduling.
    // The returned Status must not be ignored by the caller.
    [[nodiscard]] Status submit(ScannerContext* ctx);

    // Creates a thread pool token with the given execution mode and maximum concurrency.
    // NOTE(review): presumably the token belongs to _limited_scan_thread_pool -- confirm
    // against the implementation.
    std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
                                                                 int max_concurrency);

    // Returns a non-owning pointer to the task group local scan queue.
    // The pointer is null while _task_group_local_scan_queue holds no object.
    taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() {
        return _task_group_local_scan_queue.get();
    }

    // Returns the recorded maximum size of the remote scan thread pool.
    // The value is 0 until the scheduler assigns it (presumably in init() -- confirm).
    int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; }

private:
    // Scheduling thread function; `queue_id` selects the scheduling queue it serves.
    void _schedule_thread(int queue_id);

    // Schedules the scanners of a certain ScannerContext.
    void _schedule_scanners(ScannerContext* ctx);

    // Execution thread function.
    void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScannerSPtr scanner);

    void _register_metrics();

    static void _deregister_metrics();

    // Execution function working on a task group scan queue.
    void _task_group_scanner_scan(ScannerScheduler* scheduler,
                                  taskgroup::ScanTaskTaskGroupQueue* scan_queue);

    // Number of scheduling queues.
    // TODO: make it configurable.
    static const int QUEUE_NUM = 4;

    // ScannerContexts are submitted to the pending queues in round-robin order.
    // _queue_idx points to the current queue.
    // std::atomic_uint is used to prevent numerical overflow from causing an
    // out-of-bounds memory access.
    // The scheduler thread takes a ctx from a pending queue, schedules it,
    // and puts it into the _scheduling_map.
    // When any scanner finishes, the ctx is taken out and put back into a pending queue.
    // NOTE(review): no _scheduling_map member is declared in this class -- the
    // description above may be outdated; confirm against the implementation.
    std::atomic_uint _queue_idx = {0};

    // Pending queues of submitted ScannerContexts.
    // Initialized to nullptr so the pointer never holds an indeterminate value
    // before the queues are created.
    BlockingQueue<ScannerContext*>** _pending_queues = nullptr;

    // Scheduling thread pool.
    std::unique_ptr<ThreadPool> _scheduler_pool;

    // Execution thread pools:
    // _local_scan_thread_pool is for local scan tasks (typically, olap scanner).
    // _remote_scan_thread_pool is for remote scan tasks (cold data on s3, hdfs, etc.).
    // _limited_scan_thread_pool is a special pool for queries with a resource limit.
    std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
    std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
    std::unique_ptr<ThreadPool> _limited_scan_thread_pool;

    // Task group local scan queue (exposed through local_scan_task_queue()) and a
    // thread pool for group local scans.
    std::unique_ptr<taskgroup::ScanTaskTaskGroupQueue> _task_group_local_scan_queue;
    std::unique_ptr<ThreadPool> _group_local_scan_thread_pool;

    // True if the scheduler is closed.
    std::atomic_bool _is_closed = {false};

    // Initialization flag; starts as false.
    bool _is_init = false;

    // Maximum size of the remote scan thread pool, as reported by
    // remote_thread_pool_max_size(). Defaults to 0 so the getter never reads an
    // indeterminate value.
    int _remote_thread_pool_max_size = 0;
};
} // namespace doris::vectorized