blob: 6a345e0d8a89ff62b7d10544241f58eca9357e24 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/packed_file_writer.h"
#include <bvar/recorder.h>
#include <bvar/window.h>
#include <glog/logging.h>
#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "util/slice.h"
namespace doris::io {
// Metric: elapsed milliseconds between a writer's first appendv() call and the
// point where the writer reaches the CLOSED state. Samples are pushed from
// PackedFileWriter::close() and PackedFileWriter::_close_sync(), at most once
// per writer (guarded by _close_latency_recorded).
bvar::IntRecorder packed_file_writer_first_append_to_close_ms_recorder;
// Windowed view over the recorder above, registered under the name
// "packed_file_writer_first_append_to_close_ms".
// NOTE(review): window_size is 10; presumably seconds per bvar convention - confirm.
bvar::Window<bvar::IntRecorder> packed_file_writer_first_append_to_close_ms_window(
"packed_file_writer_first_append_to_close_ms",
&packed_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
// Builds a writer that wraps `inner_writer`.
//
// inner_writer: moved into this object; appendv() and close() forward to it
//               once the writer is in direct-write mode.
// path:         its native() string is stored as the file path used in log
//               messages and in calls to the packed file manager.
// append_info:  moved into this object and passed to the packed file manager
//               when buffered data is handed over.
//
// The packed file manager is obtained from PackedFileManager::instance().
PackedFileWriter::PackedFileWriter(FileWriterPtr inner_writer, Path path,
PackedAppendContext append_info)
: _inner_writer(std::move(inner_writer)),
_file_path(path.native()),
_packed_file_manager(PackedFileManager::instance()),
_append_info(std::move(append_info)) {
// Sanity checks (DCHECK): a writer needs a non-null inner writer and a non-empty path.
DCHECK(_inner_writer != nullptr);
DCHECK(!_file_path.empty());
}
// Destructor. The body performs no close on its own; it only logs a warning
// when the writer is destroyed while still in the OPENED state (i.e. close()
// was never issued).
PackedFileWriter::~PackedFileWriter() {
    if (_state != State::OPENED) {
        return;
    }
    LOG(WARNING) << "PackedFileWriter destroyed without being closed, file: " << _file_path;
}
// Appends `data_cnt` slices starting at `data`.
//
// While the running total stays at or below config::small_file_threshold_bytes
// the bytes are copied into `_buffer`. The first append that would push the
// total above the threshold flushes the buffer through
// _switch_to_direct_write() and flips the writer into direct-write mode; from
// then on every append is forwarded to `_inner_writer`.
//
// Returns InternalError when the writer is not in the OPENED state, otherwise
// the first error reported by the flush or by the inner writer, or OK.
Status PackedFileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (_state != State::OPENED) {
        return Status::InternalError("Cannot append to closed or closing writer for file: " +
                                     _file_path);
    }

    // Stamp the first append once; the close path uses it for the latency
    // metric. This happens before the empty-input check below, so an empty
    // first append still sets the timestamp.
    if (!_first_append_timestamp.has_value()) {
        _first_append_timestamp = std::chrono::steady_clock::now();
    }

    const Slice* const end = data + data_cnt;

    size_t incoming_bytes = 0;
    for (const Slice* s = data; s != end; ++s) {
        incoming_bytes += s->size;
    }
    if (incoming_bytes == 0) {
        // Nothing to write.
        return Status::OK();
    }

    // Leave buffered mode as soon as this append would cross the threshold.
    const bool crosses_threshold =
            _bytes_appended + incoming_bytes > config::small_file_threshold_bytes;
    if (!_is_direct_write && crosses_threshold) {
        RETURN_IF_ERROR(_switch_to_direct_write());
        _is_direct_write = true;
    }

    if (!_is_direct_write) {
        // Still a small file: keep the bytes in memory.
        for (const Slice* s = data; s != end; ++s) {
            _buffer.append(s->data, s->size);
        }
    } else {
        RETURN_IF_ERROR(_inner_writer->appendv(data, data_cnt));
    }

    _bytes_appended += incoming_bytes;
    return Status::OK();
}
// Closes the writer.
//
//  - CLOSED: nothing to do, returns OK.
//  - OPENED (any state other than CLOSED / ASYNC_CLOSING): dispatches to
//    _close_async() when `non_block` is true and to _close_sync() otherwise,
//    returning that call's status.
//  - ASYNC_CLOSING (a non-blocking close was already submitted):
//      * `non_block == true` is rejected with InternalError;
//      * `non_block == false` waits for the outstanding work - the packed
//        upload for buffered files, the inner writer's blocking close for
//        direct-write files - then moves to CLOSED and records the
//        first-append-to-close latency.
//
// On any error the state is left unchanged.
Status PackedFileWriter::close(bool non_block) {
    if (_state == State::CLOSED) {
        return Status::OK();
    }

    if (_state != State::ASYNC_CLOSING) {
        return non_block ? _close_async() : _close_sync();
    }

    // From here on a previous close(true) is pending.
    if (non_block) {
        return Status::InternalError("Don't submit async close multi times");
    }

    if (_is_direct_write) {
        RETURN_IF_ERROR(_inner_writer->close(false));
    } else {
        RETURN_IF_ERROR(_wait_packed_upload());
    }
    _state = State::CLOSED;

    // Record the latency at most once per writer, and only if something was
    // ever appended. `non_block` is known to be false here, so no extra check
    // on it is needed.
    if (!_close_latency_recorded && _first_append_timestamp.has_value()) {
        const auto elapsed = std::chrono::steady_clock::now() - *_first_append_timestamp;
        const auto latency_ms =
                std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
        packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
        // Take a sample right away when the recorder has a sampler attached.
        if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
            sampler->take_sample();
        }
        _close_latency_recorded = true;
    }
    return Status::OK();
}
// Starts a non-blocking close and moves the writer to ASYNC_CLOSING.
//
// Direct-write files ask the inner writer for a non-blocking close; buffered
// (small) files hand their data to the packed file manager. If either step
// fails its status is returned and the state is not changed.
Status PackedFileWriter::_close_async() {
    if (_is_direct_write) {
        RETURN_IF_ERROR(_inner_writer->close(true));
    } else {
        RETURN_IF_ERROR(_send_to_packed_manager());
    }

    _state = State::ASYNC_CLOSING;
    return Status::OK();
}
// Performs a blocking close and moves the writer to CLOSED.
//
// Direct-write files close the inner writer synchronously; buffered (small)
// files are handed to the packed file manager and then waited on. If any step
// fails its status is returned and the state is not changed. After a
// successful close the first-append-to-close latency is recorded, at most once
// per writer and only if an append timestamp exists.
Status PackedFileWriter::_close_sync() {
    if (_is_direct_write) {
        RETURN_IF_ERROR(_inner_writer->close(false));
    } else {
        RETURN_IF_ERROR(_send_to_packed_manager());
        RETURN_IF_ERROR(_wait_packed_upload());
    }
    _state = State::CLOSED;

    // Nothing to record: already recorded, or nothing was ever appended.
    if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
        return Status::OK();
    }

    const auto elapsed = std::chrono::steady_clock::now() - *_first_append_timestamp;
    const auto latency_ms =
            std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
    packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
    // Take a sample right away when the recorder has a sampler attached.
    if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
        sampler->take_sample();
    }
    _close_latency_recorded = true;
    return Status::OK();
}
// Waits on the packed file manager for this file's upload.
//
// Only valid in buffered mode (DCHECK). The wait is skipped, returning OK,
// when no bytes were appended or when no packed file manager is set;
// otherwise the manager's wait_upload_done() status is returned.
Status PackedFileWriter::_wait_packed_upload() {
    DCHECK(!_is_direct_write);

    const bool should_wait = _bytes_appended > 0 && _packed_file_manager != nullptr;
    if (!should_wait) {
        return Status::OK();
    }
    return _packed_file_manager->wait_upload_done(_file_path);
}
// Flushes whatever is currently buffered into the inner writer so later
// appends can go to it directly.
//
// Only valid in buffered mode (DCHECK). This function does not flip
// `_is_direct_write` itself; appendv() does that after a successful return.
// If the inner writer rejects the data the buffer is left untouched and the
// error is returned.
Status PackedFileWriter::_switch_to_direct_write() {
    DCHECK(!_is_direct_write);

    if (_buffer.size() == 0) {
        // Nothing buffered yet, nothing to flush.
        return Status::OK();
    }

    Slice buffered(_buffer.data(), _buffer.size());
    RETURN_IF_ERROR(_inner_writer->appendv(&buffered, 1));
    _buffer.clear();
    return Status::OK();
}
// Hands the buffered bytes of this (small) file to the packed file manager.
//
// Only valid in buffered mode (DCHECK). Returns:
//  - InternalError   when no packed file manager is set,
//  - InternalError   when `_append_info.resource_id` is empty,
//  - InvalidArgument when `_append_info.txn_id` is not positive,
//  - otherwise the status of append_small_file().
// The buffer is cleared only after the manager accepted the data.
Status PackedFileWriter::_send_to_packed_manager() {
    DCHECK(!_is_direct_write);

    if (_packed_file_manager == nullptr) {
        return Status::InternalError("PackedFileManager is not available");
    }

    // Logged before the append-context checks below, so it is emitted even
    // when one of them rejects the request.
    LOG(INFO) << "send_to_packed_manager: " << _file_path << " buffer size: " << _buffer.size();

    if (_append_info.resource_id.empty()) {
        return Status::InternalError("Missing resource id for packed file append");
    }
    if (_append_info.txn_id <= 0) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " +
                                       _file_path);
    }

    Slice payload(_buffer.data(), _buffer.size());
    RETURN_IF_ERROR(_packed_file_manager->append_small_file(_file_path, payload, _append_info));

    _buffer.clear();
    return Status::OK();
}
// Reports where this file's bytes ended up.
//
// location: output; must point to a valid PackedSliceLocation.
//
// Expected to be called once the writer is CLOSED (DCHECK).
//  - Direct-write files were not packed: `*location` is reset to a
//    default-constructed PackedSliceLocation and OK is returned.
//  - Buffered files: the location is looked up in the packed file manager.
//    InternalError is returned when no manager is set, matching
//    _send_to_packed_manager(); otherwise the manager's status is returned.
Status PackedFileWriter::get_packed_slice_location(PackedSliceLocation* location) const {
    DCHECK(_state == State::CLOSED)
            << " file_path: " << _file_path << " bytes_appended: " << _bytes_appended;

    if (_is_direct_write) {
        *location = PackedSliceLocation {};
        return Status::OK();
    }

    // The DCHECK above is the only other guard on this path, so check the
    // manager explicitly instead of dereferencing a possibly-null pointer.
    if (_packed_file_manager == nullptr) {
        return Status::InternalError("PackedFileManager is not available");
    }

    RETURN_IF_ERROR(_packed_file_manager->get_packed_slice_location(_file_path, location));
    LOG(INFO) << "get_packed_slice_location: " << _file_path
              << " packed_path: " << location->packed_file_path << " " << location->offset << " "
              << location->size;
    return Status::OK();
}
} // namespace doris::io