blob: 1bfb0fd642237defb82ceea335e08a2815bb3410 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdio.h>
#include <sys/stat.h>
#include <memory>
#include <string>
#include <vector>
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/lru_cache.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/utils.h"
#include "util/debug_util.h"
namespace doris {
using FixedFileHeader = struct _FixedFileHeader {
// the length of the entire file
uint32_t file_length;
// Checksum of the file's contents except the FileHeader
uint32_t checksum;
// Protobuf length of section
uint32_t protobuf_length;
// Checksum of Protobuf part
uint32_t protobuf_checksum;
} __attribute__((packed));
using FixedFileHeaderV2 = struct _FixedFileHeaderV2 {
uint64_t magic_number;
uint32_t version;
// the length of the entire file
uint64_t file_length;
// Checksum of the file's contents except the FileHeader
uint32_t checksum;
// Protobuf length of section
uint64_t protobuf_length;
// Checksum of Protobuf part
uint32_t protobuf_checksum;
} __attribute__((packed));
template <typename MessageType, typename ExtraType = uint32_t>
class FileHeader {
public:
FileHeader(const std::string& file_path) : _file_path(file_path) {
memset(&_fixed_file_header, 0, sizeof(_fixed_file_header));
memset(&_extra_fixed_header, 0, sizeof(_extra_fixed_header));
_fixed_file_header_size = sizeof(_fixed_file_header);
}
~FileHeader() = default;
// To calculate the length of the proto part, it needs to be called after the proto is operated,
// and prepare must be called before calling serialize
Status prepare();
// call prepare() first, serialize() will write fixed header and protobuffer.
// Write the header to the starting position of the incoming file handle
Status serialize();
// read from file, validate file length, signature and alder32 of protobuffer.
// Read the header from the beginning of the incoming file handle
Status deserialize();
// Check the validity of Header
// it is actually call deserialize().
Status validate();
uint64_t file_length() const { return _fixed_file_header.file_length; }
uint32_t checksum() const { return _fixed_file_header.checksum; }
const ExtraType& extra() const { return _extra_fixed_header; }
ExtraType* mutable_extra() { return &_extra_fixed_header; }
const MessageType& message() const { return _proto; }
MessageType* mutable_message() { return &_proto; }
uint64_t size() const {
return _fixed_file_header_size + sizeof(_extra_fixed_header) +
_fixed_file_header.protobuf_length;
}
void set_file_length(uint64_t file_length) { _fixed_file_header.file_length = file_length; }
void set_checksum(uint32_t checksum) { _fixed_file_header.checksum = checksum; }
private:
std::string _file_path;
FixedFileHeaderV2 _fixed_file_header;
uint32_t _fixed_file_header_size;
std::string _proto_string;
ExtraType _extra_fixed_header;
MessageType _proto;
};
// FileHeader implementation
template <typename MessageType, typename ExtraType>
Status FileHeader<MessageType, ExtraType>::prepare() {
try {
if (!_proto.SerializeToString(&_proto_string)) {
return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
"serialize file header to string error. [path={}]", _file_path);
}
} catch (...) {
return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
"serialize file header to string error. [path={}]", _file_path);
}
_fixed_file_header.protobuf_checksum =
olap_adler32(olap_adler32_init(), _proto_string.c_str(), _proto_string.size());
_fixed_file_header.checksum = 0;
_fixed_file_header.protobuf_length = _proto_string.size();
_fixed_file_header.file_length = size();
_fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
_fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
return Status::OK();
}
template <typename MessageType, typename ExtraType>
Status FileHeader<MessageType, ExtraType>::serialize() {
// write to file
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_path, &file_writer));
RETURN_IF_ERROR(
file_writer->append({(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}));
RETURN_IF_ERROR(file_writer->append(
{(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}));
RETURN_IF_ERROR(file_writer->append({_proto_string}));
return file_writer->close();
}
template <typename MessageType, typename ExtraType>
Status FileHeader<MessageType, ExtraType>::deserialize() {
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_path, &file_reader));
off_t real_file_length = 0;
uint32_t real_protobuf_checksum = 0;
size_t bytes_read = 0;
RETURN_IF_ERROR(file_reader->read_at(
0, {(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}, &bytes_read));
DCHECK(_fixed_file_header_size == bytes_read)
<< " deserialize read bytes dismatch, request bytes " << _fixed_file_header_size
<< " actual read " << bytes_read;
//Status read_at(size_t offset, Slice result, size_t* bytes_read,
// const IOContext* io_ctx = nullptr);
if (_fixed_file_header.magic_number != OLAP_FIX_HEADER_MAGIC_NUMBER) {
VLOG_TRACE << "old fix header found, magic num=" << _fixed_file_header.magic_number;
FixedFileHeader tmp_header;
RETURN_IF_ERROR(file_reader->read_at(0, {(const uint8_t*)&tmp_header, sizeof(tmp_header)},
&bytes_read));
DCHECK(sizeof(tmp_header) == bytes_read)
<< " deserialize read bytes dismatch, request bytes " << sizeof(tmp_header)
<< " actual read " << bytes_read;
_fixed_file_header.file_length = tmp_header.file_length;
_fixed_file_header.checksum = tmp_header.checksum;
_fixed_file_header.protobuf_length = tmp_header.protobuf_length;
_fixed_file_header.protobuf_checksum = tmp_header.protobuf_checksum;
_fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
_fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
_fixed_file_header_size = sizeof(tmp_header);
}
VLOG_NOTICE << "fix head loaded. file_length=" << _fixed_file_header.file_length
<< ", checksum=" << _fixed_file_header.checksum
<< ", protobuf_length=" << _fixed_file_header.protobuf_length
<< ", magic_number=" << _fixed_file_header.magic_number
<< ", version=" << _fixed_file_header.version;
RETURN_IF_ERROR(file_reader->read_at(
_fixed_file_header_size,
{(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}, &bytes_read));
std::unique_ptr<char[]> buf(new (std::nothrow) char[_fixed_file_header.protobuf_length]);
if (nullptr == buf) {
char errmsg[64];
return Status::Error<ErrorCode::MEM_ALLOC_FAILED>(
"malloc protobuf buf error. file={}, error={}", file_reader->path().native(),
strerror_r(errno, errmsg, 64));
}
RETURN_IF_ERROR(file_reader->read_at(_fixed_file_header_size + sizeof(_extra_fixed_header),
{buf.get(), _fixed_file_header.protobuf_length},
&bytes_read));
real_file_length = file_reader->size();
if (file_length() != static_cast<uint64_t>(real_file_length)) {
return Status::InternalError(
"file length is not match. file={}, file_length={}, real_file_length={}",
file_reader->path().native(), file_length(), real_file_length);
}
// check proto checksum
real_protobuf_checksum =
olap_adler32(olap_adler32_init(), buf.get(), _fixed_file_header.protobuf_length);
if (real_protobuf_checksum != _fixed_file_header.protobuf_checksum) {
// When compiling using gcc there woule be error like:
// Cannot bind packed field '_FixedFileHeaderV2::protobuf_checksum' to 'unsigned int&'
// so we need to using unary operator+ to evaluate one value to pass
// to status to successfully compile.
return Status::InternalError("checksum is not match. file={}, expect={}, actual={}",
file_reader->path().native(),
+_fixed_file_header.protobuf_checksum, real_protobuf_checksum);
}
try {
std::string protobuf_str(buf.get(), _fixed_file_header.protobuf_length);
if (!_proto.ParseFromString(protobuf_str)) {
return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
"fail to parse file content to protobuf object. file={}",
file_reader->path().native());
}
} catch (...) {
LOG(WARNING) << "fail to load protobuf. file='" << file_reader->path().native();
return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>("fail to load protobuf. file={}",
file_reader->path().native());
}
return Status::OK();
}
template <typename MessageType, typename ExtraType>
Status FileHeader<MessageType, ExtraType>::validate() {
return deserialize();
}
} // namespace doris