// blob: 6237c80683079b6bf5b9c7b6a63342e19ff19e0c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
// Some portions copyright (C) 2008, Google, inc.
// Utilities for working with protobufs.
// Some of this code is cribbed from the protobuf source,
// but modified to work with kudu's 'faststring' instead of STL strings.
#include "kudu/util/pb_util.h"
#include <algorithm>
#include <cstddef>
#include <deque>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_set>
#include <vector>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/descriptor_database.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/message.h>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/stubs/status.h>
#include <google/protobuf/text_format.h>
#include <google/protobuf/util/json_util.h>
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/crc.h"
#include "kudu/util/debug/sanitizer_scopes.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/faststring.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/logging.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util-internal.h"
#include "kudu/util/pb_util.pb.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
using google::protobuf::Descriptor;
using google::protobuf::DescriptorPool;
using google::protobuf::DynamicMessageFactory;
using google::protobuf::FieldDescriptor;
using google::protobuf::FileDescriptor;
using google::protobuf::FileDescriptorProto;
using google::protobuf::FileDescriptorSet;
using google::protobuf::io::ArrayInputStream;
using google::protobuf::io::CodedInputStream;
using google::protobuf::Message;
using google::protobuf::MessageLite;
using google::protobuf::Reflection;
using google::protobuf::SimpleDescriptorDatabase;
using google::protobuf::TextFormat;
using kudu::crc::Crc;
using kudu::pb_util::internal::SequentialFileFileInputStream;
using kudu::pb_util::internal::WritableFileOutputStream;
using std::deque;
using std::endl;
using std::initializer_list;
using std::ostream;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;
using strings::Utf8SafeCEscape;
namespace std {

// Streams a FileState as its underlying integer value.
// Allows the use of FileState with DCHECK_EQ.
std::ostream& operator<< (std::ostream& os, const kudu::pb_util::FileState& state) {
  os << static_cast<int>(state);
  return os;
}

} // namespace std
namespace kudu {
namespace pb_util {
// Suffix appended to 'path + kTmpInfix' to build the temporary file name
// template used by WritePBToPath() and WritePBContainerToPath().
static const char* const kTmpTemplateSuffix = ".XXXXXX";
// Protobuf container constants.
// Version number that IsSupportedContainerVersion() does not accept.
static const uint32_t kPBContainerInvalidVersion = 0;
// Version reported as the default in the "unsupported version" error message.
static const uint32_t kPBContainerDefaultVersion = 2;
// Size in bytes of every checksum stored in a container file.
static const int kPBContainerChecksumLen = sizeof(uint32_t);
// Magic bytes at the start of a container file; the terminating NUL is not
// counted in kPBContainerMagicLen (see the static_assert below).
static const char kPBContainerMagic[] = "kuducntr";
static const int kPBContainerMagicLen = 8;
static const int kPBContainerV1HeaderLen =
kPBContainerMagicLen + sizeof(uint32_t); // Magic number + version.
static const int kPBContainerV2HeaderLen =
kPBContainerV1HeaderLen + kPBContainerChecksumLen; // Same as V1 plus a checksum.
// Equal to the V1 file header length. Not 'static', so presumably declared in
// a header for use by other translation units -- TODO confirm.
const int kPBContainerMinimumValidLength = kPBContainerV1HeaderLen;
// Compile-time check that the magic string literal and its length agree.
static_assert(arraysize(kPBContainerMagic) - 1 == kPBContainerMagicLen,
"kPBContainerMagic does not match expected length");
namespace {
// Crashes the process with a diagnostic after a serialization size mismatch.
//
// When serializing, we first compute the byte size, then serialize the message.
// If serialization produces a different number of bytes than expected, we
// call this function, which crashes. The problem could be due to a bug in the
// protobuf implementation but is more likely caused by concurrent modification
// of the message. This function attempts to distinguish between the two and
// provide a useful error message.
void ByteSizeConsistencyError(int byte_size_before_serialization,
                              int byte_size_after_serialization,
                              int bytes_produced_by_serialization) {
  // A changed byte size means the message itself changed underneath us.
  CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization)
      << "Protocol message was modified concurrently during serialization.";
  // Same byte size but a different number of bytes written.
  CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization)
      << "Byte size calculation and serialization were inconsistent.  This "
         "may indicate a bug in protocol buffers or it may be caused by "
         "concurrent modification of the message.";
  // Reaching this point means the caller invoked us without a mismatch.
  LOG(FATAL) << "This shouldn't be called if all the sizes are equal.";
}
// Builds the error text used when 'message' cannot be processed because
// required fields are missing. 'action' is the verb inserted after "Can't "
// (callers in this file pass "serialize" or "parse").
string InitializationErrorMessage(const char* action,
                                  const MessageLite& message) {
  // Note:  We want to avoid depending on strutil in the lite library, otherwise
  //   we'd use:
  //
  // return strings::Substitute(
  //   "Can't $0 message of type \"$1\" because it is missing required "
  //   "fields: $2",
  //   action, message.GetTypeName(),
  //   message.InitializationErrorString());

  string result;
  result += "Can't ";
  result += action;
  result += " message of type \"";
  result += message.GetTypeName();
  result += "\" because it is missing required fields: ";
  result += message.InitializationErrorString();
  return result;
}
// Returns true iff the specified protobuf container file version is supported
// by this implementation. Only versions 1 and 2 are accepted.
bool IsSupportedContainerVersion(uint32_t version) {
  return version == 1 || version == 2;
}
// Reads exactly 'length' bytes from the container file into 'result',
// validating that there is sufficient data in the file to read this length
// before attempting to do so.
//
// 'offset' is an in-out parameter: the read starts at '*offset', and on
// success '*offset' is advanced by 'length'. On failure it is left untouched.
//
// If the file size is less than the requested size of the read, returns
// Status::Incomplete. Any error returned by the reader's Read() is propagated.
//
// NOTE: the data in 'result' may be modified even in the case of a failed read.
template<typename ReadableFileType>
Status ValidateAndReadData(ReadableFileType* reader, uint64_t file_size,
                           uint64_t* offset, uint64_t length,
                           faststring* result) {
  // Validate the read length using the file size. Comparing against the bytes
  // remaining (rather than computing '*offset + length') cannot wrap around.
  if (*offset > file_size || length > file_size - *offset) {
    return Status::Incomplete("File size not large enough to be valid",
                              Substitute("Proto container file $0: "
                                         "Tried to read $1 bytes at offset "
                                         "$2 but file size is only $3 bytes",
                                         reader->filename(), length,
                                         *offset, file_size));
  }

  // Perform the read. The Slice handed to Read() takes its size from 'result',
  // so the buffer has to span 'length' bytes first.
  result->resize(length);
  RETURN_NOT_OK(reader->Read(*offset, Slice(*result)));
  *offset += length;
  return Status::OK();
}
// Helper macro for use with ParseAndCompareChecksum(). Builds the message that
// is prepended to a checksum failure; 'prefix' describes which checksum
// failed. Example usage:
// RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { data }),
// CHECKSUM_ERR_MSG("Data checksum does not match", filename, offset));
#define CHECKSUM_ERR_MSG(prefix, filename, cksum_offset) \
Substitute("$0: Incorrect checksum in file $1 at offset $2", prefix, filename, cksum_offset)
// Parses a checksum from the specified buffer and compares it to the bytes
// given in 'slices' by calculating a rolling CRC32 checksum of the bytes in
// the 'slices'.
//
// 'checksum_buf' is decoded with DecodeFixed32(), so it must point at at least
// four readable bytes.
//
// If they match, returns OK. Otherwise, returns Status::Corruption.
Status ParseAndCompareChecksum(const uint8_t* checksum_buf,
                               const initializer_list<Slice>& slices) {
  uint32_t written_checksum = DecodeFixed32(checksum_buf);
  uint64_t actual_checksum = 0;
  Crc* crc32c = crc::GetCrc32cInstance();
  // Feed every slice into the same accumulator so the result covers the
  // concatenation of all of them.
  for (const Slice& s : slices) {
    crc32c->Compute(s.data(), s.size(), &actual_checksum);
  }
  if (PREDICT_FALSE(actual_checksum != written_checksum)) {
    return Status::Corruption(Substitute("Checksum does not match. Expected: $0. Actual: $1",
                                         written_checksum, actual_checksum));
  }
  return Status::OK();
}
// If necessary, get the size of the file opened by 'reader' in 'cached_file_size'.
// If 'cached_file_size' already has a value, this is a no-op.
//
// Returns the reader's error if the size cannot be determined, in which case
// 'cached_file_size' is left unset.
template<typename ReadableFileType>
Status CacheFileSize(ReadableFileType* reader,
                     boost::optional<uint64_t>* cached_file_size) {
  if (*cached_file_size) {
    return Status::OK();
  }

  // Query the file before caching; caching an unset local would store garbage.
  uint64_t file_size;
  RETURN_NOT_OK(reader->Size(&file_size));
  *cached_file_size = file_size;
  return Status::OK();
}
// Scans the file from 'offset' up to 'filesize' and sets '*all_zeros' to true
// iff every byte in that range is zero (an empty range counts as all zeros).
// The file is read in chunks of at most 4 MiB. Returns the reader's error if
// any read fails; '*all_zeros' is not written in that case.
template<typename ReadableFileType>
Status RestOfFileIsAllZeros(ReadableFileType* reader,
                            uint64_t filesize,
                            uint64_t offset,
                            bool* all_zeros) {
  DCHECK_GE(filesize, offset);
  constexpr uint64_t max_to_read = 4 * 1024 * 1024; // 4 MiB.
  faststring buf;
  // Looping on 'offset < filesize' also keeps 'filesize - offset' from
  // underflowing in builds where the DCHECK above is compiled out.
  while (offset < filesize) {
    uint64_t to_read = std::min(max_to_read, filesize - offset);
    buf.resize(to_read);
    RETURN_NOT_OK(reader->Read(offset, Slice(buf)));
    offset += to_read;
    if (!IsAllZeros(buf)) {
      *all_zeros = false;
      return Status::OK();
    }
  }
  *all_zeros = true;
  return Status::OK();
}
// Read and parse a message of the specified format at the given offset in the
// format documented in pb_util.h. 'offset' is an in-out parameter and will be
// updated with the new offset on success. On failure, 'offset' is not modified.
//
// Returns:
//   - EndOfFile if 'offset' is exactly at the (cached) end of the file.
//   - Incomplete if the file is too short for the record, or (version >= 2) if
//     the length field and the rest of the file are all zero bytes.
//   - Corruption if a checksum does not match.
//   - IOError if the record body cannot be parsed into 'msg'.
template<typename ReadableFileType>
Status ReadPBStartingAt(ReadableFileType* reader, int version,
                        boost::optional<uint64_t>* cached_file_size,
                        uint64_t* offset, Message* msg) {
  // All reads advance a scratch offset; '*offset' is committed only on success.
  uint64_t tmp_offset = *offset;
  VLOG(1) << "Reading PB with version " << version << " starting at offset " << *offset;

  RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
  uint64_t file_size = cached_file_size->get();

  if (tmp_offset == file_size) {
    return Status::EndOfFile("Reached end of file");
  }

  // Read the data length from the file.
  // Version 2+ includes a checksum for the length field.
  uint64_t length_buflen = (version == 1) ? sizeof(uint32_t)
                                          : sizeof(uint32_t) + kPBContainerChecksumLen;
  faststring length_and_cksum_buf;
  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, length_buflen,
                                            &length_and_cksum_buf),
                        Substitute("Could not read data length from proto container file $0 "
                                   "at offset $1", reader->filename(), *offset));
  Slice length(length_and_cksum_buf.data(), sizeof(uint32_t));

  // Versions >= 2 have an individual checksum for the data length.
  if (version >= 2) {
    // KUDU-2260: If the length and checksum data are all 0's, and the rest of
    // the file is all 0's, then it's an incomplete record, not corruption.
    // This can happen e.g. on ext4 in the default data=ordered mode, when the
    // filesize metadata is updated but the new data is not persisted.
    if (IsAllZeros(length_and_cksum_buf)) {
      bool all_zeros = false;
      RETURN_NOT_OK(RestOfFileIsAllZeros(reader, file_size, tmp_offset, &all_zeros));
      if (all_zeros) {
        return Status::Incomplete("incomplete write of PB: rest of file is NULL bytes");
      }
    }

    Slice length_checksum(length_and_cksum_buf.data() + sizeof(uint32_t),
                          kPBContainerChecksumLen);
    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(length_checksum.data(), { length }),
        CHECKSUM_ERR_MSG("Data length checksum does not match",
                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
  }
  uint32_t data_length = DecodeFixed32(length.data());

  // Read body and checksum into buffer for checksum & parsing.
  // Widen before adding: in 32-bit arithmetic a corrupt length close to
  // UINT32_MAX would wrap to a tiny read size while 'body' below would still
  // claim 'data_length' bytes.
  uint64_t data_and_cksum_buflen = static_cast<uint64_t>(data_length) + kPBContainerChecksumLen;
  faststring body_and_cksum_buf;
  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, data_and_cksum_buflen,
                                            &body_and_cksum_buf),
                        Substitute("Could not read PB message data from proto container file $0 "
                                   "at offset $1",
                                   reader->filename(), tmp_offset));
  Slice body(body_and_cksum_buf.data(), data_length);
  Slice record_checksum(body_and_cksum_buf.data() + data_length, kPBContainerChecksumLen);

  // Version 1 has a single checksum for length, body.
  // Version 2+ has individual checksums for length and body, respectively.
  if (version == 1) {
    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { length, body }),
        CHECKSUM_ERR_MSG("Length and data checksum does not match",
                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
  } else {
    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { body }),
        CHECKSUM_ERR_MSG("Data checksum does not match",
                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
  }

  // The checksum is correct. Time to decode the body.
  //
  // We could compare pb_type_ against msg.GetTypeName(), but:
  // 1. pb_type_ is not available when reading the supplemental header,
  // 2. ParseFromArray() should fail if the data cannot be parsed into the
  //    provided message type.

  // To permit parsing of very large PB messages, we must use parse through a
  // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
  // say that 512MB is the shortest theoretical message length that may produce
  // integer overflow warnings, so that's what we'll use.
  ArrayInputStream ais(body.data(), body.size());
  CodedInputStream cis(&ais);
  cis.SetTotalBytesLimit(512 * 1024 * 1024, -1);
  if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
    return Status::IOError("Unable to parse PB from path", reader->filename());
  }

  *offset = tmp_offset;
  return Status::OK();
}
// Wrapper around ReadPBStartingAt() to enforce that we don't return
// Status::Incomplete() for V1 format files: for version 1 an Incomplete
// result is converted to Status::Corruption.
//
// If the read hits Incomplete or EndOfFile while a previously cached file size
// was in use, the cache is dropped and the read is retried once with a fresh
// size.
template<typename ReadableFileType>
Status ReadFullPB(ReadableFileType* reader, int version,
                  boost::optional<uint64_t>* cached_file_size,
                  uint64_t* offset, Message* msg) {
  bool had_cached_size = *cached_file_size != boost::none;
  Status s = ReadPBStartingAt(reader, version, cached_file_size, offset, msg);
  if (PREDICT_FALSE(s.IsIncomplete() && version == 1)) {
    return Status::Corruption("Unrecoverable incomplete record", s.ToString());
  }
  // If we hit EOF, but we were using a cached view of the file size, then it might be
  // that the file has been extended. Invalidate the cache and try again.
  // The retry sees an empty cache on entry, so it cannot recurse again.
  if (had_cached_size && (s.IsIncomplete() || s.IsEndOfFile())) {
    *cached_file_size = boost::none;
    return ReadFullPB(reader, version, cached_file_size, offset, msg);
  }
  return s;
}
// Read and parse the protobuf container file-level header documented in pb_util.h.
//
// 'offset' is an in-out parameter: on success it is advanced past the header
// and '*version' receives the file's version. On failure neither is modified.
//
// Returns Corruption on a bad magic number or (version >= 2) a bad header
// checksum, and NotSupported for an unknown version.
template<typename ReadableFileType>
Status ParsePBFileHeader(ReadableFileType* reader, boost::optional<uint64_t>* cached_file_size,
                         uint64_t* offset, int* version) {
  RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
  uint64_t file_size = cached_file_size->get();

  // We initially read enough data for a V2+ file header. This optimizes for
  // V2+ and is valid on a V1 file because we don't consider these files valid
  // unless they contain a record in addition to the file header. The
  // additional 4 bytes required by a V2+ header (vs V1) is still less than the
  // minimum number of bytes required for a V1 format data record.
  uint64_t tmp_offset = *offset;
  faststring header;
  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, kPBContainerV2HeaderLen,
                                            &header),
                        Substitute("Could not read header for proto container file $0",
                                   reader->filename()));
  Slice magic_and_version(header.data(), kPBContainerMagicLen + sizeof(uint32_t));
  Slice checksum(header.data() + kPBContainerMagicLen + sizeof(uint32_t),
                 kPBContainerChecksumLen);

  // Validate magic number.
  if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) {
    string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen);
    return Status::Corruption("Invalid magic number",
                              Substitute("Expected: $0, found: $1",
                                         Utf8SafeCEscape(kPBContainerMagic),
                                         Utf8SafeCEscape(file_magic)));
  }

  // Validate container file version.
  uint32_t tmp_version = DecodeFixed32(header.data() + kPBContainerMagicLen);
  if (PREDICT_FALSE(!IsSupportedContainerVersion(tmp_version))) {
    return Status::NotSupported(
        Substitute("Protobuf container has unsupported version: $0. Default version: $1",
                   tmp_version, kPBContainerDefaultVersion));
  }

  // Versions >= 2 have a checksum after the magic number and encoded version
  // to ensure the integrity of these fields.
  if (tmp_version >= 2) {
    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { magic_and_version }),
        CHECKSUM_ERR_MSG("File header checksum does not match",
                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
  } else {
    // Version 1 doesn't have a header checksum. Rewind our read offset so this
    // data will be read again when we next attempt to read a data record.
    tmp_offset -= kPBContainerChecksumLen;
  }

  *offset = tmp_offset;
  *version = tmp_version;
  return Status::OK();
}
// Read and parse the supplemental header from the container file.
// Thin wrapper over ReadFullPB() that prepends the file name, version and
// starting offset to any error. 'offset' is advanced only on success.
template<typename ReadableFileType>
Status ReadSupplementalHeader(ReadableFileType* reader, int version,
                              boost::optional<uint64_t>* cached_file_size,
                              uint64_t* offset,
                              ContainerSupHeaderPB* sup_header) {
  RETURN_NOT_OK_PREPEND(ReadFullPB(reader, version, cached_file_size, offset, sup_header),
      Substitute("Could not read supplemental header from proto container file $0 "
                 "with version $1 at offset $2",
                 reader->filename(), version, *offset));
  return Status::OK();
}
} // anonymous namespace
// Serializes 'msg' and appends the bytes to 'output'. In debug builds,
// checks that 'msg' has all of its required fields set.
void AppendToString(const MessageLite &msg, faststring *output) {
  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
  AppendPartialToString(msg, output);
}
// Serializes 'msg' and appends the bytes to 'output' without checking that
// required fields are set. Crashes via ByteSizeConsistencyError() if the
// number of bytes written differs from the size computed up front.
void AppendPartialToString(const MessageLite &msg, faststring* output) {
  size_t old_size = output->size();
  int byte_size = msg.ByteSize();
  // Messages >2G cannot be serialized due to overflow computing ByteSize.
  DCHECK_GE(byte_size, 0) << "Error computing ByteSize";

  // Grow the buffer once, then serialize directly into the new tail.
  output->resize(old_size + static_cast<size_t>(byte_size));

  uint8* start = &((*output)[old_size]);
  uint8* end = msg.SerializeWithCachedSizesToArray(start);
  if (end - start != byte_size) {
    ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start);
  }
}
// Forwards to AppendToString(): the serialized bytes of 'msg' are appended to
// whatever 'output' already holds (the buffer is not cleared here).
void SerializeToString(const MessageLite &msg, faststring *output) {
  AppendToString(msg, output);
}
// Parses 'msg' from the contents of 'rfile'.
// Returns Status::Corruption if the protobuf library fails to parse the data.
Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) {
  SequentialFileFileInputStream input(rfile);
  if (!msg->ParseFromZeroCopyStream(&input)) {
    // If it's not a file IO error then it's a parsing error.
    // Probably, we read wrong or damaged data here.
    return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
  }
  return Status::OK();
}
// Parses 'msg' from the 'length' bytes starting at 'data'.
// Returns Status::Corruption if the protobuf library fails to parse the data.
Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length) {
  if (!msg->ParseFromArray(data, length)) {
    return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
  }
  return Status::OK();
}
// Serializes 'msg' to 'path' by writing a temporary file next to it and then
// renaming the temporary file over 'path'.
//
// When 'sync' is pb_util::SYNC, the temporary file is synced before the rename
// and the parent directory of 'path' is synced after it. If any step before a
// successful rename fails, deletion of the temporary file is attempted.
Status WritePBToPath(Env* env, const std::string& path,
                     const MessageLite& msg,
                     SyncMode sync) {
  const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix;
  string tmp_path;

  unique_ptr<WritableFile> file;
  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
  auto tmp_deleter = MakeScopedCleanup([&]() {
    WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
  });

  WritableFileOutputStream output(file.get());
  bool res = msg.SerializeToZeroCopyStream(&output);
  if (!res || !output.Flush()) {
    return Status::IOError("Unable to serialize PB to file");
  }

  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK_PREPEND(file->Sync(), "Failed to Sync() " + tmp_path);
  }
  RETURN_NOT_OK_PREPEND(file->Close(), "Failed to Close() " + tmp_path);
  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path);
  // The temporary name no longer exists after the rename; without this the
  // cleanup would try to delete it and log a spurious warning.
  tmp_deleter.cancel();
  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path);
  }
  return Status::OK();
}
// Opens 'path' for sequential reading and parses its contents into 'msg'.
// Returns the open error, or the error from ParseFromSequentialFile().
Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) {
  shared_ptr<SequentialFile> rfile;
  RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &rfile));
  RETURN_NOT_OK(ParseFromSequentialFile(msg, rfile.get()));
  return Status::OK();
}
// Helper used by TruncateFields() on each string field value; acts only on
// strings whose size exceeds 'max_len'.
// NOTE(review): the body of the 'if' and both closing braces are missing from
// this copy of the file. Presumably the string is shortened to 'max_len' and
// marked as truncated -- restore from the canonical source.
// NOTE(review): 's->size()' is unsigned while 'max_len' is a signed int, so a
// negative 'max_len' would compare as a very large value -- confirm callers
// never pass one.
static void TruncateString(string* s, int max_len) {
if (s->size() > max_len) {
// Recursively walks every field that is set in 'message' and applies
// TruncateString() with 'max_len' to each string-typed value, including the
// elements of repeated fields and the fields of nested messages.
void TruncateFields(Message* message, int max_len) {
  const Reflection* reflection = message->GetReflection();
  vector<const FieldDescriptor*> fields;
  reflection->ListFields(*message, &fields);
  for (const FieldDescriptor* field : fields) {
    if (field->is_repeated()) {
      for (int i = 0; i < reflection->FieldSize(*message, field); i++) {
        switch (field->cpp_type()) {
          case FieldDescriptor::CPPTYPE_STRING: {
            // Reflection only hands out a const reference to the stored
            // string; the const is cast away to modify it in place.
            const string& s_const = reflection->GetRepeatedStringReference(*message, field, i,
                                                                           nullptr);
            TruncateString(const_cast<string*>(&s_const), max_len);
            break;
          }
          case FieldDescriptor::CPPTYPE_MESSAGE: {
            TruncateFields(reflection->MutableRepeatedMessage(message, field, i), max_len);
            break;
          }
          default:
            break;
        }
      }
    } else {
      switch (field->cpp_type()) {
        case FieldDescriptor::CPPTYPE_STRING: {
          const string& s_const = reflection->GetStringReference(*message, field, nullptr);
          TruncateString(const_cast<string*>(&s_const), max_len);
          break;
        }
        case FieldDescriptor::CPPTYPE_MESSAGE: {
          TruncateFields(reflection->MutableMessage(message, field), max_len);
          break;
        }
        default:
          break;
      }
    }
  }
}
namespace {
// Text-format value printer that passes selected string/bytes values through
// KUDU_REDACT() before they are printed.
//
// PrintFieldName() records in 'hide_next_string_' whether the next string or
// bytes value should be redacted; PrintString()/PrintBytes() consume and
// clear that flag.
//
// NOTE(review): this copy of the class is damaged -- the access specifiers,
// the closing braces, and the second operand of the '&&' in PrintFieldName()
// are missing. Restore from the canonical source before building.
class SecureFieldPrinter : public TextFormat::FieldValuePrinter {
using super = TextFormat::FieldValuePrinter;
// Decides whether the value of 'field' must be hidden, then defers to the
// base class for the actual field name text.
string PrintFieldName(const Message& message,
const Reflection* reflection,
const FieldDescriptor* field) const override {
// NOTE(review): the right-hand side of this condition is truncated here.
hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING &&
return super::PrintFieldName(message, reflection, field);
// Prints a string value, redacted if flagged by the preceding PrintFieldName().
string PrintString(const string& val) const override {
if (hide_next_string_) {
hide_next_string_ = false;
return KUDU_REDACT(super::PrintString(val));
return super::PrintString(val);
// Prints a bytes value, redacted if flagged by the preceding PrintFieldName().
string PrintBytes(const string& val) const override {
if (hide_next_string_) {
hide_next_string_ = false;
return KUDU_REDACT(super::PrintBytes(val));
return super::PrintBytes(val);
// 'mutable' because it is updated from the const printing overrides above.
mutable bool hide_next_string_ = false;
} // anonymous namespace
// Returns the text-format rendering of 'msg', printed through a
// SecureFieldPrinter.
string SecureDebugString(const Message& msg) {
  string debug_string;
  TextFormat::Printer printer;
  // The printer is handed a heap-allocated SecureFieldPrinter.
  // NOTE(review): presumably the printer takes ownership -- confirm.
  printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());
  printer.PrintToString(msg, &debug_string);
  return debug_string;
}
// Returns a single-line text-format rendering of 'msg', printed through a
// SecureFieldPrinter.
string SecureShortDebugString(const Message& msg) {
  string debug_string;

  TextFormat::Printer printer;
  // Without single-line mode the output would be identical to
  // SecureDebugString() and the trailing-space trim below would never apply.
  printer.SetSingleLineMode(true);
  printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());

  printer.PrintToString(msg, &debug_string);
  // Single line mode currently might have an extra space at the end.
  if (!debug_string.empty() &&
      debug_string[debug_string.size() - 1] == ' ') {
    debug_string.resize(debug_string.size() - 1);
  }

  return debug_string;
}
// Wraps 'writer' without touching the file. The object starts in the
// NOT_INITIALIZED state; CreateNew() or OpenExisting() moves it to OPEN.
//
// 'offset_' and 'version_' are given explicit starting values because
// OpenExisting() reads 'offset_' and CreateNew() reads 'version_' before
// either is assigned anywhere else in this file.
WritablePBContainerFile::WritablePBContainerFile(shared_ptr<RWFile> writer)
  : state_(FileState::NOT_INITIALIZED),
    offset_(0),
    version_(kPBContainerDefaultVersion),
    writer_(std::move(writer)) {
}
// Closes the file if it has not been closed yet. A failure can only be logged
// here, since a destructor cannot return a Status.
WritablePBContainerFile::~WritablePBContainerFile() {
  WARN_NOT_OK(Close(), "Could not Close() when destroying file");
}
// Overrides the container format version used by this writer.
// Returns NotSupported (leaving the version unchanged) if 'version' is not
// accepted by IsSupportedContainerVersion().
Status WritablePBContainerFile::SetVersionForTests(int version) {
  if (!IsSupportedContainerVersion(version)) {
    return Status::NotSupported(Substitute("Version $0 is not supported", version));
  }
  version_ = version;
  return Status::OK();
}
// Writes the file-level header and the supplemental header of a new container
// file, then marks the file OPEN.
//
// The file header is the magic string followed by the version, plus (for
// versions >= 2) a checksum over those two fields. The supplemental header
// records the type name of 'msg' and the descriptors needed to decode it.
// Both are assembled in one buffer and written with a single AppendBytes().
Status WritablePBContainerFile::CreateNew(const Message& msg) {
  const uint64_t kHeaderLen = (version_ == 1) ? kPBContainerV1HeaderLen
      : kPBContainerV1HeaderLen + kPBContainerChecksumLen;

  faststring buf;
  // The header fields below are written in place, so size the buffer first.
  buf.resize(kHeaderLen);

  // Serialize the magic.
  strings::memcpy_inlined(buf.data(), kPBContainerMagic, kPBContainerMagicLen);
  uint64_t offset = kPBContainerMagicLen;

  // Serialize the version.
  InlineEncodeFixed32(buf.data() + offset, version_);
  offset += sizeof(uint32_t);
  DCHECK_EQ(kPBContainerV1HeaderLen, offset)
    << "Serialized unexpected number of total bytes";

  // Versions >= 2: Checksum the magic and version.
  if (version_ >= 2) {
    uint32_t header_checksum = crc::Crc32c(buf.data(), offset);
    InlineEncodeFixed32(buf.data() + offset, header_checksum);
    offset += sizeof(uint32_t);
  }
  DCHECK_EQ(offset, kHeaderLen);

  // Serialize the supplemental header.
  ContainerSupHeaderPB sup_header;
  PopulateDescriptorSet(msg.GetDescriptor()->file(),
                        sup_header.mutable_protos());
  sup_header.set_pb_type(msg.GetTypeName());
  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &buf),
                        "Failed to prepare supplemental header for writing");

  // Write the serialized buffer to the file.
  RETURN_NOT_OK_PREPEND(AppendBytes(buf),
                        "Failed to append header to file");
  state_ = FileState::OPEN;
  return Status::OK();
}
// Opens an existing container file for appending: validates the file header
// and the supplemental header, adopts the file's version, and positions the
// write offset at the end of the file before marking it OPEN.
Status WritablePBContainerFile::OpenExisting() {
  boost::optional<uint64_t> size;
  RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &size, &offset_, &version_));
  // The supplemental header is read only to validate it; its contents are
  // not used here.
  ContainerSupHeaderPB sup_header;
  RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &size,
                                       &offset_, &sup_header));
  offset_ = size.get(); // Reset the write offset to the end of the file.
  state_ = FileState::OPEN;
  return Status::OK();
}
// Writes 'data' at the current write offset and advances the offset by its
// size. 'offset_lock_' is held across the write and the offset update; on a
// failed write the offset is not advanced.
Status WritablePBContainerFile::AppendBytes(const Slice& data) {
  std::lock_guard<Mutex> l(offset_lock_);
  RETURN_NOT_OK(writer_->Write(offset_, data));
  offset_ += data.size();
  return Status::OK();
}
// Serializes 'msg' as one container record and appends it to the file.
// The file must be OPEN (checked in debug builds).
Status WritablePBContainerFile::Append(const Message& msg) {
  DCHECK_EQ(FileState::OPEN, state_);

  faststring buf;
  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &buf),
                        "Failed to prepare buffer for writing");
  RETURN_NOT_OK_PREPEND(AppendBytes(buf), "Failed to append data to file");

  return Status::OK();
}
// Requests an asynchronous flush on the underlying file, passing 0 for both
// the offset and length arguments. The file must be OPEN (checked in debug
// builds).
Status WritablePBContainerFile::Flush() {
  DCHECK_EQ(FileState::OPEN, state_);

  // TODO: Flush just the dirty bytes.
  RETURN_NOT_OK_PREPEND(writer_->Flush(RWFile::FLUSH_ASYNC, 0, 0), "Failed to Flush() file");

  return Status::OK();
}
// Calls Sync() on the underlying file. The file must be OPEN (checked in
// debug builds).
Status WritablePBContainerFile::Sync() {
  DCHECK_EQ(FileState::OPEN, state_);

  RETURN_NOT_OK_PREPEND(writer_->Sync(), "Failed to Sync() file");

  return Status::OK();
}
// Closes the underlying file. Safe to call more than once: only the first
// call closes the writer, later calls return OK. The state becomes CLOSED
// even if closing the writer fails.
Status WritablePBContainerFile::Close() {
  if (state_ != FileState::CLOSED) {
    state_ = FileState::CLOSED;
    Status s = writer_->Close();
    RETURN_NOT_OK_PREPEND(s, "Failed to Close() file");
  }
  return Status::OK();
}
// Returns the file name reported by the underlying writer.
const string& WritablePBContainerFile::filename() const {
  return writer_->filename();
}
// Appends one serialized container record for 'msg' to the end of 'buf'.
//
// Record layout, in order:
//   - data length (fixed32)
//   - checksum of the encoded data length (fixed32, versions >= 2 only)
//   - the serialized message
//   - data checksum (fixed32): covers length + data for version 1, and only
//     the data for versions >= 2
//
// Existing contents of 'buf' are preserved.
Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) {
  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
  int data_len = msg.ByteSize();
  // Messages >2G cannot be serialized due to overflow computing ByteSize.
  DCHECK_GE(data_len, 0) << "Error computing ByteSize";
  uint64_t record_buflen =  sizeof(uint32_t) + data_len + sizeof(uint32_t);
  if (version_ >= 2) {
    record_buflen += sizeof(uint32_t); // Additional checksum just for the length.
  }

  // Grow the buffer to hold the new data.
  uint64_t record_offset = buf->size();
  buf->resize(record_offset + record_buflen);
  uint8_t* dst = buf->data() + record_offset;

  // Serialize the data length.
  size_t cur_offset = 0;
  InlineEncodeFixed32(dst + cur_offset, static_cast<uint32_t>(data_len));
  cur_offset += sizeof(uint32_t);

  // For version >= 2: Serialize the checksum of the data length.
  if (version_ >= 2) {
    // Checksum the encoded bytes just written rather than the in-memory int:
    // the reader verifies this checksum against the bytes stored in the file,
    // and the two only coincide when the host byte order matches the encoding.
    uint32_t length_checksum = crc::Crc32c(dst, sizeof(uint32_t));
    InlineEncodeFixed32(dst + cur_offset, length_checksum);
    cur_offset += sizeof(uint32_t);
  }

  // Serialize the data.
  uint64_t data_offset = cur_offset;
  if (PREDICT_FALSE(!msg.SerializeWithCachedSizesToArray(dst + cur_offset))) {
    return Status::IOError("Failed to serialize PB to array");
  }
  cur_offset += data_len;

  // Calculate and serialize the data checksum.
  // For version 1, this is the checksum of the len + data.
  // For version >= 2, this is only the checksum of the data.
  uint32_t data_checksum;
  if (version_ == 1) {
    data_checksum = crc::Crc32c(dst, cur_offset);
  } else {
    data_checksum = crc::Crc32c(dst + data_offset, data_len);
  }
  InlineEncodeFixed32(dst + cur_offset, data_checksum);
  cur_offset += sizeof(uint32_t);

  DCHECK_EQ(record_buflen, cur_offset) << "Serialized unexpected number of total bytes";
  return Status::OK();
}
void WritablePBContainerFile::PopulateDescriptorSet(
const FileDescriptor* desc, FileDescriptorSet* output) {
// Because we don't compile protobuf with TSAN enabled, copying the
// static PB descriptors in this function ends up triggering a lot of
// race reports. We suppress the reports, but TSAN still has to walk
// the stack, etc, and this function becomes very slow. So, we ignore
// TSAN here.
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
FileDescriptorSet all_descs;
// Tracks all schemas that have been added to 'unemitted' at one point
// or another. Is a superset of 'unemitted' and only ever grows.
unordered_set<const FileDescriptor*> processed;
// Tracks all remaining unemitted schemas.
deque<const FileDescriptor*> unemitted;
InsertOrDie(&processed, desc);
while (!unemitted.empty()) {
const FileDescriptor* proto = unemitted.front();
// The current schema is emitted iff we've processed (i.e. emitted) all
// of its dependencies.
bool emit = true;
for (int i = 0; i < proto->dependency_count(); i++) {
const FileDescriptor* dep = proto->dependency(i);
if (InsertIfNotPresent(&processed, dep)) {
emit = false;
if (emit) {
// Wraps 'reader' without touching the file. The object starts in the
// NOT_INITIALIZED state; Open() moves it to OPEN.
//
// 'offset_' is given an explicit starting value because Open() passes it to
// ParsePBFileHeader(), which reads it as the position of the file header.
// 'version_' stays at the invalid version until Open() parses the real one.
ReadablePBContainerFile::ReadablePBContainerFile(shared_ptr<RandomAccessFile> reader)
  : state_(FileState::NOT_INITIALIZED),
    offset_(0),
    version_(kPBContainerInvalidVersion),
    reader_(std::move(reader)) {
}
// Destructor. Members release their own resources.
// NOTE(review): the original body is missing from this copy of the file;
// the destructor presumably calls Close(), as the writable container's
// destructor does -- confirm against the canonical source.
ReadablePBContainerFile::~ReadablePBContainerFile() {
}
// Validates the file header and the supplemental header, records the
// container version and the entry PB type name, and marks the file OPEN.
// On failure the state is left unchanged.
Status ReadablePBContainerFile::Open() {
  RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &cached_file_size_, &offset_, &version_));
  ContainerSupHeaderPB sup_header;
  RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &cached_file_size_,
                                       &offset_, &sup_header));
  // NOTE(review): only the type name is retained from 'sup_header' here, yet
  // GetPrototype() reads descriptors through protos(). Confirm where the
  // descriptor set is stored; a statement may be missing from this copy.
  pb_type_ = sup_header.pb_type();
  state_ = FileState::OPEN;
  return Status::OK();
}
// Reads the record at the current offset into 'msg' and advances the offset
// past it on success. Returns whatever ReadFullPB() returns, including
// EndOfFile when no records remain. The file must be OPEN (checked in debug
// builds).
Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
  DCHECK_EQ(FileState::OPEN, state_);
  return ReadFullPB(reader_.get(), version_, &cached_file_size_, &offset_, msg);
}
// Returns in '*prototype' a prototype Message for the container's entry type,
// built from the descriptors exposed by protos(). The prototype is created on
// first use and cached; it is owned by this object.
//
// Returns Corruption if a descriptor cannot be loaded, NotFound if the entry
// type is not among the loaded descriptors, and NotSupported if no prototype
// can be created for it. Nothing is cached on failure.
Status ReadablePBContainerFile::GetPrototype(const Message** prototype) {
  if (!prototype_) {
    // Loading the schemas into a DescriptorDatabase (and not directly into
    // a DescriptorPool) defers resolution until FindMessageTypeByName()
    // below, allowing for schemas to be loaded in any order.
    unique_ptr<SimpleDescriptorDatabase> db(new SimpleDescriptorDatabase());
    for (int i = 0; i < protos()->file_size(); i++) {
      if (!db->Add(protos()->file(i))) {
        return Status::Corruption("Descriptor not loaded", Substitute(
            "Could not load descriptor for PB type $0 referenced in container file",
            pb_type()));
      }
    }

    // Order used to match the Descriptor type from the file with its name.
    unique_ptr<DescriptorPool> pool(new DescriptorPool(db.get()));
    const Descriptor* desc = pool->FindMessageTypeByName(pb_type());
    if (!desc) {
      return Status::NotFound("Descriptor not found", Substitute(
          "Could not find descriptor for PB type $0 referenced in container file",
          pb_type()));
    }

    unique_ptr<DynamicMessageFactory> factory(new DynamicMessageFactory());
    const Message* p = factory->GetPrototype(desc);
    if (!p) {
      return Status::NotSupported("Descriptor not supported", Substitute(
          "Descriptor $0 referenced in container file not supported",
          pb_type()));
    }

    // Commit only once every step has succeeded. The database, pool and
    // factory are kept as members alongside the prototype built from them.
    db_ = std::move(db);
    descriptor_pool_ = std::move(pool);
    message_factory_ = std::move(factory);
    prototype_ = p;
  }
  *prototype = prototype_;
  return Status::OK();
}
// Writes every remaining record in the container file to 'os' in the given
// 'format'. Reading starts at the current offset and stops at the first
// record that cannot be read.
//
// Returns OK if the records ran out cleanly (EndOfFile); otherwise returns
// the read error. In DEBUG format, a file header section is printed first and
// a read error is also reported on 'os'. JSON output is refused with
// NotSupported while redaction is enabled.
Status ReadablePBContainerFile::Dump(ostream* os, ReadablePBContainerFile::Format format) {
  DCHECK_EQ(FileState::OPEN, state_);

  // Since we use the protobuf library support for dumping JSON, there isn't any easy
  // way to hook in our redaction support. Since this is only used by CLI tools,
  // just refuse to dump JSON if redaction is enabled.
  if (format == Format::JSON && KUDU_SHOULD_REDACT()) {
    return Status::NotSupported("cannot dump PBC file in JSON format if redaction is enabled");
  }

  const char* const kDashes = "-------";

  if (format == Format::DEBUG) {
    *os << "File header" << endl;
    *os << kDashes << endl;
    *os << "Protobuf container version: " << version_ << endl;
    *os << "Total container file size: " << *cached_file_size_ << endl;
    *os << "Entry PB type: " << pb_type_ << endl;
    *os << endl;
  }

  // Use the embedded protobuf information from the container file to
  // create the appropriate kind of protobuf Message.
  const Message* prototype;
  RETURN_NOT_OK(GetPrototype(&prototype));
  unique_ptr<Message> msg(prototype->New());

  // Dump each message in the container file.
  int count = 0;
  uint64_t prev_offset = offset_;
  Status s;
  string buf;
  for (s = ReadNextPB(msg.get());
       s.ok();
       s = ReadNextPB(msg.get())) {
    switch (format) {
      case Format::ONELINE:
        *os << count << "\t" << SecureShortDebugString(*msg) << endl;
        break;
      case Format::DEFAULT:
      case Format::DEBUG:
        *os << "Message " << count << endl;
        if (format == Format::DEBUG) {
          *os << "offset: " << prev_offset << endl;
          *os << "length: " << (offset_ - prev_offset) << endl;
        }
        *os << kDashes << endl;
        *os << SecureDebugString(*msg) << endl;
        break;
      case Format::JSON: {
        // The JSON conversion appends to its output string, so start each
        // record from an empty buffer.
        buf.clear();
        const auto& google_status = google::protobuf::util::MessageToJsonString(
            *msg, &buf, google::protobuf::util::JsonPrintOptions());
        if (!google_status.ok()) {
          return Status::RuntimeError("could not convert PB to JSON", google_status.ToString());
        }
        *os << buf << endl;
        break;
      }
    }

    prev_offset = offset_;
    count++;
  }
  if (format == Format::DEBUG && !s.IsEndOfFile()) {
    *os << "Message " << count << endl;
    *os << "error: failed to parse protobuf message" << endl;
    *os << "offset: " << prev_offset << endl;
    *os << "remaining file length: " << (*cached_file_size_ - prev_offset) << endl;
    *os << kDashes << endl;
  }
  return s.IsEndOfFile() ? Status::OK() : s;
}
// Marks the file CLOSED. Always returns OK.
Status ReadablePBContainerFile::Close() {
  state_ = FileState::CLOSED;
  return Status::OK();
}
// Returns the container format version recorded by Open().
// The file must be OPEN (checked in debug builds).
int ReadablePBContainerFile::version() const {
  DCHECK_EQ(FileState::OPEN, state_);
  return version_;
}
// Returns the current read offset, i.e. where the next ReadNextPB() starts.
// The file must be OPEN (checked in debug builds).
uint64_t ReadablePBContainerFile::offset() const {
  DCHECK_EQ(FileState::OPEN, state_);
  return offset_;
}
// Opens the container file at 'path' and reads its first record into 'msg'.
// Returns the first error encountered while opening the file, validating its
// headers, or reading the record.
Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) {
  unique_ptr<RandomAccessFile> file;
  RETURN_NOT_OK(env->NewRandomAccessFile(path, &file));

  ReadablePBContainerFile pb_file(std::move(file));
  // ReadNextPB() requires the container to be OPEN.
  RETURN_NOT_OK(pb_file.Open());
  RETURN_NOT_OK(pb_file.ReadNextPB(msg));
  return pb_file.Close();
}
// Writes a container file holding the single record 'msg' to 'path', by
// building it in a temporary file and renaming that over 'path'.
//
// With 'create' == NO_OVERWRITE, returns AlreadyPresent if 'path' exists.
// When 'sync' is pb_util::SYNC, the container is synced before the rename and
// the parent directory of 'path' is synced after it. If any step before a
// successful rename fails, deletion of the temporary file is attempted.
Status WritePBContainerToPath(Env* env, const std::string& path,
                              const Message& msg,
                              CreateMode create,
                              SyncMode sync) {
  TRACE_EVENT2("io", "WritePBContainerToPath",
               "path", path,
               "msg_type", msg.GetTypeName());

  if (create == NO_OVERWRITE && env->FileExists(path)) {
    return Status::AlreadyPresent(Substitute("File $0 already exists", path));
  }

  const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix;
  string tmp_path;

  unique_ptr<RWFile> file;
  RETURN_NOT_OK(env->NewTempRWFile(RWFileOptions(), tmp_template, &tmp_path, &file));
  auto tmp_deleter = MakeScopedCleanup([&]() {
    WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
  });

  WritablePBContainerFile pb_file(std::move(file));
  RETURN_NOT_OK(pb_file.CreateNew(msg));
  RETURN_NOT_OK(pb_file.Append(msg));
  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK(pb_file.Sync());
  }
  RETURN_NOT_OK(pb_file.Close());
  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path),
                        "Failed to rename tmp file to " + path);
  // The temporary name no longer exists after the rename; without this the
  // cleanup would try to delete it and log a spurious warning.
  tmp_deleter.cancel();
  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)),
                          "Failed to SyncDir() parent of " + path);
  }
  return Status::OK();
}
// Returns a new ref-counted PbTracer constructed from 'msg'.
scoped_refptr<debug::ConvertableToTraceFormat> PbTracer::TracePb(const Message& msg) {
  return make_scoped_refptr(new PbTracer(msg));
}
// Stores a private copy of 'msg' in 'msg_', so the tracer does not refer to
// the caller's message after construction.
PbTracer::PbTracer(const Message& msg) : msg_(msg.New()) {
  // New() only creates an empty message of the same type; copy the contents.
  msg_->CopyFrom(msg);
}
// Truncates the string fields of the stored message to kMaxFieldLengthToTrace
// and sets up a compact JsonWriter over a string stream.
// NOTE(review): this copy of the function is truncated -- nothing is written
// through 'jw', 'out' is never appended to, and the closing brace is missing.
// Presumably the message is emitted as JSON and the stream contents appended
// to 'out'; restore from the canonical source.
// NOTE(review): std::ostringstream needs <sstream>, which this file does not
// include directly.
void PbTracer::AppendAsTraceFormat(std::string* out) const {
// Shorten long string fields in place before rendering.
pb_util::TruncateFields(msg_.get(), kMaxFieldLengthToTrace);
std::ostringstream ss;
JsonWriter jw(&ss, JsonWriter::COMPACT);
} // namespace pb_util
} // namespace kudu