blob: 5a77ee283c3327a21a50ca47187632a93c7f14f2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/load_stream_writer.h"
#include <brpc/controller.h>
#include <butil/errno.h>
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <filesystem>
#include <ostream>
#include <string>
#include <utility>
#include "bvar/bvar.h"
#include "cloud/config.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset_builder.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/txn_manager.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/brpc_closure.h"
#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
#include "vec/core/block.h"
namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
// Counter of live LoadStreamWriter objects: incremented by one in the constructor and
// decremented by one in the destructor.
bvar::Adder<int64_t> g_load_stream_writer_cnt("load_stream_writer_count");
// Counter of file writers owned by LoadStreamWriter objects: incremented by one each time
// append_data() creates a file writer, and decremented in the destructor by the number of
// segment and inverted-index file writers the object still holds.
bvar::Adder<int64_t> g_load_stream_file_writer_cnt("load_stream_file_writer_count");
// Constructs a writer for the write request pointed to by `context`.
//
// The request is copied into `_req`, the live-writer counter is bumped, a RowsetBuilder is
// created on top of the local storage engine, and the resource context returned by
// thread_context() is stored so later calls can attach to it.
// `_rowset_writer` stays null until init() succeeds.
LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile)
        : _req(*context), _rowset_writer(nullptr) {
    g_load_stream_writer_cnt << 1;

    // TODO(plat1ko): CloudStorageEngine
    auto&& local_engine = ExecEnv::GetInstance()->storage_engine().to_local();
    _rowset_builder = std::make_unique<RowsetBuilder>(local_engine, *context, profile);

    // from load stream
    _resource_ctx = thread_context()->resource_ctx();
}
// Releases this writer's share of the global counters: one count per file writer still held
// (segment and inverted-index) and one count for the LoadStreamWriter itself.
LoadStreamWriter::~LoadStreamWriter() {
    // size() is unsigned, so convert to a signed 64-bit value *before* negating. Negating the
    // unsigned value directly wraps around to a huge positive number and only yields the
    // intended negative delta through an implicit unsigned-to-signed conversion.
    g_load_stream_file_writer_cnt << -static_cast<int64_t>(_segment_file_writers.size());
    g_load_stream_file_writer_cnt << -static_cast<int64_t>(_inverted_file_writers.size());
    g_load_stream_writer_cnt << -1;
}
// Initializes the rowset builder and caches the rowset writer it exposes.
//
// Returns the builder's error unchanged when its init() fails, or an injected InternalError
// when the "LoadStreamWriter.init.failure" debug point is active. `_is_init` is set only
// after the builder has been initialized successfully.
Status LoadStreamWriter::init() {
    DBUG_EXECUTE_IF("LoadStreamWriter.init.failure",
                    { return Status::InternalError("fault injection"); });

    if (Status builder_status = _rowset_builder->init(); !builder_status.ok()) {
        return builder_status;
    }

    _rowset_writer = _rowset_builder->rowset_writer();
    _is_init = true;
    return Status::OK();
}
// Appends `buf` to the file of kind `file_type` that belongs to segment `segid`.
//
// File writers are created on demand: while holding `_lock`, every writer from the current
// count up to and including `segid` is created through the rowset writer. `offset` must equal
// the number of bytes already appended to the target file; anything else is reported as
// out-of-order data.
//
// Returns:
//   - Corruption if the writer has not been initialized, if the target file writer is null,
//     or if `offset` does not match the bytes appended so far;
//   - the error from create_file_writer() if a writer cannot be created (this also marks the
//     LoadStreamWriter as canceled);
//   - otherwise the result of appending the data to the file writer.
Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf,
                                     FileType file_type) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    io::FileWriter* file_writer = nullptr;
    auto& file_writers =
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
    {
        std::lock_guard lock_guard(_lock);
        DCHECK(_is_init);
        // The DCHECK above only helps in debug builds. `_rowset_writer` is null until init()
        // succeeds, so fail gracefully here instead of dereferencing it below. This mirrors
        // the checks done by close_writer() and add_segment().
        if (!_is_init) {
            return Status::Corruption("append_data failed, LoadStreamWriter is not inited");
        }
        // Open every missing writer up to and including `segid`; the loop does not run when
        // the writer for `segid` already exists.
        for (size_t i = file_writers.size(); i <= segid; i++) {
            io::FileWriterPtr seg_file_writer;
            Status st = _rowset_writer->create_file_writer(static_cast<uint32_t>(i),
                                                           seg_file_writer, file_type);
            DBUG_EXECUTE_IF("LoadStreamWriter.append_data.create_file_writer_failed",
                            { st = Status::InternalError("fault injection"); });
            if (!st.ok()) {
                _is_canceled = true;
                return st;
            }
            file_writers.push_back(std::move(seg_file_writer));
            g_load_stream_file_writer_cnt << 1;
        }
        // TODO: IOBuf to Slice
        file_writer = file_writers[segid].get();
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; });
    VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
    if (file_writer == nullptr) {
        return Status::Corruption("append_data failed, file writer {} is destoryed", segid);
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.wrong_offset", { offset++; });
    // The data must start exactly where the previously appended bytes ended.
    if (file_writer->bytes_appended() != offset) {
        return Status::Corruption(
                "append_data out-of-order in segment={}, expected offset={}, actual={}",
                file_writer->path().native(), offset, file_writer->bytes_appended());
    }
    return file_writer->append(buf.to_string());
}
// Closes the file of kind `file_type` that belongs to segment `segid`.
//
// The writer is looked up under `_lock`; the close itself happens outside the lock.
// Returns Corruption when this LoadStreamWriter is not initialized, when no writer was ever
// opened for `segid`, when the writer is null, or when a file other than an inverted index
// file was closed with zero bytes. A failing close() marks this LoadStreamWriter as canceled
// and its error is returned unchanged.
Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    auto& writers =
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
    io::FileWriter* target = nullptr;
    {
        std::lock_guard guard(_lock);
        if (!_is_init) {
            return Status::Corruption("close_writer failed, LoadStreamWriter is not inited");
        }
        DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid",
                        { segid = static_cast<uint32_t>(writers.size()); });
        if (segid >= writers.size()) {
            return Status::Corruption(
                    "close_writer failed, file {} is never opened, file type is {}", segid,
                    file_type);
        }
        target = writers[segid].get();
    }

    DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { target = nullptr; });
    if (target == nullptr) {
        return Status::Corruption(
                "close_writer failed, file writer {} is destoryed, fiel type is {}", segid,
                file_type);
    }

    if (auto close_status = target->close(); !close_status.ok()) {
        _is_canceled = true;
        return close_status;
    }

    LOG(INFO) << "file " << segid << " path " << target->path().native() << " closed, written "
              << target->bytes_appended() << " bytes"
              << ", file type is " << file_type;

    // An empty index file is allowed when creating an index on a variant-type column; any
    // other file kind must contain data.
    if (target->bytes_appended() != 0 || file_type == FileType::INVERTED_INDEX_FILE) {
        return Status::OK();
    }
    return Status::Corruption("file {} closed with 0 bytes, file type is {}",
                              target->path().native(), file_type);
}
// Registers segment `segid` with the rowset writer after validating its size.
//
// Under `_lock`, the size of the segment file is computed, and the size of the inverted index
// file as well when any inverted index writer exists. The segment file size must equal
// `stat.data_size`; otherwise Corruption is returned. Corruption is also returned when this
// LoadStreamWriter is not initialized, and size-calculation errors are propagated unchanged.
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    size_t seg_bytes = 0;
    size_t index_bytes = 0;
    {
        std::lock_guard guard(_lock);
        if (!_is_init) {
            return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
        }
        DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
                        { segid = static_cast<uint32_t>(_segment_file_writers.size()); });
        RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &seg_bytes));
        if (!_inverted_file_writers.empty()) {
            RETURN_IF_ERROR(_calc_file_size(segid, FileType::INVERTED_INDEX_FILE, &index_bytes));
        }
    }

    DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { seg_bytes++; });
    if (seg_bytes == stat.data_size) {
        return _rowset_writer->add_segment(segid, stat);
    }
    return Status::Corruption(
            "add_segment failed, segment stat {} does not match, file size={}, inverted file "
            "size={}, stat.data_size={}, tablet id={}",
            segid, seg_bytes, index_bytes, stat.data_size, _req.tablet_id);
}
// Stores in `*file_size` the number of bytes appended to the already-closed file of kind
// `file_type` that belongs to segment `segid`.
//
// Takes no lock itself; add_segment() calls it while holding `_lock`.
// Returns Corruption when no writer was opened for `segid`, when the writer is null, or when
// the file has not been closed yet. `*file_size` is written only on success.
Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, size_t* file_size) {
    auto& writers =
            (file_type == FileType::SEGMENT_FILE) ? _segment_file_writers : _inverted_file_writers;

    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.unknown_segment",
                    { segid = static_cast<uint32_t>(writers.size()); });
    if (segid >= writers.size()) {
        return Status::Corruption("calc file size failed, file {} is never opened, file type is {}",
                                  segid, file_type);
    }

    io::FileWriter* target = writers[segid].get();
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.null_file_writer", { target = nullptr; });
    if (target == nullptr) {
        return Status::Corruption(
                "calc file size failed, file writer {} is destoryed, file type is {}", segid,
                file_type);
    }

    // Fault injection: swap in a freshly created writer in place of the looked-up one.
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.file_not_closed", {
        io::FileWriterPtr injected;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(writers.size()), injected, FileType::SEGMENT_FILE));
        writers.push_back(std::move(injected));
        target = writers.back().get();
    });

    if (target->state() == io::FileWriter::State::CLOSED) {
        *file_size = target->bytes_appended();
        return Status::OK();
    }
    return Status::Corruption("calc file size failed, file {} is not closed",
                              target->path().native());
}
// Validates the state of all file writers, then builds the rowset and submits the
// delete-bitmap calculation task. Sets `_pre_closed` on success.
//
// Takes no lock itself; close() calls it while holding `_lock`.
//
// Returns:
//   - the error from init() when lazy initialization fails;
//   - InternalError when this writer has been canceled;
//   - Corruption when inverted index writers exist but their count differs from the segment
//     writer count, or when any file writer is null or not closed;
//   - errors from build_rowset() / submit_calc_delete_bitmap_task() unchanged.
Status LoadStreamWriter::_pre_close() {
    SCOPED_ATTACH_TASK(_resource_ctx);
    if (!_is_init) {
        // if this delta writer is not initialized, but close() is called.
        // which means this tablet has no data loaded, but at least one tablet
        // in same partition has data loaded.
        // so we have to also init this LoadStreamWriter, so that it can create an empty rowset
        // for this tablet when being closed.
        RETURN_IF_ERROR(init());
    }
    DCHECK(_is_init)
            << "rowset builder is supposed be to initialized before close_wait() being called";
    DBUG_EXECUTE_IF("LoadStreamWriter.close.cancelled", { _is_canceled = true; });
    if (_is_canceled) {
        return Status::InternalError("flush segment failed");
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_writers_size_not_match", {
        io::FileWriterPtr file_writer;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_inverted_file_writers.size()), file_writer,
                FileType::INVERTED_INDEX_FILE));
        _inverted_file_writers.push_back(std::move(file_writer));
    });
    // When inverted index files are present there must be exactly one per segment file.
    if (!_inverted_file_writers.empty() &&
        _inverted_file_writers.size() != _segment_file_writers.size()) {
        return Status::Corruption(
                "LoadStreamWriter close failed, inverted file writer size is {},"
                "segment file writer size is {}",
                _inverted_file_writers.size(), _segment_file_writers.size());
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.close.file_not_closed", {
        io::FileWriterPtr file_writer;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_segment_file_writers.size()), file_writer,
                FileType::SEGMENT_FILE));
        _segment_file_writers.push_back(std::move(file_writer));
    });
    for (size_t i = 0; i < _segment_file_writers.size(); ++i) {
        const auto& writer = _segment_file_writers[i];
        // Guard against a null writer before dereferencing it, as the other methods of this
        // class do; report it as an error instead of crashing.
        if (writer == nullptr) {
            return Status::Corruption(
                    "LoadStreamWriter close failed, segment file writer {} is destroyed", i);
        }
        if (writer->state() != io::FileWriter::State::CLOSED) {
            return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed",
                                      writer->path().native());
        }
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_file_not_closed", {
        io::FileWriterPtr file_writer;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_inverted_file_writers.size()), file_writer,
                FileType::INVERTED_INDEX_FILE));
        _inverted_file_writers.push_back(std::move(file_writer));
    });
    for (size_t i = 0; i < _inverted_file_writers.size(); ++i) {
        const auto& writer = _inverted_file_writers[i];
        // Same null guard as for the segment file writers above.
        if (writer == nullptr) {
            return Status::Corruption(
                    "LoadStreamWriter close failed, inverted file writer {} is destroyed", i);
        }
        if (writer->state() != io::FileWriter::State::CLOSED) {
            return Status::Corruption(
                    "LoadStreamWriter close failed, inverted file {} is not closed",
                    writer->path().native());
        }
    }
    RETURN_IF_ERROR(_rowset_builder->build_rowset());
    RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
    _pre_closed = true;
    return Status::OK();
}
// Finishes the load for this writer while holding `_lock`: runs _pre_close() if it has not
// completed yet, waits for the delete-bitmap calculation, and commits the transaction.
// The first failing step's status is returned; otherwise the result of commit_txn().
Status LoadStreamWriter::close() {
    std::lock_guard<std::mutex> guard(_lock);

    if (!_pre_closed) {
        if (Status pre_close_status = _pre_close(); !pre_close_status.ok()) {
            return pre_close_status;
        }
    }

    if (Status wait_status = _rowset_builder->wait_calc_delete_bitmap(); !wait_status.ok()) {
        return wait_status;
    }

    // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions
    auto* local_builder = static_cast<RowsetBuilder*>(_rowset_builder.get());
    return local_builder->commit_txn();
}
#include "common/compile_check_end.h"
} // namespace doris