// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/packed_file_manager.h"
#include <bvar/bvar.h>
#include <bvar/recorder.h>
#include <bvar/window.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <ctime>
#include <functional>
#include <limits>
#include <optional>
#include <random>
#include <sstream>
#include <unordered_set>
#ifdef BE_TEST
#include "cpp/sync_point.h"
#endif
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "gen_cpp/cloud.pb.h"
#include "io/fs/packed_file_trailer.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "util/coding.h"
#include "util/uid_util.h"
namespace doris::io {
namespace {
bvar::Adder<int64_t> g_packed_file_total_count("packed_file", "total_count");
bvar::Adder<int64_t> g_packed_file_total_small_file_count("packed_file", "total_small_file_num");
bvar::Adder<int64_t> g_packed_file_total_size_bytes("packed_file", "total_size_bytes");
bvar::IntRecorder g_packed_file_small_file_num_recorder;
bvar::IntRecorder g_packed_file_file_size_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_avg_small_file_num(
"packed_file_avg_small_file_num", &g_packed_file_small_file_num_recorder,
/*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_avg_file_size_bytes("packed_file_avg_file_size_bytes",
&g_packed_file_file_size_recorder,
/*window_size=*/10);
bvar::IntRecorder g_packed_file_active_to_ready_ms_recorder;
bvar::IntRecorder g_packed_file_ready_to_upload_ms_recorder;
bvar::IntRecorder g_packed_file_uploading_to_uploaded_ms_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_active_to_ready_ms_window(
"packed_file_active_to_ready_ms", &g_packed_file_active_to_ready_ms_recorder,
/*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_ready_to_upload_ms_window(
"packed_file_ready_to_upload_ms", &g_packed_file_ready_to_upload_ms_recorder,
/*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
"packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
/*window_size=*/10);
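// Serialize `packed_file_info` into a PackedFileFooterPB and append it to the packed
// file as a trailer: the serialized footer bytes, followed by a fixed32 little-endian
// footer length and a fixed32 trailer version.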
Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path,
const cloud::PackedFileInfoPB& packed_file_info) {
if (writer == nullptr) {
return Status::InternalError("File writer is null for packed file: {}", packed_file_path);
}
if (writer->state() == FileWriter::State::CLOSED) {
return Status::InternalError("File writer already closed for packed file: {}",
packed_file_path);
}
cloud::PackedFileFooterPB footer_pb;
footer_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
std::string serialized_footer;
if (!footer_pb.SerializeToString(&serialized_footer)) {
return Status::InternalError("Failed to serialize packed file footer info for {}",
packed_file_path);
}
if (serialized_footer.size() >
std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize) {
return Status::InternalError("PackedFileFooterPB too large for {}", packed_file_path);
}
std::string trailer;
trailer.reserve(serialized_footer.size() + kPackedFileTrailerSuffixSize);
trailer.append(serialized_footer);
put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_footer.size()));
put_fixed32_le(&trailer, kPackedFileTrailerVersion);
return writer->append(Slice(trailer));
}
} // namespace
PackedFileManager* PackedFileManager::instance() {
static PackedFileManager instance;
return &instance;
}
PackedFileManager::~PackedFileManager() {
stop_background_manager();
}
Status PackedFileManager::init() {
return Status::OK();
}
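// Allocate a fresh packed-file context for `resource_id`: resolve the backing file
// system, pick a relative path of the form data/packed_file/<bucket>/<uuid>.bin
// (the bucket is derived from a hash of the uuid), and open a file writer on it.
// The new context starts in INIT state.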
Status PackedFileManager::create_new_packed_file_context(
const std::string& resource_id, std::unique_ptr<PackedFileContext>& packed_file_ctx) {
FileSystemSPtr file_system;
RETURN_IF_ERROR(ensure_file_system(resource_id, &file_system));
if (file_system == nullptr) {
return Status::InternalError("File system is not available for packed file creation: " +
resource_id);
}
auto uuid = generate_uuid_string();
auto hash_val = std::hash<std::string> {}(uuid);
uint16_t path_bucket = hash_val % 4096 + 1;
std::stringstream path_stream;
path_stream << "data/packed_file/" << path_bucket << "/" << uuid << ".bin";
packed_file_ctx = std::make_unique<PackedFileContext>();
const std::string relative_path = path_stream.str();
packed_file_ctx->packed_file_path = relative_path;
packed_file_ctx->create_time = std::time(nullptr);
packed_file_ctx->create_timestamp = std::chrono::steady_clock::now();
packed_file_ctx->state = PackedFileState::INIT;
packed_file_ctx->resource_id = resource_id;
packed_file_ctx->file_system = std::move(file_system);
// Create file writer for the packed file
FileWriterPtr new_writer;
FileWriterOptions opts;
RETURN_IF_ERROR(
packed_file_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts));
packed_file_ctx->writer = std::move(new_writer);
return Status::OK();
}
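// Resolve and cache the file system used for packed files of `resource_id`. An empty
// resource id maps to the latest cloud file system; otherwise the storage resource
// registered for that id is used. Only available in cloud mode.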
Status PackedFileManager::ensure_file_system(const std::string& resource_id,
FileSystemSPtr* file_system) {
if (file_system == nullptr) {
return Status::InvalidArgument("file_system output parameter is null");
}
{
std::lock_guard<std::mutex> lock(_file_system_mutex);
if (resource_id.empty()) {
if (_default_file_system != nullptr) {
*file_system = _default_file_system;
return Status::OK();
}
} else {
auto it = _file_systems.find(resource_id);
if (it != _file_systems.end()) {
*file_system = it->second;
return Status::OK();
}
}
}
if (!config::is_cloud_mode()) {
return Status::InternalError("Cloud file system is not available in local mode");
}
auto* exec_env = ExecEnv::GetInstance();
if (exec_env == nullptr) {
return Status::InternalError("ExecEnv instance is not initialized");
}
FileSystemSPtr resolved_fs;
if (resource_id.empty()) {
resolved_fs = exec_env->storage_engine().to_cloud().latest_fs();
if (resolved_fs == nullptr) {
return Status::InternalError("Cloud file system is not ready");
}
} else {
auto storage_resource =
exec_env->storage_engine().to_cloud().get_storage_resource(resource_id);
if (!storage_resource.has_value() || storage_resource->fs == nullptr) {
return Status::InternalError("Cloud file system is not ready for resource: " +
resource_id);
}
resolved_fs = storage_resource->fs;
}
{
std::lock_guard<std::mutex> lock(_file_system_mutex);
if (resource_id.empty()) {
_default_file_system = resolved_fs;
*file_system = _default_file_system;
} else {
_file_systems[resource_id] = resolved_fs;
*file_system = resolved_fs;
}
}
return Status::OK();
}
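// Append one small file's bytes to the current packed file of the resource. Files
// larger than `small_file_threshold_bytes` are skipped (returned as OK without
// packing). The slice location is recorded in both the per-file index and the global
// index, and the packed file is rotated when the size or small-file-count threshold
// is reached.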
Status PackedFileManager::append_small_file(const std::string& path, const Slice& data,
const PackedAppendContext& info) {
// Check if file is too large to be merged
if (data.get_size() > config::small_file_threshold_bytes) {
return Status::OK(); // Skip merging for large files
}
if (info.txn_id <= 0) {
return Status::InvalidArgument("Missing valid txn id for packed file append: " + path);
}
std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex);
auto& current_state = _current_packed_files[info.resource_id];
if (!current_state || !current_state->writer) {
RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
}
// Check if we need to create a new packed file
if (current_state->total_size + data.get_size() >= config::packed_file_size_threshold_bytes) {
RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
auto it = _current_packed_files.find(info.resource_id);
if (it == _current_packed_files.end() || !it->second || !it->second->writer) {
RETURN_IF_ERROR(create_new_packed_file_context(
info.resource_id, _current_packed_files[info.resource_id]));
}
}
PackedFileContext* active_state = current_state.get();
// Write data to current packed file
RETURN_IF_ERROR(active_state->writer->append(data));
// Update index
PackedSliceLocation location;
location.packed_file_path = active_state->packed_file_path;
location.offset = active_state->current_offset;
location.size = data.get_size();
location.tablet_id = info.tablet_id;
location.rowset_id = info.rowset_id;
location.resource_id = info.resource_id;
location.txn_id = info.txn_id;
active_state->slice_locations[path] = location;
active_state->current_offset += data.get_size();
active_state->total_size += data.get_size();
    // Mark as active on the first write; do this before the count-based rotation
    // so a file rotated below keeps its first-append timestamp.
    if (!active_state->first_append_timestamp.has_value()) {
        active_state->first_append_timestamp = std::chrono::steady_clock::now();
    }
    if (active_state->state == PackedFileState::INIT) {
        active_state->state = PackedFileState::ACTIVE;
    }
    // Rotate the packed file when the small file count reaches the threshold
    if (config::packed_file_small_file_count_threshold > 0 &&
        static_cast<int64_t>(active_state->slice_locations.size()) >=
                config::packed_file_small_file_count_threshold) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
    }
// Update global index
{
std::lock_guard<std::mutex> global_lock(_global_index_mutex);
_global_slice_locations[path] = location;
}
return Status::OK();
}
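// Block until the given packed file reaches a terminal state (UPLOADED or FAILED);
// a FAILED state is surfaced as an InternalError carrying the recorded last_error.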
Status PackedFileManager::wait_for_packed_file_upload(PackedFileContext* packed_file_ptr) {
std::unique_lock<std::mutex> upload_lock(packed_file_ptr->upload_mutex);
packed_file_ptr->upload_cv.wait(upload_lock, [packed_file_ptr] {
auto state = packed_file_ptr->state.load(std::memory_order_acquire);
return state == PackedFileState::UPLOADED || state == PackedFileState::FAILED;
});
if (packed_file_ptr->state == PackedFileState::FAILED) {
std::string err = packed_file_ptr->last_error;
if (err.empty()) {
err = "Packed file upload failed";
}
return Status::InternalError(err);
}
return Status::OK();
}
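// Wait until the packed file containing `path` has been uploaded. The path is
// resolved through the global slice index, and the matching context is looked up in
// the current, uploading, and uploaded maps before waiting on its upload state.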
Status PackedFileManager::wait_upload_done(const std::string& path) {
std::string packed_file_path;
{
std::lock_guard<std::mutex> global_lock(_global_index_mutex);
auto it = _global_slice_locations.find(path);
if (it == _global_slice_locations.end()) {
return Status::InternalError("File not found in global index: " + path);
}
packed_file_path = it->second.packed_file_path;
}
// Find the packed file in uploaded files first - if already uploaded, no need to wait
std::shared_ptr<PackedFileContext> managed_packed_file;
std::shared_ptr<PackedFileContext> failed_packed_file;
{
std::lock_guard<std::mutex> lock(_packed_files_mutex);
auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
if (uploaded_it != _uploaded_packed_files.end()) {
auto state = uploaded_it->second->state.load(std::memory_order_acquire);
if (state == PackedFileState::UPLOADED) {
return Status::OK(); // Already uploaded, no need to wait
}
if (state == PackedFileState::FAILED) {
failed_packed_file = uploaded_it->second;
} else {
managed_packed_file = uploaded_it->second;
}
}
}
if (failed_packed_file) {
std::lock_guard<std::mutex> upload_lock(failed_packed_file->upload_mutex);
std::string err = failed_packed_file->last_error;
if (err.empty()) {
err = "Merge file upload failed";
}
return Status::InternalError(err);
}
// Find the packed file in either current or uploading files
PackedFileContext* packed_file_ptr = nullptr;
{
std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
for (auto& [resource_id, state] : _current_packed_files) {
if (state && state->packed_file_path == packed_file_path) {
packed_file_ptr = state.get();
break;
}
}
}
if (!packed_file_ptr) {
std::lock_guard<std::mutex> lock(_packed_files_mutex);
auto uploading_it = _uploading_packed_files.find(packed_file_path);
if (uploading_it != _uploading_packed_files.end()) {
managed_packed_file = uploading_it->second;
packed_file_ptr = managed_packed_file.get();
} else {
auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
if (uploaded_it != _uploaded_packed_files.end()) {
managed_packed_file = uploaded_it->second;
packed_file_ptr = managed_packed_file.get();
}
}
}
if (!packed_file_ptr) {
// Packed file not found in any map; this is unexpected
return Status::InternalError("Packed file not found for path: " + path);
}
Status wait_status = wait_for_packed_file_upload(packed_file_ptr);
(void)managed_packed_file; // keep shared ownership alive during wait
return wait_status;
}
Status PackedFileManager::get_packed_slice_location(const std::string& path,
PackedSliceLocation* location) {
std::lock_guard<std::mutex> lock(_global_index_mutex);
auto it = _global_slice_locations.find(path);
if (it == _global_slice_locations.end()) {
return Status::NotFound("File not found in global packed index: {}", path);
}
*location = it->second;
return Status::OK();
}
void PackedFileManager::start_background_manager() {
if (_background_thread) {
return; // Already started
}
_stop_background_thread = false;
_background_thread = std::make_unique<std::thread>([this] { background_manager(); });
}
void PackedFileManager::stop_background_manager() {
_stop_background_thread = true;
if (_background_thread && _background_thread->joinable()) {
_background_thread->join();
}
_background_thread.reset();
}
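// Caller must hold `_current_packed_file_mutex`. Transitions the current packed file
// of `resource_id` to READY_TO_UPLOAD, moves it into `_uploading_packed_files`, and
// creates a fresh context so subsequent appends go to a new packed file.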
Status PackedFileManager::mark_current_packed_file_for_upload_locked(
const std::string& resource_id) {
auto it = _current_packed_files.find(resource_id);
if (it == _current_packed_files.end() || !it->second || !it->second->writer) {
return Status::OK(); // Nothing to mark for upload
}
auto& current = it->second;
// Mark as ready for upload
current->state = PackedFileState::READY_TO_UPLOAD;
if (!current->ready_to_upload_timestamp.has_value()) {
auto now = std::chrono::steady_clock::now();
current->ready_to_upload_timestamp = now;
int64_t active_to_ready_ms = -1;
if (current->first_append_timestamp.has_value()) {
active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - *current->first_append_timestamp)
.count();
g_packed_file_active_to_ready_ms_recorder << active_to_ready_ms;
if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
sampler->take_sample();
}
}
VLOG_DEBUG << "Packed file " << current->packed_file_path
<< " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
<< active_to_ready_ms;
}
// Move to uploading files list
{
std::shared_ptr<PackedFileContext> uploading_ptr =
std::shared_ptr<PackedFileContext>(std::move(current));
std::lock_guard<std::mutex> lock(_packed_files_mutex);
_uploading_packed_files[uploading_ptr->packed_file_path] = uploading_ptr;
}
// Create new packed file
return create_new_packed_file_context(resource_id, it->second);
}
Status PackedFileManager::mark_current_packed_file_for_upload(const std::string& resource_id) {
std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex);
return mark_current_packed_file_for_upload_locked(resource_id);
}
void PackedFileManager::record_packed_file_metrics(const PackedFileContext& packed_file) {
g_packed_file_total_count << 1;
g_packed_file_total_small_file_count
<< static_cast<int64_t>(packed_file.slice_locations.size());
g_packed_file_total_size_bytes << packed_file.total_size;
g_packed_file_small_file_num_recorder
<< static_cast<int64_t>(packed_file.slice_locations.size());
g_packed_file_file_size_recorder << packed_file.total_size;
// Flush samplers immediately so the window bvar reflects the latest packed file.
if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
sampler->take_sample();
}
if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
sampler->take_sample();
}
}
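// Background loop: periodically rotates ACTIVE packed files whose first append is
// older than the time threshold, drives pending uploads, and runs periodic cleanup
// of expired contexts and stale global index entries.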
void PackedFileManager::background_manager() {
auto last_cleanup_time = std::chrono::steady_clock::now();
while (!_stop_background_thread.load()) {
int64_t check_interval_ms =
std::max<int64_t>(1, config::packed_file_time_threshold_ms / 10);
std::this_thread::sleep_for(std::chrono::milliseconds(check_interval_ms));
// Check if current packed file should be closed due to time threshold
std::vector<std::string> resources_to_mark;
{
std::unique_lock<std::timed_mutex> current_lock(_current_packed_file_mutex,
std::defer_lock);
int64_t lock_wait_ms = std::max<int64_t>(0, config::packed_file_try_lock_timeout_ms);
if (current_lock.try_lock_for(std::chrono::milliseconds(lock_wait_ms))) {
for (auto& [resource_id, state] : _current_packed_files) {
if (!state || state->state != PackedFileState::ACTIVE) {
continue;
}
if (!state->first_append_timestamp.has_value()) {
continue;
}
auto current_time = std::chrono::steady_clock::now();
auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
current_time - *(state->first_append_timestamp))
.count();
if (elapsed_ms >= config::packed_file_time_threshold_ms) {
resources_to_mark.push_back(resource_id);
}
}
}
}
for (const auto& resource_id : resources_to_mark) {
Status st = mark_current_packed_file_for_upload(resource_id);
if (!st.ok()) {
LOG(WARNING) << "Failed to close current packed file for resource " << resource_id
<< ": " << st.to_string();
}
}
// Process uploading files
process_uploading_packed_files();
auto now = std::chrono::steady_clock::now();
int64_t cleanup_interval_sec =
std::max<int64_t>(1, config::packed_file_cleanup_interval_seconds);
auto cleanup_interval = std::chrono::seconds(cleanup_interval_sec);
if (now - last_cleanup_time >= cleanup_interval) {
cleanup_expired_data();
last_cleanup_time = now;
}
}
}
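// Drive packed-file uploads in two phases. READY_TO_UPLOAD files have their metadata
// persisted to the meta service, the trailer appended, and the writer close kicked
// off; UPLOADING files are polled until the writer reports CLOSED, at which point
// they are marked UPLOADED (or FAILED on error).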
void PackedFileManager::process_uploading_packed_files() {
std::vector<std::shared_ptr<PackedFileContext>> files_ready;
std::vector<std::shared_ptr<PackedFileContext>> files_uploading;
auto record_ready_to_upload = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
if (!packed_file->uploading_timestamp.has_value()) {
packed_file->uploading_timestamp = std::chrono::steady_clock::now();
int64_t duration_ms = -1;
if (packed_file->ready_to_upload_timestamp.has_value()) {
duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
*packed_file->uploading_timestamp -
*packed_file->ready_to_upload_timestamp)
.count();
g_packed_file_ready_to_upload_ms_recorder << duration_ms;
if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
sampler->take_sample();
}
}
VLOG_DEBUG << "Packed file " << packed_file->packed_file_path
<< " transition READY_TO_UPLOAD->UPLOADING; "
"ready_to_upload_ms="
<< duration_ms;
}
};
{
std::lock_guard<std::mutex> lock(_packed_files_mutex);
for (auto& [packed_file_path, packed_file] : _uploading_packed_files) {
auto state = packed_file->state.load(std::memory_order_acquire);
if (state != PackedFileState::READY_TO_UPLOAD && state != PackedFileState::UPLOADING) {
continue;
}
if (state == PackedFileState::READY_TO_UPLOAD) {
files_ready.emplace_back(packed_file);
} else {
files_uploading.emplace_back(packed_file);
}
}
}
auto handle_success = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
auto now = std::chrono::steady_clock::now();
int64_t active_to_ready_ms = -1;
int64_t ready_to_upload_ms = -1;
int64_t uploading_to_uploaded_ms = -1;
if (packed_file->first_append_timestamp.has_value() &&
packed_file->ready_to_upload_timestamp.has_value()) {
active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
*packed_file->ready_to_upload_timestamp -
*packed_file->first_append_timestamp)
.count();
}
if (packed_file->ready_to_upload_timestamp.has_value() &&
packed_file->uploading_timestamp.has_value()) {
ready_to_upload_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
*packed_file->uploading_timestamp -
*packed_file->ready_to_upload_timestamp)
.count();
}
if (packed_file->uploading_timestamp.has_value()) {
uploading_to_uploaded_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - *packed_file->uploading_timestamp)
.count();
g_packed_file_uploading_to_uploaded_ms_recorder << uploading_to_uploaded_ms;
if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
sampler->take_sample();
}
}
int64_t total_ms = -1;
if (packed_file->first_append_timestamp.has_value()) {
total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - *packed_file->first_append_timestamp)
.count();
}
std::ostringstream slices_stream;
bool first_slice = true;
for (const auto& [small_file_path, index] : packed_file->slice_locations) {
if (!first_slice) {
slices_stream << "; ";
}
first_slice = false;
slices_stream << small_file_path << "(txn=" << index.txn_id
<< ", offset=" << index.offset << ", size=" << index.size << ")";
}
LOG(INFO) << "Packed file " << packed_file->packed_file_path
<< " uploaded; slices=" << packed_file->slice_locations.size()
<< ", total_bytes=" << packed_file->total_size << ", slice_detail=["
<< slices_stream.str() << "]"
<< ", active_to_ready_ms=" << active_to_ready_ms
<< ", ready_to_upload_ms=" << ready_to_upload_ms
<< ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms
<< ", total_ms=" << total_ms;
{
std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
packed_file->state = PackedFileState::UPLOADED;
packed_file->upload_time = std::time(nullptr);
}
packed_file->upload_cv.notify_all();
{
std::lock_guard<std::mutex> lock(_packed_files_mutex);
_uploading_packed_files.erase(packed_file->packed_file_path);
_uploaded_packed_files[packed_file->packed_file_path] = packed_file;
}
};
auto handle_failure = [&](const std::shared_ptr<PackedFileContext>& packed_file,
const Status& status) {
LOG(WARNING) << "Failed to upload packed file: " << packed_file->packed_file_path
<< ", error: " << status.to_string();
{
std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
packed_file->state = PackedFileState::FAILED;
packed_file->last_error = status.to_string();
packed_file->upload_time = std::time(nullptr);
}
packed_file->upload_cv.notify_all();
{
std::lock_guard<std::mutex> lock(_packed_files_mutex);
_uploading_packed_files.erase(packed_file->packed_file_path);
_uploaded_packed_files[packed_file->packed_file_path] = packed_file;
}
};
for (auto& packed_file : files_ready) {
const std::string& packed_file_path = packed_file->packed_file_path;
cloud::PackedFileInfoPB packed_file_info;
packed_file_info.set_ref_cnt(packed_file->slice_locations.size());
packed_file_info.set_total_slice_num(packed_file->slice_locations.size());
packed_file_info.set_total_slice_bytes(packed_file->total_size);
packed_file_info.set_remaining_slice_bytes(packed_file->total_size);
packed_file_info.set_created_at_sec(packed_file->create_time);
packed_file_info.set_corrected(false);
packed_file_info.set_state(doris::cloud::PackedFileInfoPB::NORMAL);
packed_file_info.set_resource_id(packed_file->resource_id);
for (const auto& [small_file_path, index] : packed_file->slice_locations) {
auto* small_file = packed_file_info.add_slices();
small_file->set_path(small_file_path);
small_file->set_offset(index.offset);
small_file->set_size(index.size);
small_file->set_deleted(false);
if (index.tablet_id != 0) {
small_file->set_tablet_id(index.tablet_id);
}
if (!index.rowset_id.empty()) {
small_file->set_rowset_id(index.rowset_id);
}
if (index.txn_id != 0) {
small_file->set_txn_id(index.txn_id);
}
}
Status meta_status = update_meta_service(packed_file->packed_file_path, packed_file_info);
if (!meta_status.ok()) {
LOG(WARNING) << "Failed to update meta service for packed file: "
<< packed_file->packed_file_path << ", error: " << meta_status.to_string();
handle_failure(packed_file, meta_status);
continue;
}
// Record stats once the packed file metadata is persisted.
record_packed_file_metrics(*packed_file);
Status trailer_status = append_packed_info_trailer(
packed_file->writer.get(), packed_file->packed_file_path, packed_file_info);
if (!trailer_status.ok()) {
handle_failure(packed_file, trailer_status);
continue;
}
// Now upload the file
if (!packed_file->slice_locations.empty()) {
std::ostringstream oss;
oss << "Uploading packed file " << packed_file_path << " with "
<< packed_file->slice_locations.size() << " small files: ";
bool first = true;
for (const auto& [small_file_path, index] : packed_file->slice_locations) {
if (!first) {
oss << ", ";
}
first = false;
oss << "[" << small_file_path << ", offset=" << index.offset
<< ", size=" << index.size << "]";
}
VLOG_DEBUG << oss.str();
} else {
VLOG_DEBUG << "Uploading packed file " << packed_file_path << " with no small files";
}
Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path,
packed_file->writer.get());
if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) {
record_ready_to_upload(packed_file);
handle_success(packed_file);
continue;
}
if (!upload_status.ok()) {
handle_failure(packed_file, upload_status);
continue;
}
record_ready_to_upload(packed_file);
packed_file->state = PackedFileState::UPLOADING;
}
for (auto& packed_file : files_uploading) {
if (!packed_file->writer) {
handle_failure(packed_file,
Status::InternalError("File writer is null for packed file: {}",
packed_file->packed_file_path));
continue;
}
if (packed_file->writer->state() != FileWriter::State::CLOSED) {
continue;
}
Status status = packed_file->writer->close(true);
if (status.is<ErrorCode::ALREADY_CLOSED>()) {
handle_success(packed_file);
continue;
}
if (status.ok()) {
continue;
}
handle_failure(packed_file, status);
}
}
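// Close the packed file's writer with close(true); completion is detected later by
// polling the writer state in process_uploading_packed_files().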
Status PackedFileManager::finalize_packed_file_upload(const std::string& packed_file_path,
FileWriter* writer) {
if (writer == nullptr) {
return Status::InternalError("File writer is null for packed file: " + packed_file_path);
}
return writer->close(true);
}
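// Persist the packed file's metadata (slice list, sizes, state) to the cloud meta
// service. In BE_TEST builds this can be short-circuited via a sync point.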
Status PackedFileManager::update_meta_service(const std::string& packed_file_path,
const cloud::PackedFileInfoPB& packed_file_info) {
#ifdef BE_TEST
TEST_SYNC_POINT_RETURN_WITH_VALUE("PackedFileManager::update_meta_service", Status::OK(),
packed_file_path, &packed_file_info);
#endif
VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
<< packed_file_info.total_slice_num() << " small files"
<< ", total bytes: " << packed_file_info.total_slice_bytes();
// Get CloudMetaMgr through StorageEngine
if (!config::is_cloud_mode()) {
return Status::InternalError("Storage engine is not cloud mode");
}
auto& storage_engine = ExecEnv::GetInstance()->storage_engine();
auto& cloud_meta_mgr = storage_engine.to_cloud().meta_mgr();
return cloud_meta_mgr.update_packed_file_info(packed_file_path, packed_file_info);
}
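// Drop uploaded or failed contexts whose upload finished longer ago than the
// retention period, then prune global index entries whose packed file is no longer
// tracked in any of the current, uploading, or uploaded maps.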
void PackedFileManager::cleanup_expired_data() {
auto current_time = std::time(nullptr);
// Clean up expired uploaded files
{
std::lock_guard<std::mutex> uploaded_lock(_packed_files_mutex);
auto it = _uploaded_packed_files.begin();
while (it != _uploaded_packed_files.end()) {
if (it->second->state == PackedFileState::UPLOADED &&
current_time - it->second->upload_time > config::uploaded_file_retention_seconds) {
it = _uploaded_packed_files.erase(it);
} else if (it->second->state == PackedFileState::FAILED &&
current_time - it->second->upload_time >
config::uploaded_file_retention_seconds) {
it = _uploaded_packed_files.erase(it);
} else {
++it;
}
}
}
// Clean up expired global index entries
{
std::unordered_set<std::string> active_packed_files;
{
std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
for (const auto& [resource_id, state] : _current_packed_files) {
if (state) {
active_packed_files.insert(state->packed_file_path);
}
}
}
{
            std::lock_guard<std::mutex> packed_lock(_packed_files_mutex);
for (const auto& [path, state] : _uploading_packed_files) {
active_packed_files.insert(path);
}
for (const auto& [path, state] : _uploaded_packed_files) {
active_packed_files.insert(path);
}
}
std::lock_guard<std::mutex> global_lock(_global_index_mutex);
auto it = _global_slice_locations.begin();
while (it != _global_slice_locations.end()) {
const auto& index = it->second;
if (active_packed_files.find(index.packed_file_path) == active_packed_files.end()) {
it = _global_slice_locations.erase(it);
} else {
++it;
}
}
}
}
#ifdef BE_TEST
namespace {
void reset_adder(bvar::Adder<int64_t>& adder) {
auto current = adder.get_value();
if (current != 0) {
adder << (-current);
}
}
} // namespace
void PackedFileManager::reset_packed_file_bvars_for_test() const {
reset_adder(g_packed_file_total_count);
reset_adder(g_packed_file_total_small_file_count);
reset_adder(g_packed_file_total_size_bytes);
g_packed_file_small_file_num_recorder.reset();
g_packed_file_file_size_recorder.reset();
g_packed_file_active_to_ready_ms_recorder.reset();
g_packed_file_ready_to_upload_ms_recorder.reset();
g_packed_file_uploading_to_uploaded_ms_recorder.reset();
if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
sampler->take_sample();
}
if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
sampler->take_sample();
}
if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
sampler->take_sample();
}
if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
sampler->take_sample();
}
if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
sampler->take_sample();
}
}
int64_t PackedFileManager::packed_file_total_count_for_test() const {
return g_packed_file_total_count.get_value();
}
int64_t PackedFileManager::packed_file_total_small_file_num_for_test() const {
return g_packed_file_total_small_file_count.get_value();
}
int64_t PackedFileManager::packed_file_total_size_bytes_for_test() const {
return g_packed_file_total_size_bytes.get_value();
}
double PackedFileManager::packed_file_avg_small_file_num_for_test() const {
return g_packed_file_avg_small_file_num.get_value().get_average_double();
}
double PackedFileManager::packed_file_avg_file_size_for_test() const {
return g_packed_file_avg_file_size_bytes.get_value().get_average_double();
}
void PackedFileManager::record_packed_file_metrics_for_test(
const PackedFileManager::PackedFileContext* packed_file) {
DCHECK(packed_file != nullptr);
record_packed_file_metrics(*packed_file);
}
void PackedFileManager::clear_state_for_test() {
std::lock_guard<std::timed_mutex> cur_lock(_current_packed_file_mutex);
_current_packed_files.clear();
{
std::lock_guard<std::mutex> lock(_packed_files_mutex);
_uploading_packed_files.clear();
_uploaded_packed_files.clear();
}
{
std::lock_guard<std::mutex> lock(_global_index_mutex);
_global_slice_locations.clear();
}
}
#endif
} // namespace doris::io