blob: 5a01a6c9551c84d3f0f0b15af403e44d44a4984a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <butil/macros.h>
#include <gen_cpp/BackendService_types.h>
#include <gen_cpp/Types_types.h>
#include <stddef.h>
#include <stdint.h>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "runtime/query_context.h"
namespace doris {
enum class FileMappingType {
INTERNAL, // for doris format file {tablet_id}{rowset_id}{segment_id}
EXTERNAL, // for external table.
};
struct InternalFileMappingInfo {
int64_t tablet_id;
RowsetId rowset_id;
uint32_t segment_id;
std::string to_string() const {
std::string value;
value.resize(sizeof(tablet_id) + sizeof(rowset_id) + sizeof(segment_id));
auto* ptr = value.data();
memcpy(ptr, &tablet_id, sizeof(tablet_id));
ptr += sizeof(tablet_id);
memcpy(ptr, &rowset_id, sizeof(rowset_id));
ptr += sizeof(rowset_id);
memcpy(ptr, &segment_id, sizeof(segment_id));
return value;
}
};
struct ExternalFileMappingInfo {
/* By recording the plan_node_id in fileMapping, the TFileScanRangeParams used in the scan phase can be found
* from QueryContext according to the plan_node_id. Because there are some important information in
* TFileScanRangeParams (needed when creating hdfs/s3 reader):
* 8: optional THdfsParams hdfs_params;
* 9: optional map<string, string> properties;
*/
int plan_node_id;
/*
* Record TFileRangeDesc external_scan_range_desc in fileMapping, usage:
* 1. If the file belongs to a partition, columns_from_path_keys and columns_from_path in TFileRangeDesc are needed when materializing the partition column
* 2. path, file_type, modification_time,compress_type .... used to read the file
* 3. TFileFormatType can distinguish whether it is iceberg/hive/hudi/paimon
*/
TFileRangeDesc scan_range_desc;
bool enable_file_meta_cache;
ExternalFileMappingInfo(int plan_node_id, const TFileRangeDesc& scan_range,
bool file_meta_cache)
: plan_node_id(plan_node_id),
scan_range_desc(scan_range),
enable_file_meta_cache(file_meta_cache) {}
std::string to_string() const {
std::string value;
value.resize(scan_range_desc.path.size() + sizeof(plan_node_id) +
sizeof(scan_range_desc.start_offset));
auto* ptr = value.data();
memcpy(ptr, &plan_node_id, sizeof(plan_node_id));
ptr += sizeof(plan_node_id);
memcpy(ptr, &scan_range_desc.start_offset, sizeof(scan_range_desc.start_offset));
ptr += sizeof(scan_range_desc.start_offset);
memcpy(ptr, scan_range_desc.path.data(), scan_range_desc.path.size());
return value;
}
};
struct FileMapping {
ENABLE_FACTORY_CREATOR(FileMapping);
FileMappingType type;
std::variant<InternalFileMappingInfo, ExternalFileMappingInfo> value;
FileMapping(int64_t tablet_id, RowsetId rowset_id, uint32_t segment_id)
: type(FileMappingType::INTERNAL),
value(std::in_place_type<InternalFileMappingInfo>, tablet_id, rowset_id, segment_id) {
}
FileMapping(int plan_node_id, const TFileRangeDesc& scan_range, bool enable_file_meta_cache)
: type(FileMappingType::EXTERNAL),
value(std::in_place_type<ExternalFileMappingInfo>, plan_node_id, scan_range,
enable_file_meta_cache) {}
std::tuple<int64_t, RowsetId, uint32_t> get_doris_format_info() const {
DCHECK(type == FileMappingType::INTERNAL);
auto info = std::get<InternalFileMappingInfo>(value);
return std::make_tuple(info.tablet_id, info.rowset_id, info.segment_id);
}
ExternalFileMappingInfo& get_external_file_info() {
DCHECK(type == FileMappingType::EXTERNAL);
return std::get<ExternalFileMappingInfo>(value);
}
static std::string file_mapping_info_to_string(
const std::variant<InternalFileMappingInfo, ExternalFileMappingInfo>& info) {
return std::visit(
[](const auto& info) -> std::string {
using T = std::decay_t<decltype(info)>;
if constexpr (std::is_same_v<T, InternalFileMappingInfo>) {
return info.to_string();
} else if constexpr (std::is_same_v<T, ExternalFileMappingInfo>) {
return info.to_string();
}
},
info);
}
std::string file_mapping_info_to_string() { return file_mapping_info_to_string(value); }
};
class IdFileMap {
public:
IdFileMap(uint64_t expired_timestamp) : delayed_expired_timestamp(expired_timestamp) {}
std::shared_ptr<FileMapping> get_file_mapping(uint32_t id) {
std::shared_lock lock(_mtx);
auto it = _id_map.find(id);
if (it == _id_map.end()) {
return nullptr;
}
return it->second;
}
uint32 get_file_mapping_id(const std::shared_ptr<FileMapping>& mapping) {
DCHECK(mapping.get() != nullptr);
auto value = mapping->file_mapping_info_to_string();
std::unique_lock lock(_mtx);
auto it = _mapping_to_id.find(value);
if (it != _mapping_to_id.end()) {
return it->second;
}
_id_map[_init_id++] = mapping;
_mapping_to_id[value] = _init_id - 1;
return _init_id - 1;
}
void add_temp_rowset(const RowsetSharedPtr& rowset) {
std::unique_lock lock(_mtx);
_temp_rowset_maps[{rowset->rowset_meta()->tablet_id(), rowset->rowset_id()}] = rowset;
}
RowsetSharedPtr get_temp_rowset(const int64_t tablet_id, const RowsetId& rowset_id) {
std::shared_lock lock(_mtx);
auto it = _temp_rowset_maps.find({tablet_id, rowset_id});
if (it == _temp_rowset_maps.end()) {
return nullptr;
}
return it->second;
}
int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; }
void set_external_scan_params(QueryContext* query_ctx) {
std::call_once(once_flag_for_external, [&] {
DCHECK(query_ctx != nullptr);
_query_global = query_ctx->get_query_globals();
_query_options = query_ctx->get_query_options();
_file_scan_range_params_map = query_ctx->file_scan_range_params_map;
});
}
const TQueryGlobals& get_query_globals() const { return _query_global; }
const TQueryOptions& get_query_options() const { return _query_options; }
const std::map<int, TFileScanRangeParams>& get_external_scan_params() const {
return _file_scan_range_params_map;
}
private:
std::shared_mutex _mtx;
uint32_t _init_id = 0;
std::unordered_map<std::string, uint32_t> _mapping_to_id;
std::unordered_map<uint32_t, std::shared_ptr<FileMapping>> _id_map;
// use in scan external table
TQueryGlobals _query_global;
TQueryOptions _query_options;
std::map<int, TFileScanRangeParams> _file_scan_range_params_map;
std::once_flag once_flag_for_external;
// use in Doris Format to keep temp rowsets, preventing them from being deleted by compaction
std::unordered_map<std::pair<int64_t, RowsetId>, RowsetSharedPtr> _temp_rowset_maps;
uint64_t delayed_expired_timestamp = 0;
};
class IdManager {
public:
static constexpr uint8_t ID_VERSION = 0;
IdManager() = default;
~IdManager() {
std::unique_lock lock(_query_to_id_file_map_mtx);
_query_to_id_file_map.clear();
}
std::shared_ptr<IdFileMap> add_id_file_map(const UniqueId& query_id, int timeout) {
std::unique_lock lock(_query_to_id_file_map_mtx);
auto it = _query_to_id_file_map.find(query_id);
if (it == _query_to_id_file_map.end()) {
auto id_file_map = std::make_shared<IdFileMap>(UnixSeconds() + timeout);
_query_to_id_file_map[query_id] = id_file_map;
return id_file_map;
}
return it->second;
}
void gc_expired_id_file_map(int64_t now) {
std::unique_lock lock(_query_to_id_file_map_mtx);
for (auto it = _query_to_id_file_map.begin(); it != _query_to_id_file_map.end();) {
if (it->second->get_delayed_expired_timestamp() <= now) {
it = _query_to_id_file_map.erase(it);
} else {
++it;
}
}
}
void remove_id_file_map(const UniqueId& query_id) {
std::unique_lock lock(_query_to_id_file_map_mtx);
_query_to_id_file_map.erase(query_id);
}
std::shared_ptr<IdFileMap> get_id_file_map(const UniqueId& query_id) {
std::shared_lock lock(_query_to_id_file_map_mtx);
auto it = _query_to_id_file_map.find(query_id);
if (it == _query_to_id_file_map.end()) {
return nullptr;
}
return it->second;
}
private:
DISALLOW_COPY_AND_ASSIGN(IdManager);
phmap::flat_hash_map<UniqueId, std::shared_ptr<IdFileMap>> _query_to_id_file_map;
std::shared_mutex _query_to_id_file_map_mtx;
};
} // namespace doris