// Copyright 2013 Cloudera, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
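// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and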
// limitations under the License.
// Some portions copyright (C) 2008, Google, inc.
// Utilities for working with protobufs.
// Some of this code is cribbed from the protobuf source,
// but modified to work with kudu's 'faststring' instead of STL strings.
#include "kudu/util/pb_util.h"
#include <deque>
#include <string>
#include <tr1/memory>
#include <tr1/unordered_set>
#include <vector>
#include <boost/foreach.hpp>
#include <glog/logging.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/descriptor_database.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/message.h>
#include "kudu/gutil/bind.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/crc.h"
#include "kudu/util/debug/sanitizer_scopes.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.pb.h"
#include "kudu/util/pb_util-internal.h"
#include "kudu/util/status.h"
using google::protobuf::Descriptor;
using google::protobuf::DescriptorPool;
using google::protobuf::DynamicMessageFactory;
using google::protobuf::FieldDescriptor;
using google::protobuf::FileDescriptor;
using google::protobuf::FileDescriptorProto;
using google::protobuf::FileDescriptorSet;
using google::protobuf::io::ArrayInputStream;
using google::protobuf::io::CodedInputStream;
using google::protobuf::Message;
using google::protobuf::MessageLite;
using google::protobuf::Reflection;
using google::protobuf::SimpleDescriptorDatabase;
using kudu::crc::Crc;
using kudu::pb_util::internal::SequentialFileFileInputStream;
using kudu::pb_util::internal::WritableFileOutputStream;
using std::deque;
using std::endl;
using std::string;
using std::tr1::shared_ptr;
using std::tr1::unordered_set;
using std::vector;
using strings::Substitute;
using strings::Utf8SafeCEscape;
// Suffix appended to a destination path to form the template passed to
// Env::NewTempWritableFile(); the trailing XXXXXX is presumably replaced with
// a unique string by the Env (mkstemp-style) — confirm against env.h.
static const char* const kTmpTemplateSuffix = ".tmp.XXXXXX";

// Protobuf container constants.
static const int kPBContainerVersion = 1;
static const char kPBContainerMagic[] = "kuducntr";
static const int kPBContainerMagicLen = 8;
static const int kPBContainerHeaderLen =
    // magic number + version
    kPBContainerMagicLen + sizeof(uint32_t);
static const int kPBContainerChecksumLen = sizeof(uint32_t);

// Keep the declared magic length in sync with the literal (minus its NUL).
COMPILE_ASSERT((arraysize(kPBContainerMagic) - 1) == kPBContainerMagicLen,
               kPBContainerMagic_does_not_match_expected_length);
namespace kudu {
namespace pb_util {
namespace {
// When serializing, we first compute the byte size, then serialize the message.
// If serialization produces a different number of bytes than expected, we
// call this function, which crashes. The problem could be due to a bug in the
// protobuf implementation but is more likely caused by concurrent modification
// of the message. This function attempts to distinguish between the two and
// provide a useful error message.
// Called only after a serialization produced a byte count different from
// what ByteSize() predicted. Never returns: every path below crashes, with
// the first failing check identifying the most likely cause.
void ByteSizeConsistencyError(int byte_size_before_serialization,
                              int byte_size_after_serialization,
                              int bytes_produced_by_serialization) {
  // A size that changes between two ByteSize() calls means another thread
  // mutated the message while it was being serialized.
  CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization)
      << "Protocol message was modified concurrently during serialization.";

  // The size was stable, so the serializer and the size computation disagree.
  CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization)
      << "Byte size calculation and serialization were inconsistent. This "
         "may indicate a bug in protocol buffers or it may be caused by "
         "concurrent modification of the message.";

  // Reaching here means the caller invoked us with three equal sizes.
  LOG(FATAL) << "This shouldn't be called if all the sizes are equal.";
}
// Builds a human-readable description of why 'message' cannot be
// 'action'-ed (e.g. "serialize", "parse"): its type name followed by the
// list of required fields that are not set.
//
// Deliberately assembled with plain std::string appends instead of
// strings::Substitute, to avoid depending on strutil from lite code.
string InitializationErrorMessage(const char* action,
                                  const MessageLite& message) {
  string text("Can't ");
  text.append(action);
  text.append(" message of type \"");
  text.append(message.GetTypeName());
  text.append("\" because it is missing required fields: ");
  text.append(message.InitializationErrorString());
  return text;
}
} // anonymous namespace
// Appends the serialized form of 'msg' to 'output'. Identical to
// AppendPartialToString() except that debug builds first assert that all
// required fields of 'msg' are set.
bool AppendToString(const MessageLite &msg, faststring *output) {
  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
  const bool appended = AppendPartialToString(msg, output);
  return appended;
}
bool AppendPartialToString(const MessageLite &msg, faststring* output) {
int old_size = output->size();
int byte_size = msg.ByteSize();
output->resize(old_size + byte_size);
uint8* start = &((*output)[old_size]);
uint8* end = msg.SerializeWithCachedSizesToArray(start);
if (end - start != byte_size) {
ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start);
return true;
bool SerializeToString(const MessageLite &msg, faststring *output) {
return AppendToString(msg, output);
// Parses 'msg' from the remaining contents of 'rfile' by adapting the file
// into a protobuf ZeroCopyInputStream. Returns protobuf's parse result.
bool ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) {
  SequentialFileFileInputStream input(rfile);
  const bool parsed = msg->ParseFromZeroCopyStream(&input);
  return parsed;
}
// Parses 'length' bytes at 'data' into 'msg'. Returns Corruption on failure;
// the detail text lists any missing required fields of 'msg'.
Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length) {
  const bool ok = msg->ParseFromArray(data, length);
  return ok ? Status::OK()
            : Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
}
// Writes the serialized form of 'msg' to 'path'.
//
// The data is first written to a uniquely-named temporary file next to
// 'path', which is then renamed over 'path'. Assuming Env::RenameFile is an
// atomic rename, readers never observe a partially written file. With
// sync == SYNC, the temp file is synced before the rename and the parent
// directory after it.
//
// On any failure before the rename, the temporary file is removed.
Status WritePBToPath(Env* env, const std::string& path,
                     const MessageLite& msg,
                     SyncMode sync) {
  const string tmp_template = path + kTmpTemplateSuffix;
  string tmp_path;

  gscoped_ptr<WritableFile> file;
  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
  // Deletes the temp file if we bail out on any of the early returns below.
  env_util::ScopedFileDeleter tmp_deleter(env, tmp_path);

  WritableFileOutputStream ostream(file.get());
  bool res = msg.SerializeToZeroCopyStream(&ostream);
  if (!res || !ostream.Flush()) {
    return Status::IOError("Unable to serialize PB to file");
  }

  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK_PREPEND(file->Sync(), "Failed to Sync() " + tmp_path);
  }
  RETURN_NOT_OK_PREPEND(file->Close(), "Failed to Close() " + tmp_path);
  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path);
  // The temp file has become 'path'; stop the deleter from trying to remove
  // the old name (which no longer exists) when it goes out of scope.
  tmp_deleter.Cancel();

  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path);
  }
  return Status::OK();
}
// Reads the whole file at 'path' and parses it into 'msg'. Returns the open
// error if the file cannot be opened, or IOError if parsing fails.
Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) {
  shared_ptr<SequentialFile> seq_file;
  RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &seq_file));
  const bool parsed = ParseFromSequentialFile(msg, seq_file.get());
  if (parsed) {
    return Status::OK();
  }
  return Status::IOError("Unable to parse PB from path", path);
}
// If '*s' is longer than 'max_len' bytes, cuts it down to its first
// 'max_len' bytes and appends a marker recording the original length, so a
// reader can tell the value was shortened. Shorter strings, and a negative
// 'max_len', leave '*s' unchanged.
static void TruncateString(string* s, int max_len) {
  // Compare in the unsigned domain explicitly (size() is size_t).
  if (max_len < 0 || s->size() <= static_cast<size_t>(max_len)) {
    return;
  }
  size_t old_size = s->size();
  s->resize(max_len);
  s->append(Substitute("<truncated. original size: $0>", old_size));
}
// Recursively truncates, via TruncateString(), every string field of
// 'message' that ListFields() reports, including elements of repeated string
// fields and strings inside (repeated) sub-messages. Fields of other types are
// left untouched.
void TruncateFields(Message* message, int max_len) {
  const Reflection* reflection = message->GetReflection();
  vector<const FieldDescriptor*> fields;
  reflection->ListFields(*message, &fields);
  BOOST_FOREACH(const FieldDescriptor* field, fields) {
    if (field->is_repeated()) {
      for (int i = 0; i < reflection->FieldSize(*message, field); i++) {
        switch (field->cpp_type()) {
          case FieldDescriptor::CPPTYPE_STRING: {
            // NOTE(review): with a NULL scratch buffer this relies on
            // Get*StringReference returning a reference into the message's
            // own storage, which is then modified in place via const_cast.
            const string& s_const = reflection->GetRepeatedStringReference(*message, field, i,
                                                                           NULL);
            TruncateString(const_cast<string*>(&s_const), max_len);
            break;
          }
          case FieldDescriptor::CPPTYPE_MESSAGE: {
            TruncateFields(reflection->MutableRepeatedMessage(message, field, i), max_len);
            break;
          }
          default:
            break;
        }
      }
    } else {
      switch (field->cpp_type()) {
        case FieldDescriptor::CPPTYPE_STRING: {
          const string& s_const = reflection->GetStringReference(*message, field, NULL);
          TruncateString(const_cast<string*>(&s_const), max_len);
          break;
        }
        case FieldDescriptor::CPPTYPE_MESSAGE: {
          TruncateFields(reflection->MutableMessage(message, field), max_len);
          break;
        }
        default:
          break;
      }
    }
  }
}
// Takes ownership of 'writer'. Nothing is written here; the container
// header is produced by Init().
WritablePBContainerFile::WritablePBContainerFile(gscoped_ptr<WritableFile> writer)
    : closed_(false),
      writer_(writer.Pass()) {
}
// Closes the file if the owner has not already done so. A destructor cannot
// return a Status, so a failed close is only logged as a warning.
WritablePBContainerFile::~WritablePBContainerFile() {
  Status close_status = Close();
  WARN_NOT_OK(close_status, "Could not Close() when destroying file");
}
// Writes the container header to the file. The header consists of:
//   - the magic number (kPBContainerMagic),
//   - the container version (fixed32),
//   - a supplemental header record (ContainerSupHeaderPB) carrying the type
//     name of 'msg' and the descriptors of its .proto file and dependencies.
// 'msg' is used only for its type information; its contents are not written.
Status WritablePBContainerFile::Init(const Message& msg) {
  faststring buf;
  // Size the buffer for the fixed-length header before writing into it directly.
  buf.resize(kPBContainerHeaderLen);

  // Serialize the magic.
  strings::memcpy_inlined(buf.data(), kPBContainerMagic, kPBContainerMagicLen);
  size_t offset = kPBContainerMagicLen;

  // Serialize the version.
  InlineEncodeFixed32(buf.data() + offset, kPBContainerVersion);
  offset += sizeof(uint32_t);
  DCHECK_EQ(kPBContainerHeaderLen, offset)
      << "Serialized unexpected number of total bytes";

  // Serialize the supplemental header, which embeds the schema so the file
  // can be decoded (e.g. by ReadablePBContainerFile::Dump()) without
  // compiled-in knowledge of the message type.
  ContainerSupHeaderPB sup_header;
  PopulateDescriptorSet(msg.GetDescriptor()->file(), sup_header.mutable_protos());
  sup_header.set_pb_type(msg.GetTypeName());
  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &buf),
                        "Failed to prepare supplemental header for writing");

  // Write the serialized buffer to the file.
  RETURN_NOT_OK_PREPEND(writer_->Append(buf),
                        "Failed to Append() header to file");
  return Status::OK();
}
// Encodes 'msg' as one length-prefixed, checksummed record (see
// AppendMsgToBuffer()) and appends it to the file.
Status WritablePBContainerFile::Append(const Message& msg) {
  faststring record;
  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &record),
                        "Failed to prepare buffer for writing");
  Status append_status = writer_->Append(record);
  RETURN_NOT_OK_PREPEND(append_status, "Failed to Append() data to file");
  return Status::OK();
}
// Flushes the underlying writer using WritableFile::FLUSH_ASYNC mode.
Status WritablePBContainerFile::Flush() {
  // TODO: Flush just the dirty bytes.
  Status flush_status = writer_->Flush(WritableFile::FLUSH_ASYNC);
  RETURN_NOT_OK_PREPEND(flush_status, "Failed to Flush() file");
  return Status::OK();
}
// Asks the underlying writer to sync the data written so far.
Status WritablePBContainerFile::Sync() {
  Status sync_status = writer_->Sync();
  RETURN_NOT_OK_PREPEND(sync_status, "Failed to Sync() file");
  return Status::OK();
}
// Closes the underlying writer. Only the first call does any work; later
// calls (including the one from the destructor) return OK immediately.
Status WritablePBContainerFile::Close() {
  if (closed_) {
    return Status::OK();
  }
  closed_ = true;
  RETURN_NOT_OK_PREPEND(writer_->Close(), "Failed to Close() file");
  return Status::OK();
}
// Appends one record for 'msg' to the end of 'buf':
//   [fixed32 data_size][data_size bytes of serialized msg][fixed32 CRC32C]
// The checksum covers both the size prefix and the serialized bytes.
// On failure 'buf' is restored to its original size.
Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) {
  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
  // ByteSize() also caches the sizes SerializeWithCachedSizesToArray() uses.
  int data_size = msg.ByteSize();
  uint64_t bufsize = sizeof(uint32_t) + data_size + kPBContainerChecksumLen;

  // Grow the buffer to hold the new data.
  size_t orig_size = buf->size();
  buf->resize(orig_size + bufsize);
  uint8_t* dst = buf->data() + orig_size;

  // Serialize the data size.
  InlineEncodeFixed32(dst, static_cast<uint32_t>(data_size));
  size_t offset = sizeof(uint32_t);

  // Serialize the data. SerializeWithCachedSizesToArray() returns a pointer
  // just past the last byte written (never NULL), so verify the byte count.
  // A mismatch (e.g. 'msg' modified concurrently) would otherwise produce a
  // record whose length prefix disagrees with its contents.
  uint8_t* data_start = dst + offset;
  uint8_t* data_end = msg.SerializeWithCachedSizesToArray(data_start);
  if (PREDICT_FALSE(data_end - data_start != data_size)) {
    buf->resize(orig_size);
    return Status::IOError("Failed to serialize PB to array");
  }
  offset += data_size;

  // Calculate and serialize the checksum.
  uint32_t checksum = crc::Crc32c(dst, offset);
  InlineEncodeFixed32(dst + offset, checksum);
  offset += kPBContainerChecksumLen;

  DCHECK_EQ(bufsize, offset) << "Serialized unexpected number of total bytes";
  return Status::OK();
}
void WritablePBContainerFile::PopulateDescriptorSet(
const FileDescriptor* desc, FileDescriptorSet* output) {
// Because we don't compile protobuf with TSAN enabled, copying the
// static PB descriptors in this function ends up triggering a lot of
// race reports. We suppress the reports, but TSAN still has to walk
// the stack, etc, and this function becomes very slow. So, we ignore
// TSAN here.
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
FileDescriptorSet all_descs;
// Tracks all schemas that have been added to 'unemitted' at one point
// or another. Is a superset of 'unemitted' and only ever grows.
unordered_set<const FileDescriptor*> processed;
// Tracks all remaining unemitted schemas.
deque<const FileDescriptor*> unemitted;
InsertOrDie(&processed, desc);
while (!unemitted.empty()) {
const FileDescriptor* proto = unemitted.front();
// The current schema is emitted iff we've processed (i.e. emitted) all
// of its dependencies.
bool emit = true;
for (int i = 0; i < proto->dependency_count(); i++) {
const FileDescriptor* dep = proto->dependency(i);
if (InsertIfNotPresent(&processed, dep)) {
emit = false;
if (emit) {
// Takes ownership of 'reader'. Reading starts at offset 0; the header is
// parsed by Init().
ReadablePBContainerFile::ReadablePBContainerFile(gscoped_ptr<RandomAccessFile> reader)
    : offset_(0),
      reader_(reader.Pass()) {
}
// Releases the file. A destructor cannot return a Status, so a failure from
// Close() is only logged as a warning.
ReadablePBContainerFile::~ReadablePBContainerFile() {
  Status close_status = Close();
  WARN_NOT_OK(close_status, "Could not Close() when destroying file");
}
// Reads and validates the container header: magic number, version, and the
// supplemental header record. On success, pb_type() and protos() describe
// the messages stored in the file, and the read offset sits at the first
// data record.
//
// Returns Corruption for a bad magic number or unreadable header, and
// NotSupported for an unknown container version.
Status ReadablePBContainerFile::Init() {
  // Read header data.
  Slice header;
  gscoped_ptr<uint8_t[]> scratch;
  RETURN_NOT_OK_PREPEND(ValidateAndRead(kPBContainerHeaderLen, EOF_NOT_OK, &header, &scratch),
                        Substitute("Could not read header for proto container file $0",
                                   reader_->filename()));

  // Validate magic number.
  if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) {
    string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen);
    return Status::Corruption("Invalid magic number",
                              Substitute("Expected: $0, found: $1",
                                         Utf8SafeCEscape(kPBContainerMagic),
                                         Utf8SafeCEscape(file_magic)));
  }

  // Validate container file version.
  uint32_t version = DecodeFixed32(header.data() + kPBContainerMagicLen);
  if (PREDICT_FALSE(version != kPBContainerVersion)) {
    // We only support version 1.
    return Status::NotSupported(
        Substitute("Protobuf container has version $0, we only support version $1",
                   version, kPBContainerVersion));
  }

  // Read the supplemental header.
  ContainerSupHeaderPB sup_header;
  RETURN_NOT_OK_PREPEND(ReadNextPB(&sup_header), Substitute(
      "Could not read supplemental header from proto container file $0",
      reader_->filename()));
  // Keep the embedded schema; Dump() reads it back through protos().
  protos_.reset(sup_header.release_protos());
  pb_type_ = sup_header.pb_type();

  return Status::OK();
}
// Reads the next record at the current offset, verifies its CRC32C, and
// parses the body into 'msg'.
//
// Returns EndOfFile if no further record starts at the current offset.
// Returns Corruption on a truncated record or checksum mismatch, and IOError
// if the body does not parse as 'msg'.
Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
  VLOG(1) << "Reading PB from offset " << offset_;

  // Read the size from the file. EOF here is acceptable: it means we're
  // out of PB entries.
  Slice size;
  gscoped_ptr<uint8_t[]> size_scratch;
  RETURN_NOT_OK_PREPEND(ValidateAndRead(sizeof(uint32_t), EOF_OK, &size, &size_scratch),
                        Substitute("Could not read data size from proto container file $0",
                                   reader_->filename()));
  uint32_t data_size = DecodeFixed32(size.data());

  // Read body into buffer for checksum & parsing.
  Slice body;
  gscoped_ptr<uint8_t[]> body_scratch;
  RETURN_NOT_OK_PREPEND(ValidateAndRead(data_size, EOF_NOT_OK, &body, &body_scratch),
                        Substitute("Could not read body from proto container file $0",
                                   reader_->filename()));

  // Read checksum.
  uint32_t expected_checksum = 0;
  Slice encoded_checksum;
  gscoped_ptr<uint8_t[]> encoded_checksum_scratch;
  RETURN_NOT_OK_PREPEND(ValidateAndRead(kPBContainerChecksumLen, EOF_NOT_OK,
                                        &encoded_checksum, &encoded_checksum_scratch),
                        Substitute("Could not read checksum from proto container file $0",
                                   reader_->filename()));
  expected_checksum = DecodeFixed32(encoded_checksum.data());

  // Validate CRC32C checksum.
  Crc* crc32c = crc::GetCrc32cInstance();
  uint64_t actual_checksum = 0;
  // Compute a rolling checksum over the two byte arrays (size, body).
  crc32c->Compute(size.data(), size.size(), &actual_checksum);
  crc32c->Compute(body.data(), body.size(), &actual_checksum);
  if (PREDICT_FALSE(actual_checksum != expected_checksum)) {
    return Status::Corruption(Substitute("Incorrect checksum of file $0: actually $1, expected $2",
                                         reader_->filename(), actual_checksum, expected_checksum));
  }

  // The checksum is correct. Time to decode the body.
  //
  // We could compare pb_type_ against msg.GetTypeName(), but:
  // 1. pb_type_ is not available when reading the supplemental header,
  // 2. ParseFromArray() should fail if the data cannot be parsed into the
  //    provided message type.

  // To permit parsing of very large PB messages, we must use parse through a
  // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
  // say that 512MB is the shortest theoretical message length that may produce
  // integer overflow warnings, so that's what we'll use.
  ArrayInputStream ais(body.data(), body.size());
  CodedInputStream cis(&ais);
  cis.SetTotalBytesLimit(512 * 1024 * 1024, -1);
  if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
    return Status::IOError("Unable to parse PB from path", reader_->filename());
  }

  return Status::OK();
}
// Writes every remaining record in the container to '*os', decoding each one
// with a dynamic message built from the schema embedded in the file header.
// With 'oneline', each record is printed as "<index>\t<short debug string>".
// Otherwise each record gets a "Message <index>" heading and a full debug
// string. Records are numbered from 0 in both modes.
//
// Returns OK on reaching end of file, or the first other read error.
Status ReadablePBContainerFile::Dump(std::ostream* os, bool oneline) {
  // Use the embedded protobuf information from the container file to
  // create the appropriate kind of protobuf Message.
  //
  // Loading the schemas into a DescriptorDatabase (and not directly into
  // a DescriptorPool) defers resolution until FindMessageTypeByName()
  // below, allowing for schemas to be loaded in any order.
  SimpleDescriptorDatabase db;
  for (int i = 0; i < protos()->file_size(); i++) {
    if (!db.Add(protos()->file(i))) {
      return Status::Corruption("Descriptor not loaded", Substitute(
          "Could not load descriptor for PB type $0 referenced in container file",
          pb_type()));
    }
  }
  DescriptorPool pool(&db);
  const Descriptor* desc = pool.FindMessageTypeByName(pb_type());
  if (!desc) {
    return Status::NotFound("Descriptor not found", Substitute(
        "Could not find descriptor for PB type $0 referenced in container file",
        pb_type()));
  }
  DynamicMessageFactory factory;
  const Message* prototype = factory.GetPrototype(desc);
  if (!prototype) {
    return Status::NotSupported("Descriptor not supported", Substitute(
        "Descriptor $0 referenced in container file not supported",
        pb_type()));
  }
  gscoped_ptr<Message> msg(prototype->New());

  // Dump each message in the container file.
  int count = 0;
  Status s;
  for (s = ReadNextPB(msg.get());
       s.ok();
       s = ReadNextPB(msg.get())) {
    if (oneline) {
      *os << count << "\t" << msg->ShortDebugString() << endl;
    } else {
      *os << "Message " << count << endl;
      *os << "-------" << endl;
      *os << msg->DebugString() << endl;
    }
    // Advance in both modes so multi-line output is numbered too.
    count++;
  }
  return s.IsEndOfFile() ? Status::OK() : s;
}
// Releases (destroys) the underlying RandomAccessFile. Safe to call more than
// once; the destructor calls it too. No reads may follow a Close().
Status ReadablePBContainerFile::Close() {
  reader_.reset();
  return Status::OK();
}
// Reads exactly 'length' bytes at the current offset and advances the offset.
// On success '*result' refers to the bytes, which live in '*scratch' (owned
// by the caller). If fewer than 'length' bytes remain in the file, returns
// EndOfFile when 'eofOK' is EOF_OK and Corruption otherwise. A short read
// also returns Corruption.
Status ReadablePBContainerFile::ValidateAndRead(size_t length, EofOK eofOK,
                                                Slice* result, gscoped_ptr<uint8_t[]>* scratch) {
  // Validate the read length using the file size.
  uint64_t file_size;
  RETURN_NOT_OK(reader_->Size(&file_size));
  if (offset_ + length > file_size) {
    switch (eofOK) {
      case EOF_OK:
        return Status::EndOfFile("Reached end of file");
      case EOF_NOT_OK:
        return Status::Corruption("File size not large enough to be valid",
                                  Substitute("Proto container file $0: "
                                             "tried to read $1 bytes at offset "
                                             "$2 but file size is only $3",
                                             reader_->filename(), length,
                                             offset_, file_size));
    }
    LOG(FATAL) << "Unknown value for eofOK: " << eofOK;
  }

  // Perform the read.
  Slice s;
  gscoped_ptr<uint8_t[]> local_scratch(new uint8_t[length]);
  RETURN_NOT_OK(reader_->Read(offset_, length, &s, local_scratch.get()));

  // Sanity check the result.
  if (PREDICT_FALSE(s.size() < length)) {
    return Status::Corruption("Unexpected short read", Substitute(
        "Proto container file $0: tried to read $1 bytes; got $2 bytes",
        reader_->filename(), length, s.size()));
  }

  *result = s;
  // 's' may point into 'local_scratch'; hand the buffer to the caller so the
  // bytes stay valid after this function returns.
  *scratch = local_scratch.Pass();
  offset_ += s.size();
  return Status::OK();
}
// Opens the container file at 'path', validates its header, and parses the
// first data record into 'msg'. Any later records are ignored.
Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) {
  gscoped_ptr<RandomAccessFile> file;
  RETURN_NOT_OK(env->NewRandomAccessFile(path, &file));

  ReadablePBContainerFile pb_file(file.Pass());
  RETURN_NOT_OK(pb_file.Init());
  RETURN_NOT_OK(pb_file.ReadNextPB(msg));
  return pb_file.Close();
}
// Writes 'msg' to 'path' as a container file holding a single record.
//
// With create == NO_OVERWRITE, returns AlreadyPresent if 'path' exists. The
// container is written to a temporary file that is renamed over 'path' once
// complete. With sync == SYNC, the temp file is synced before the rename and
// the parent directory after it. The temp file is removed on failure.
Status WritePBContainerToPath(Env* env, const std::string& path,
                              const Message& msg,
                              CreateMode create,
                              SyncMode sync) {
  TRACE_EVENT2("io", "WritePBContainerToPath",
               "path", path,
               "msg_type", msg.GetTypeName());

  if (create == NO_OVERWRITE && env->FileExists(path)) {
    return Status::AlreadyPresent(Substitute("File $0 already exists", path));
  }

  const string tmp_template = path + kTmpTemplateSuffix;
  string tmp_path;

  gscoped_ptr<WritableFile> file;
  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
  // Deletes the temp file if we bail out on any of the early returns below.
  env_util::ScopedFileDeleter tmp_deleter(env, tmp_path);

  WritablePBContainerFile pb_file(file.Pass());
  RETURN_NOT_OK(pb_file.Init(msg));
  RETURN_NOT_OK(pb_file.Append(msg));
  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK(pb_file.Sync());
  }
  RETURN_NOT_OK(pb_file.Close());
  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path),
                        "Failed to rename tmp file to " + path);
  // The temp file has become 'path'; don't let the deleter touch the old name.
  tmp_deleter.Cancel();
  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)),
                          "Failed to SyncDir() parent of " + path);
  }
  return Status::OK();
}
} // namespace pb_util
} // namespace kudu