// blob: 35436da82f1113fed22b4df0ff4f2a831979e033 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vwal_writer.h"
#include <gen_cpp/data.pb.h>
#include <sstream>
#include "gen_cpp/FrontendService.h"
#include "io/fs/encrypted_fs_factory.h"
#include "util/debug_points.h"
namespace doris {
namespace vectorized {
// Constructs a writer bound to one WAL of one table.
//
// The constructor only records its arguments in members; the body is empty,
// so no file is opened and the WAL manager is not contacted until init().
//
// @param db_id          id of the database the WAL belongs to
// @param tb_id          id of the table the WAL belongs to
// @param wal_id         id of the WAL this writer produces
// @param import_label   label of the import; stored in _label
// @param wal_manager    WAL manager used by init()/close(); stored as given
//                       NOTE(review): presumably non-owning and expected to
//                       outlive this writer - confirm against the header
// @param slot_desc      slot descriptors whose col_unique_id values init()
//                       writes into the WAL header
//                       NOTE(review): whether _slot_descs copies or aliases
//                       this vector depends on the member's declared type
// @param be_exe_version version passed to Block::serialize() in write_wal()
VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version)
: _db_id(db_id),
_tb_id(tb_id),
_wal_id(wal_id),
_label(import_label),
_wal_manager(wal_manager),
_slot_descs(slot_desc),
_be_exe_version(be_exe_version) {}
// Nothing to release by hand: the destructor performs no explicit work and
// members are destroyed by their own destructors.
VWalWriter::~VWalWriter() = default;
// Prepares the WAL for writing.
//
// Steps, in order:
//   1. (non-BE_TEST builds only) when config::group_commit_wait_replay_wal_finish
//      is set, registers a fresh mutex/condition_variable pair for this WAL id
//      with the WAL manager, then lets determine_wal_fs() choose the file
//      system to write to. The local file system is the starting value.
//   2. Creates and initializes the underlying WalWriter.
//   3. Adds the WAL id to the manager's per-table queue.
//   4. Writes the header: the comma-separated col_unique_id of every slot
//      whose col_unique_id is non-negative, in slot order.
//
// @return Status::OK() on success, otherwise the first error reported by
//         determine_wal_fs(), _create_wal_writer() or append_header().
Status VWalWriter::init() {
    io::FileSystemSPtr wal_fs = io::global_local_filesystem();
#ifndef BE_TEST
    if (config::group_commit_wait_replay_wal_finish) {
        std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
        std::shared_ptr<std::condition_variable> cv = std::make_shared<std::condition_variable>();
        auto add_st = _wal_manager->add_wal_cv_map(_wal_id, lock, cv);
        if (!add_st.ok()) {
            // Registration failure is only logged; init() carries on.
            LOG(WARNING) << "fail to add wal_id " << _wal_id << " to wal_cv_map";
        }
    }
    RETURN_IF_ERROR(determine_wal_fs(_db_id, _tb_id, wal_fs));
#endif
    RETURN_IF_ERROR(_create_wal_writer(_wal_id, wal_fs, _wal_writer));
    _wal_manager->add_wal_queue(_tb_id, _wal_id);

    // Build "id1,id2,...". The separator is written *before* every element but
    // the first, so there is no trailing comma to strip and an empty slot list
    // naturally yields an empty header string.
    std::string col_ids;
    for (const auto& slot_desc : _slot_descs) {
        if (slot_desc.col_unique_id < 0) {
            // Slots with a negative unique id are left out of the header.
            continue;
        }
        if (!col_ids.empty()) {
            col_ids += ',';
        }
        col_ids += std::to_string(slot_desc.col_unique_id);
    }
    RETURN_IF_ERROR(_wal_writer->append_header(col_ids));
    return Status::OK();
}
// Serializes `block` without compression and appends it to the WAL as a
// one-element batch.
//
// @param block  block to persist; it is dereferenced unconditionally
// @return Status::OK() on success, otherwise the error from
//         Block::serialize() or WalWriter::append_blocks(). The
//         "VWalWriter.write_wal.fail" debug point, when it fires, returns
//         InternalError("Failed to write wal!") before any work is done.
Status VWalWriter::write_wal(vectorized::Block* block) {
    DBUG_EXECUTE_IF("VWalWriter.write_wal.fail",
                    { return Status::InternalError("Failed to write wal!"); });

    // Size and timing figures are handed to serialize() by address and are
    // not read again afterwards.
    size_t raw_size = 0;
    size_t packed_size = 0;
    int64_t pack_time = 0;
    PBlock serialized;
    RETURN_IF_ERROR(block->serialize(_be_exe_version, &serialized, &raw_size, &packed_size,
                                     &pack_time,
                                     segment_v2::CompressionTypePB::NO_COMPRESSION));

    RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&serialized}));
    return Status::OK();
}
// Finishes the WAL.
//
// When config::group_commit_wait_replay_wal_finish is set, the WAL is first
// handed to the manager as a recover WAL and this call waits until the
// manager reports its replay as finished. Any error in that sequence is
// returned immediately, in which case the writer is not finalized here.
// Afterwards the underlying writer, if one exists, is finalized.
//
// @return Status::OK() on success (including when no writer exists),
//         otherwise the first error encountered.
Status VWalWriter::close() {
    if (config::group_commit_wait_replay_wal_finish) {
        std::string path;
        RETURN_IF_ERROR(_wal_manager->get_wal_path(_wal_id, path));
        LOG(INFO) << "close file " << path;
        RETURN_IF_ERROR(_wal_manager->add_recover_wal(_db_id, _tb_id, _wal_id, path));
        RETURN_IF_ERROR(_wal_manager->wait_replay_wal_finish(_wal_id));
    }

    // No writer means there is nothing left to finalize.
    if (_wal_writer == nullptr) {
        return Status::OK();
    }
    return _wal_writer->finalize();
}
// Creates a WalWriter for `wal_id` and initializes it on `fs`.
//
// @param wal_id      id used to look up the WAL path in the WAL manager
// @param fs          file system passed to WalWriter::init()
// @param wal_writer  out: receives the new writer. It is assigned as soon as
//                    the path lookup succeeds, i.e. before init() runs, so it
//                    is set even when init() fails.
// @return the error from get_wal_path() if the lookup fails, otherwise the
//         status returned by WalWriter::init().
Status VWalWriter::_create_wal_writer(int64_t wal_id, const io::FileSystemSPtr& fs,
                                      std::shared_ptr<WalWriter>& wal_writer) {
    std::string path;
    RETURN_IF_ERROR(_wal_manager->get_wal_path(wal_id, path));

    wal_writer = std::make_shared<WalWriter>(path);
    return wal_writer->init(fs);
}
} // namespace vectorized
} // namespace doris