blob: 82be3d9faad00f73d4c715b4310f4f53f04e919f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <list>
#include "common/cast_set.h"
#include "common/config.h"
#include "common/factory_creator.h"
#include "common/status.h"
#include "util/threadpool.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
class VOlapTableSink;
class OlapTableBlockConvertor;
struct AutoIncIDAllocator {
int64_t next_id() {
DCHECK(!ids.empty());
if (ids.front().second > 0) {
--ids.front().second;
--total_count;
return ids.front().first++;
}
ids.pop_front();
DCHECK(!ids.empty() && ids.front().second > 0);
--ids.front().second;
--total_count;
return ids.front().first++;
}
void insert_ids(int64_t start, size_t length) {
total_count += length;
ids.emplace_back(start, length);
}
size_t total_count {0};
std::list<std::pair<int64_t, size_t>> ids;
};
class AutoIncIDBuffer {
ENABLE_FACTORY_CREATOR(AutoIncIDBuffer);
// GenericReader::_MIN_BATCH_SIZE = 4064
static constexpr size_t MIN_BATCH_SIZE = 4064;
public:
// all public functions are thread safe
AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id);
void set_batch_size_at_least(size_t batch_size);
Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result);
struct AutoIncRange {
int64_t start;
size_t length;
bool empty() const { return length == 0; }
void consume(size_t l) {
start += l;
length -= l;
}
};
private:
[[nodiscard]] size_t _prefetch_size() const {
return _batch_size * config::auto_inc_prefetch_size_ratio;
}
[[nodiscard]] size_t _low_water_level_mark() const {
return _batch_size * config::auto_inc_low_water_level_mark_size_ratio;
};
void _get_autoinc_ranges_from_buffers(size_t& request_length,
std::vector<std::pair<int64_t, size_t>>* result);
Status _launch_async_fetch_task(size_t length);
Result<int64_t> _fetch_ids_from_fe(size_t length);
std::atomic<size_t> _batch_size {MIN_BATCH_SIZE};
int64_t _db_id;
int64_t _table_id;
int64_t _column_id;
std::unique_ptr<ThreadPoolToken> _rpc_token;
Status _rpc_status {Status::OK()};
std::atomic<bool> _is_fetching {false};
std::mutex _mutex;
mutable std::mutex _latch;
std::list<AutoIncRange> _buffers;
size_t _current_volume {0};
};
class GlobalAutoIncBuffers {
public:
static GlobalAutoIncBuffers* GetInstance() {
static GlobalAutoIncBuffers buffers;
return &buffers;
}
GlobalAutoIncBuffers() {
static_cast<void>(ThreadPoolBuilder("AsyncFetchAutoIncIDExecutor")
.set_min_threads(cast_set<int>(config::auto_inc_fetch_thread_num))
.set_max_threads(cast_set<int>(config::auto_inc_fetch_thread_num))
.set_max_queue_size(std::numeric_limits<int>::max())
.build(&_fetch_autoinc_id_executor));
}
~GlobalAutoIncBuffers() = default;
std::unique_ptr<ThreadPoolToken> create_token() {
return _fetch_autoinc_id_executor->new_token(ThreadPool::ExecutionMode::CONCURRENT);
}
std::shared_ptr<AutoIncIDBuffer> get_auto_inc_buffer(int64_t db_id, int64_t table_id,
int64_t column_id) {
std::lock_guard<std::mutex> lock(_mutex);
auto key = std::make_tuple(db_id, table_id, column_id);
auto it = _buffers.find(key);
if (it == _buffers.end()) {
_buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id));
}
return _buffers[{db_id, table_id, column_id}];
}
private:
std::unique_ptr<ThreadPool> _fetch_autoinc_id_executor;
std::map<std::tuple<int64_t, int64_t, int64_t>, std::shared_ptr<AutoIncIDBuffer>> _buffers;
std::mutex _mutex;
};
} // namespace doris::vectorized
#include "common/compile_check_end.h"