blob: 92092110beaab96f144d185f822d16daba914a0e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/beta_rowset_writer_v2.h"
#include <assert.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <stdio.h>
#include <ctime> // time
#include <filesystem>
#include <memory>
#include <sstream>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/stream_sink_file_writer.h"
#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
#include "vec/core/block.h"
#include "vec/sink/load_stream_stub.h"
namespace doris {
using namespace ErrorCode;
BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector<std::shared_ptr<LoadStreamStub>>& streams)
: _segment_creator(_context, _seg_files, _idx_files), _streams(streams) {}
BetaRowsetWriterV2::~BetaRowsetWriterV2() = default;
Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context) {
_context = rowset_writer_context;
_context.segment_collector = std::make_shared<SegmentCollectorT<BetaRowsetWriterV2>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BetaRowsetWriterV2>>(this);
return Status::OK();
}
Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer,
FileType file_type) {
auto partition_id = _context.partition_id;
auto index_id = _context.index_id;
auto tablet_id = _context.tablet_id;
auto load_id = _context.load_id;
auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(_streams);
stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id, file_type);
file_writer = std::move(stream_writer);
return Status::OK();
}
Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
bool ok = false;
for (const auto& stream : _streams) {
auto st = stream->add_segment(_context.partition_id, _context.index_id, _context.tablet_id,
segment_id, segstat);
if (!st.ok()) {
LOG(WARNING) << "failed to add segment " << segment_id << " to stream "
<< stream->stream_id();
}
ok = ok || st.ok();
}
if (!ok) {
return Status::InternalError("failed to add segment {} of tablet {} to any replicas",
segment_id, _context.tablet_id);
}
return Status::OK();
}
Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id,
int64_t* flush_size) {
if (block->rows() == 0) {
return Status::OK();
}
{
SCOPED_RAW_TIMER(&_segment_writer_ns);
RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
}
// delete bitmap and seg compaction are done on the destination BE.
return Status::OK();
}
Status BetaRowsetWriterV2::flush_single_block(const vectorized::Block* block) {
return _segment_creator.flush_single_block(block);
}
} // namespace doris