blob: 5a69e636d16db4ba825edec589fef6c42455f915 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/wal/wal_writer.h"
#include <crc32c/crc32c.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/FrontendService_types.h>
#include "common/config.h"
#include "common/status.h"
#include "io/fs/encrypted_fs_factory.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "olap/storage_engine.h"
#include "olap/wal/wal_manager.h"
#include "util/thrift_rpc_helper.h"
namespace doris {
// Magic bytes that WalWriter::append_header() writes first in a WAL file.
const char* k_wal_magic = "WAL1";
// Number of bytes of k_wal_magic that are written (no trailing NUL).
const uint32_t k_wal_magic_length = 4;
WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
// No explicit cleanup: members release whatever they hold in their own destructors.
WalWriter::~WalWriter() = default;
// Picks the file system used to store WAL files of table `tb_id` in database `db_id`
// and stores it in `fs`.
//
// - When config::enable_wal_tde is off (or in BE_TEST builds) this is the global local
//   file system.
// - Otherwise the master FE is queried through getTableTDEInfo and the local file system
//   is wrapped by io::make_file_system with the algorithm the FE reported; algorithms
//   other than AES256 / SM4 map to PLAINTEXT.
//
// Returns the RPC error, or the error carried in the FE response, if the lookup fails.
Status determine_wal_fs(int64_t db_id, int64_t tb_id, io::FileSystemSPtr& fs) {
    if (!config::enable_wal_tde) {
        fs = io::global_local_filesystem();
        return Status::OK();
    }
#ifndef BE_TEST
    TNetworkAddress master_fe = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;

    TGetTableTDEInfoRequest request;
    request.__set_db_id(db_id);
    request.__set_table_id(tb_id);
    TGetTableTDEInfoResult response;

    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_fe.hostname, master_fe.port,
            [&request, &response](FrontendServiceConnection& client) {
                client->getTableTDEInfo(response, request);
            }));

    // The transport may succeed while the FE itself reports a failure.
    if (auto fe_status = Status::create(response.status); !fe_status) {
        return fe_status;
    }

    // Translate the thrift algorithm enum into its protobuf counterpart.
    EncryptionAlgorithmPB algorithm_pb = EncryptionAlgorithmPB::PLAINTEXT;
    switch (response.algorithm) {
    case doris::TEncryptionAlgorithm::AES256:
        algorithm_pb = EncryptionAlgorithmPB::AES_256_CTR;
        break;
    case doris::TEncryptionAlgorithm::SM4:
        algorithm_pb = EncryptionAlgorithmPB::SM4_128_CTR;
        break;
    default:
        break;
    }

    auto base_fs = io::global_local_filesystem();
    fs = io::make_file_system(base_fs, algorithm_pb);
#else
    fs = io::global_local_filesystem();
#endif
    return Status::OK();
}
// Creates the WAL file on `fs`, creating its parent directory first when it is missing.
// On success `_file_writer` refers to the newly created file.
Status WalWriter::init(const io::FileSystemSPtr& fs) {
    io::Path file_path = _file_name;
    auto wal_dir = file_path.parent_path();

    bool dir_present = false;
    RETURN_IF_ERROR(fs->exists(wal_dir, &dir_present));
    if (!dir_present) {
        RETURN_IF_ERROR(fs->create_directory(wal_dir));
    }

    RETURN_IF_ERROR(fs->create_file(_file_name, &_file_writer));
    LOG(INFO) << "create wal " << _file_name;
    return Status::OK();
}
// Closes the underlying WAL file writer.
//
// Returns InternalError when init() has not produced a writer, and otherwise the status
// of the close itself. A failed close is logged with its status and propagated instead of
// being reported as success, so the caller does not treat the WAL as cleanly closed when
// the writer reported an error.
Status WalWriter::finalize() {
    if (!_file_writer) {
        return Status::InternalError("wal writer is null,fail to close file={}", _file_name);
    }
    auto st = _file_writer->close();
    if (!st.ok()) {
        LOG(WARNING) << "fail to close wal " << _file_name << ", status=" << st.to_string();
        return st;
    }
    return Status::OK();
}
// Appends every block in `blocks` to the WAL file as one record each:
//
//   [payload length : fixed64 little-endian][serialized block][crc32c of payload : fixed32 LE]
//
// Returns InternalError when the writer is not initialised, when a block pointer is null,
// or when a block cannot be serialized; otherwise the first error reported by the file
// writer, or OK.
Status WalWriter::append_blocks(const PBlockArray& blocks) {
    if (!_file_writer) {
        return Status::InternalError("wal writer is null,fail to write file={}", _file_name);
    }
    for (const auto& block : blocks) {
        if (block == nullptr) {
            return Status::InternalError("null block,fail to write file={}", _file_name);
        }

        // Serialize first and derive the length prefix and checksum from the bytes that
        // were actually produced. Taking the length from ByteSizeLong() and the payload
        // from a separate, unchecked serialization lets the two disagree when
        // serialization fails, which would checksum past the end of the buffer and write
        // a corrupt record.
        std::string content;
        if (!block->SerializeToString(&content)) {
            return Status::InternalError("failed to serialize block,fail to write file={}",
                                         _file_name);
        }
        const uint64_t block_length = content.size();

        uint8_t len_buf[sizeof(uint64_t)];
        encode_fixed64_le(len_buf, block_length);
        RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)}));

        RETURN_IF_ERROR(_file_writer->append(content));

        uint8_t checksum_buf[sizeof(uint32_t)];
        const uint32_t checksum = crc32c::Crc32c(content.data(), content.size());
        encode_fixed32_le(checksum_buf, checksum);
        RETURN_IF_ERROR(_file_writer->append({checksum_buf, sizeof(uint32_t)}));
    }
    // Every append above is checked individually, so no separate byte-count comparison is
    // needed here.
    return Status::OK();
}
// Writes the WAL file header, in this order:
//   1. the magic bytes (k_wal_magic),
//   2. WAL_VERSION as a fixed32 little-endian value,
//   3. the size of `col_ids` as a fixed64 little-endian value,
//   4. the `col_ids` bytes themselves.
// Returns InternalError when the writer is not initialised or the byte accounting does not
// add up; otherwise the first error reported by the file writer, or OK.
Status WalWriter::append_header(std::string col_ids) {
    if (!_file_writer) {
        return Status::InternalError("wal writer is null,fail to write file={}", _file_name);
    }
    const uint64_t col_ids_len = col_ids.size();
    size_t written_bytes = 0;

    // 1. magic
    RETURN_IF_ERROR(_file_writer->append({k_wal_magic, k_wal_magic_length}));
    written_bytes += k_wal_magic_length;

    // 2. format version
    uint8_t encoded_version[sizeof(uint32_t)];
    encode_fixed32_le(encoded_version, WAL_VERSION);
    RETURN_IF_ERROR(_file_writer->append({encoded_version, sizeof(uint32_t)}));
    written_bytes += VERSION_SIZE;

    // 3. length of the column-id string
    uint8_t encoded_len[sizeof(uint64_t)];
    encode_fixed64_le(encoded_len, col_ids_len);
    RETURN_IF_ERROR(_file_writer->append({encoded_len, sizeof(uint64_t)}));
    written_bytes += LENGTH_SIZE;

    // 4. the column-id string
    RETURN_IF_ERROR(_file_writer->append(col_ids));
    written_bytes += col_ids_len;

    // Size the header is expected to occupy, compared against what was accounted above.
    size_t expected_bytes = 0;
    expected_bytes += k_wal_magic_length;
    expected_bytes += VERSION_SIZE;
    expected_bytes += LENGTH_SIZE;
    expected_bytes += col_ids_len;

    if (written_bytes == expected_bytes) {
        return Status::OK();
    }
    return Status::InternalError(
            "failed to write header to wal expected= " + std::to_string(expected_bytes) +
            ",actually=" + std::to_string(written_bytes));
}
} // namespace doris