blob: 462729623d1090b0d7323d621b0e6c8735fe3763 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/vertical_beta_rowset_writer.h"
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include "cloud/cloud_rowset_writer.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer_context.h"
#include "util/slice.h"
#include "vec/core/block.h"
namespace doris {
using namespace ErrorCode;
// Explicit instantiations for the two base writers this template is used with
// (BetaRowsetWriter and CloudRowsetWriter), so that the out-of-line member
// definitions in this file are emitted for both specializations.
// NOTE(review): these lines precede the member definitions below. GCC/Clang still
// emit the later-defined members, but the rules on which members an explicit
// instantiation definition covers are subtle; placing these two lines at the end of
// the file would be the unambiguous form — confirm before relying on other compilers.
template class VerticalBetaRowsetWriter<BetaRowsetWriter>;
template class VerticalBetaRowsetWriter<CloudRowsetWriter>;
template <class T>
    requires std::is_base_of_v<BaseBetaRowsetWriter, T>
Status VerticalBetaRowsetWriter<T>::add_columns(const vectorized::Block* block,
                                                const std::vector<uint32_t>& col_ids, bool is_key,
                                                uint32_t max_rows_per_segment,
                                                bool has_cluster_key) {
    // Appends one column group of `block` to the rowset's segments.
    //  * Key group: rows go to the current segment; a new segment is opened once the current
    //    one has more than `max_rows_per_segment` rows (or, with cluster keys, once its
    //    primary key index exceeds the configured in-memory limit).
    //  * Value group: rows are spread over the segments already opened by the key pass,
    //    filling each one up to the row count recorded for its key group.
    // Only key groups advance `_num_rows_written`.
    auto& context = this->_context;
    VLOG_NOTICE << "VerticalBetaRowsetWriter::add_columns, columns: " << block->columns();
    const size_t total_rows = block->rows();
    if (total_rows == 0) {
        return Status::OK();
    }
    // The caller's limit may only tighten the rowset-level limit, never relax it.
    if (UNLIKELY(max_rows_per_segment > context.max_rows_per_segment)) {
        max_rows_per_segment = context.max_rows_per_segment;
    }

    if (_segment_writers.empty()) {
        // Very first group of the rowset: it has to be the key group.
        DCHECK(is_key);
        std::unique_ptr<segment_v2::SegmentWriter> first_writer;
        RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &first_writer));
        _segment_writers.emplace_back(std::move(first_writer));
        _cur_writer_idx = 0;
        RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, total_rows));
    } else if (is_key) {
        auto* active = _segment_writers[_cur_writer_idx].get();
        // `||` keeps primary_keys_size() from being queried once the row limit already tripped.
        const bool segment_full =
                active->num_rows_written() > max_rows_per_segment ||
                (has_cluster_key &&
                 active->primary_keys_size() > config::mow_primary_key_index_max_size_in_memory);
        if (segment_full) {
            // Seal the key columns of the full segment and continue in a fresh one.
            RETURN_IF_ERROR(_flush_columns(active, true));
            std::unique_ptr<segment_v2::SegmentWriter> next_writer;
            RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &next_writer));
            _segment_writers.emplace_back(std::move(next_writer));
            ++_cur_writer_idx;
        }
        RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, total_rows));
    } else {
        int64_t remaining = total_rows;
        while (remaining > 0) {
            auto* active = _segment_writers[_cur_writer_idx].get();
            uint32_t written = active->num_rows_written();
            VLOG_NOTICE << "num_rows_written: " << written
                        << ", _cur_writer_idx: " << _cur_writer_idx;
            uint32_t key_group_rows = active->row_count();
            CHECK_LT(written, key_group_rows);
            // The first value rows landing in this segment (re)initialize its column writers.
            if (written == 0) {
                VLOG_NOTICE << "init first value column segment writer";
                RETURN_IF_ERROR(active->init(col_ids, is_key));
            }
            // Never write past the row count established by this segment's key group.
            const int64_t to_write = std::min<int64_t>(remaining, key_group_rows - written);
            RETURN_IF_ERROR(active->append_block(block, total_rows - remaining, to_write));
            remaining -= to_write;
            CHECK_GE(remaining, 0);
            // Segment complete and not the last one: seal it and move on to the next.
            const bool key_group_done = key_group_rows == written + to_write;
            if (key_group_done && _cur_writer_idx < _segment_writers.size() - 1) {
                RETURN_IF_ERROR(_flush_columns(active));
                ++_cur_writer_idx;
            }
        }
    }

    if (is_key) {
        this->_num_rows_written += total_rows;
    }
    return Status::OK();
}
template <class T>
    requires std::is_base_of_v<BaseBetaRowsetWriter, T>
Status VerticalBetaRowsetWriter<T>::_flush_columns(segment_v2::SegmentWriter* segment_writer,
                                                   bool is_key) {
    // Finalizes the column data and column indexes of `segment_writer`. For a key column
    // group it also records the segment's key bounds and row count, stored at slot
    // `_cur_writer_idx` of `_segment_num_rows`.
    uint64_t index_size = 0;
    VLOG_NOTICE << "flush columns index: " << _cur_writer_idx;
    RETURN_IF_ERROR(segment_writer->finalize_columns_data());
    RETURN_IF_ERROR(segment_writer->finalize_columns_index(&index_size));
    if (is_key) {
        // Take the row count from the writer that was just finalized, rather than
        // re-indexing `_segment_writers`, so every piece of bookkeeping below
        // describes the same segment.
        const auto key_group_rows = segment_writer->row_count();
        _total_key_group_rows += key_group_rows;
        // Record the segment's encoded key range; min must not exceed max.
        KeyBoundsPB key_bounds;
        Slice min_key = segment_writer->min_encoded_key();
        Slice max_key = segment_writer->max_encoded_key();
        DCHECK_LE(min_key.compare(max_key), 0);
        key_bounds.set_min_key(min_key.to_string());
        key_bounds.set_max_key(max_key.to_string());
        this->_segments_encoded_key_bounds.emplace_back(std::move(key_bounds));
        this->_segment_num_rows.resize(_cur_writer_idx + 1);
        this->_segment_num_rows[_cur_writer_idx] = key_group_rows;
    }
    return Status::OK();
}
template <class T>
    requires std::is_base_of_v<BaseBetaRowsetWriter, T>
Status VerticalBetaRowsetWriter<T>::flush_columns(bool is_key) {
    // Finalizes the column group just written to the current segment, then rewinds
    // `_cur_writer_idx` so the next column group starts again at the first segment.
    // With no segment writers yet there is nothing to finalize.
    if (!_segment_writers.empty()) {
        DCHECK(_cur_writer_idx < _segment_writers.size() && _segment_writers[_cur_writer_idx]);
        auto* current = _segment_writers[_cur_writer_idx].get();
        RETURN_IF_ERROR(_flush_columns(current, is_key));
        _cur_writer_idx = 0;
    }
    return Status::OK();
}
template <class T>
    requires std::is_base_of_v<BaseBetaRowsetWriter, T>
Status VerticalBetaRowsetWriter<T>::_create_segment_writer(
        const std::vector<uint32_t>& column_ids, bool is_key,
        std::unique_ptr<segment_v2::SegmentWriter>* writer) {
    // Allocates a new segment id, creates its data file writer (plus an index file writer
    // when the schema has inverted or ANN indexes), registers them in `_seg_files` /
    // `_idx_files`, and returns an initialized SegmentWriter through `*writer`.
    // If init() fails, `*writer` is reset to null and the error is returned.
    auto& context = this->_context;
    int seg_id = this->_num_segment.fetch_add(1, std::memory_order_relaxed);

    io::FileWriterPtr segment_file_writer;
    RETURN_IF_ERROR(BaseBetaRowsetWriter::create_file_writer(seg_id, segment_file_writer));
    DCHECK(segment_file_writer != nullptr);

    // Evaluate the predicate once so that creating the index file writer and
    // registering it in `_idx_files` below always agree.
    const bool need_index_file =
            context.tablet_schema->has_inverted_index() || context.tablet_schema->has_ann_index();
    IndexFileWriterPtr index_file_writer;
    if (need_index_file) {
        RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(seg_id, &index_file_writer));
    }

    segment_v2::SegmentWriterOptions writer_options;
    writer_options.enable_unique_key_merge_on_write = context.enable_unique_key_merge_on_write;
    writer_options.rowset_ctx = &context;
    writer_options.max_rows_per_segment = context.max_rows_per_segment;
    writer_options.write_type = DataWriteType::TYPE_COMPACTION;
    // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index
    // The segment writer only receives raw (non-owning) pointers to the file writers;
    // ownership of the file writers is moved into `_seg_files` / `_idx_files` just below,
    // which does not change the pointed-to objects.
    *writer = std::make_unique<segment_v2::SegmentWriter>(
            segment_file_writer.get(), seg_id, context.tablet_schema, context.tablet,
            context.data_dir, writer_options, index_file_writer.get());
    RETURN_IF_ERROR(this->_seg_files.add(seg_id, std::move(segment_file_writer)));
    if (need_index_file) {
        RETURN_IF_ERROR(this->_idx_files.add(seg_id, std::move(index_file_writer)));
    }

    auto s = (*writer)->init(column_ids, is_key);
    if (!s.ok()) {
        LOG(WARNING) << "failed to init segment writer: " << s.to_string();
        writer->reset();
        return s;
    }
    return Status::OK();
}
template <class T>
    requires std::is_base_of_v<BaseBetaRowsetWriter, T>
Status VerticalBetaRowsetWriter<T>::final_flush() {
    // Writes the footer of every segment, in order, and adds each size reported by
    // finalize_footer() to `_total_data_size`. Each writer is released once its footer
    // is written. The first failure is logged and returned; later segments are not touched.
    for (auto& seg_writer : _segment_writers) {
        uint64_t seg_bytes = 0;
        if (auto st = seg_writer->finalize_footer(&seg_bytes); !st.ok()) {
            LOG(WARNING) << "Fail to finalize segment footer, " << st;
            return st;
        }
        this->_total_data_size += seg_bytes;
        seg_writer.reset();
    }
    return Status::OK();
}
template <class T>
    requires std::is_base_of_v<BaseBetaRowsetWriter, T>
Status VerticalBetaRowsetWriter<T>::_close_file_writers() {
    // Close the index file writers first; only if that succeeds are the segment data
    // file writers closed, and their status is what the caller receives.
    if (auto st = BaseBetaRowsetWriter::_close_inverted_index_file_writers(); !st.ok()) {
        return st;
    }
    return this->_seg_files.close();
}
} // namespace doris