| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // Utilities for dealing with protocol buffers. |
| // These are mostly just functions similar to those found in the protobuf |
| // library itself, but using kudu::faststring instances instead of STL strings. |
| #pragma once |
| |
| #include <cstdint> |
| #include <iosfwd> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| |
| #include <google/protobuf/message.h> |
| #include <gtest/gtest_prod.h> |
| |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/util/debug/trace_event_impl.h" |
| #include "kudu/util/mutex.h" |
| |
| namespace google { |
| namespace protobuf { |
| class DescriptorPool; |
| class FileDescriptor; |
| class FileDescriptorSet; |
| class MessageLite; |
| class SimpleDescriptorDatabase; |
| } // namespace protobuf |
| } // namespace google |
| |
| namespace kudu { |
| |
| class Env; |
| class RWFile; |
| class RandomAccessFile; |
| class SequentialFile; |
| class Slice; |
| class Status; |
| class faststring; |
| |
| namespace pb_util { |
| |
| enum SyncMode { |
| SYNC, |
| NO_SYNC |
| }; |
| |
| enum CreateMode { |
| OVERWRITE, |
| NO_OVERWRITE |
| }; |
| |
| enum SensitivityMode { |
| SENSITIVE, |
| NOT_SENSITIVE |
| }; |
| |
| enum class FileState { |
| NOT_INITIALIZED, |
| OPEN, |
| CLOSED |
| }; |
| |
| // The minimum valid length, in bytes, of a protobuf container (PBC) file; format described below. |
| extern const int kPBContainerMinimumValidLength; |
| |
| // faststring counterpart of MessageLite::AppendToString: appends the serialized 'msg' to 'output'. |
| void AppendToString(const google::protobuf::MessageLite &msg, faststring *output); |
| |
| // faststring counterpart of MessageLite::AppendPartialToString (missing required fields allowed). |
| void AppendPartialToString(const google::protobuf::MessageLite &msg, faststring *output); |
| |
| // faststring counterpart of MessageLite::SerializeToString; see that method for the semantics. |
| void SerializeToString(const google::protobuf::MessageLite &msg, faststring *output); |
| |
| // Parses 'msg' from the contents of 'rfile'. See MessageLite::ParseFromZeroCopyStream. |
| Status ParseFromSequentialFile(google::protobuf::MessageLite *msg, SequentialFile *rfile); |
| |
| // Parses 'msg' from the 'length' bytes at 'data'. Similar to MessageLite::ParseFromArray, except |
| // that it returns Status::Corruption() if the message could not be parsed. |
| Status ParseFromArray(google::protobuf::MessageLite* msg, const uint8_t* data, uint32_t length); |
| |
| // Load the protobuf stored in the file at 'path' into 'msg'. |
| Status ReadPBFromPath(Env* env, const std::string& path, google::protobuf::MessageLite* msg); |
| |
| // Serialize the protobuf 'msg' to the file at 'path'. |
| // |
| // If 'sync' is SYNC, ensures the changes are made durable. |
| Status WritePBToPath(Env* env, const std::string& path, |
| const google::protobuf::MessageLite& msg, SyncMode sync); |
| |
| // Truncate any 'bytes' or 'string' fields of 'message' to 'max_len'. |
| // The text "<truncated>" is appended to any such truncated fields. |
| void TruncateFields(google::protobuf::Message* message, int max_len); |
| |
| // Redaction-sensitive variant of Message::DebugString. |
| // |
| // For most protobufs, this has identical output to Message::DebugString. However, |
| // a field with string or binary type may be tagged with the 'kudu.REDACT' option, |
| // available by importing 'pb_util.proto'. When such a field is encountered by this |
| // method, its contents will be redacted using the 'KUDU_REDACT' macro as documented |
| // in kudu/util/logging.h. |
| std::string SecureDebugString(const google::protobuf::Message& msg); |
| |
| // Redaction-sensitive variant of Message::ShortDebugString; redacts as SecureDebugString() does. |
| std::string SecureShortDebugString(const google::protobuf::Message& msg); |
| |
| // A protobuf "container" has the following format (all integers in |
| // little-endian byte order). |
| // |
| // <file header> |
| // <1 or more records> |
| // |
| // Note: There are two versions (version 1 and version 2) of the file format. |
| // Version 2 of the file format differs from version 1 in the following ways: |
| // |
| // * Version 2 has a file header checksum. |
| // * Version 2 has separate checksums for the record length and record data |
| // fields. |
| // |
| // File header format |
| // ------------------ |
| // |
| // Each protobuf container file contains a file header identifying the file. |
| // This includes: |
| // |
| // magic number: 8 byte string identifying the file format. |
| // |
| // Included so that we have a minimal guarantee that this file is of the |
| // type we expect and that we are not just reading garbage. |
| // |
| // container_version: 4 byte unsigned integer indicating the "version" of the |
| // container format. May be set to 1 or 2. |
| // |
| // Included so that this file format may be extended at some later date |
| // while maintaining backwards compatibility. |
| // |
| // file_header_checksum (version 2+ only): 4 byte unsigned integer with a CRC32C |
| // of the magic and version fields. |
| // |
| // Included so that we can validate the container version number. |
| // |
| // The remaining container fields are considered part of a "record". There may |
| // be 1 or more records in a valid protobuf container file. |
| // |
| // Record format |
| // ------------- |
| // |
| // data length: 4 byte unsigned integer indicating the size of the encoded data. |
| // |
| // Included because PB messages aren't self-delimiting, and thus |
| // writing a stream of messages to the same file requires |
| // delimiting each with its size. |
| // |
| // See https://developers.google.com/protocol-buffers/docs/techniques#streaming |
| // for more details. |
| // |
| // length checksum (version 2+ only): 4-byte unsigned integer containing the |
| // CRC32C checksum of "data length". |
| // |
| // Included so that we may discern the difference between a truncated file |
| // and a corrupted length field. |
| // |
| // data: "data length" bytes of protobuf data encoded according to the schema. |
| // |
| // Our payload. |
| // |
| // data checksum: 4 byte unsigned integer containing the CRC32C checksum of "data". |
| // |
| // Included to ensure validity of the data on-disk. |
| // Note: In version 1 of the file format, this is a checksum of both the |
| // "data length" and "data" fields. In version 2+, this is only a checksum |
| // of the "data" field. |
| // |
| // Supplemental header |
| // ------------------- |
| // |
| // A valid container must have at least one record, the first of |
| // which is known as the "supplemental header". The supplemental header |
| // contains additional container-level information, including the protobuf |
| // schema used for the records following it. See pb_util.proto for details. As |
| // a containerized PB message, the supplemental header is protected by a CRC32C |
| // checksum like any other message. |
| // |
| // Error detection and tolerance |
| // ----------------------------- |
| // |
| // It is worth describing the kinds of errors that can be detected by the |
| // protobuf container and the kinds that cannot. |
| // |
| // The checksums in the container are independent, not rolling. As such, |
| // they won't detect the disappearance or reordering of entire protobuf |
| // messages, which can happen if a range of the file is collapsed (see |
| // man fallocate(2)) or if the file is otherwise manually manipulated. |
| // |
| // In version 1, the checksums do not protect against corruption in the data |
| // length field. However, version 2 of the format resolves that problem. The |
| // benefit is that version 2 files can tell the difference between a record |
| // with a corrupted length field and a record that was only partially written. |
| // See ReadablePBContainerFile::ReadNextPB() for discussion on how this |
| // difference is expressed via the API. |
| // |
| // In version 1 of the format, corruption of the version field in the file |
| // header is not detectable. However, version 2 of the format addresses that |
| // limitation as well. |
| // |
| // Corruption of the protobuf data itself is detected in all versions of the |
| // file format (subject to CRC32C limitations). |
| // |
| // The container does not include footers or periodic checkpoints. As such, it |
| // will not detect if entire records are truncated. |
| // |
| // The design and implementation relies on data ordering guarantees provided by |
| // the file system to ensure that bytes are written to a file before the file |
| // metadata (file size) is updated. A partially-written record (the result of a |
| // failed append) is identified by one of the following criteria: |
| // 1. Too-few bytes remain in the file to constitute a valid record. For |
| // version 2, that would be fewer than 12 bytes (data len, data len |
| // checksum, and data checksum), or |
| // 2. Assuming a record's data length field is valid, then fewer bytes remain |
| // in the file than are specified in the data length field (plus enough for |
| // checksums). |
| // In the above scenarios, it is assumed that the system faulted while in the |
| // middle of appending a record, and it is considered safe to truncate the file |
| // at the beginning of the partial record. |
| // |
| // If filesystem preallocation is used (at the time of this writing, the |
| // implementation does not support preallocation) then even version 2 of the |
| // format cannot safely support culling trailing partially-written records. |
| // This is because it is not possible to reliably tell the difference between a |
| // partially-written record that did not complete fsync (resulting in a bad |
| // checksum) vs. a record that successfully was written to disk but then fell |
| // victim to bit-level disk corruption. See also KUDU-1414. |
| // |
| // These tradeoffs in error detection are reasonable given the failure |
| // environment that Kudu operates within. We tolerate failures such as |
| // "kill -9" of the Kudu process, machine power loss, or fsync/fdatasync |
| // failure, but not failures like runaway processes mangling data files |
| // in arbitrary ways or attackers crafting malicious data files. |
| // |
| // In short, no version of the file format will detect truncation of entire |
| // protobuf records. Version 2 relies on ordered data flushing semantics for |
| // automatic recoverability from partial record writes. Version 1 of the file |
| // format cannot support automatic recoverability from partial record writes. |
| // |
| // For further reading on what files might look like following a normal |
| // filesystem failure or disk corruption, and the likelihood of various types |
| // of disk errors, see the following papers: |
| // |
| // https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf |
| // https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf |
| |
| // Protobuf container file opened for writing. Can be built around an existing |
| // file or a completely new file. |
| // |
| // Every function is thread-safe unless indicated otherwise. |
| class WritablePBContainerFile { |
| public: |
| |
| // Initializes the class instance; 'writer' must be open. |
| explicit WritablePBContainerFile(std::shared_ptr<RWFile> writer); |
| |
| // Closes the container if not already closed. |
| ~WritablePBContainerFile(); |
| |
| // Writes the file header to disk and initializes the write offset to the |
| // byte after the file header. This method should NOT be called when opening |
| // an existing file for append; use OpenExisting() for that. |
| // |
| // 'msg' need not be populated; its type is used to "lock" the container |
| // to a particular protobuf message type in Append(). |
| // |
| // Not thread-safe. |
| Status CreateNew(const google::protobuf::Message& msg); |
| |
| // Opens an existing protobuf container file for append. The file must |
| // already have a valid file header. To initialize a new blank file for |
| // writing, use CreateNew() instead. |
| // |
| // The file header is read and the version specified there is used as the |
| // format version. The length of the file is also read and is used as the |
| // write offset for subsequent Append() calls. WritablePBContainerFile caches |
| // the write offset instead of constantly calling stat() on the file each |
| // time append is called. |
| // |
| // Not thread-safe. |
| Status OpenExisting(); |
| |
| // Writes a protobuf message to the container, beginning with its size |
| // and ending with its CRC32C checksum. One of CreateNew() or OpenExisting() |
| // must be called prior to calling Append(), i.e. the file must be open. |
| Status Append(const google::protobuf::Message& msg); |
| |
| // Asynchronously flushes all dirty container data to the filesystem. |
| // The file must be open. |
| Status Flush(); |
| |
| // Synchronizes all dirty container data to the filesystem. |
| // The file must be open. |
| // |
| // Note: the parent directory is _not_ synchronized. Because the |
| // container file was provided during construction, we don't know whether |
| // it was created or reopened, and parent directory synchronization is |
| // only needed in the former case. |
| Status Sync(); |
| |
| // Returns the current write offset into the underlying file. |
| uint64_t Offset() const; |
| |
| // Closes the container. |
| // |
| // Not thread-safe. |
| Status Close(); |
| |
| // Returns the path to the container's underlying file handle. |
| const std::string& filename() const; |
| |
| private: |
| friend class TestPBUtil; |
| FRIEND_TEST(TestPBUtil, TestPopulateDescriptorSet); |
| |
| // Set the file format version. Only used for testing. |
| // Must be called before CreateNew(). |
| Status SetVersionForTests(int version); |
| |
| // Write the protobuf schemas belonging to 'desc' and all of its |
| // dependencies to 'output'. |
| // |
| // Schemas are written in dependency order (i.e. if A depends on B which |
| // depends on C, the order is C, B, A). |
| static void PopulateDescriptorSet(const google::protobuf::FileDescriptor* desc, |
| google::protobuf::FileDescriptorSet* output); |
| |
| // Serialize the contents of 'msg' into 'buf' along with additional metadata |
| // to aid in deserialization. |
| Status AppendMsgToBuffer(const google::protobuf::Message& msg, faststring* buf); |
| |
| // Append the bytes in 'data' to the file. |
| Status AppendBytes(const Slice& data); |
| |
| // State of the file. |
| FileState state_; |
| |
| // Protects offset_. Mutable, presumably so that const accessors such as Offset() can lock it. |
| mutable Mutex offset_lock_; |
| |
| // Current write offset into the file. |
| uint64_t offset_; |
| |
| // Protobuf container file version. |
| int version_; |
| |
| // File writer. |
| std::shared_ptr<RWFile> writer_; |
| }; |
| |
| // Protobuf container file opened for reading. |
| // |
| // Can be built around a file with existing contents or an empty file (in |
| // which case it's safe to interleave with WritablePBContainerFile). |
| class ReadablePBContainerFile { |
| public: |
| |
| // Initializes the class instance; 'reader' must be open. |
| explicit ReadablePBContainerFile(std::shared_ptr<RandomAccessFile> reader); |
| |
| // Closes the file if not already closed. |
| ~ReadablePBContainerFile(); |
| |
| // Reads the header information from the container and validates it. |
| // Must be called before any of the other methods. |
| Status Open(); |
| |
| // Reads a protobuf message from the container, validating its size and |
| // data using a CRC32C checksum. File must be open. |
| // |
| // Return values: |
| // * If there are no more records in the file, returns Status::EndOfFile. |
| // * If there is a partial record, but it is not long enough to be a full |
| // record or the written length of the record is greater than the remaining |
| // bytes in the file, returns Status::Incomplete. If Status::Incomplete |
| // is returned, calling offset() will return the point in the file where |
| // the invalid partial record begins. In order to append additional records |
| // to the file, the file must first be truncated at that offset. |
| // Note: Version 1 of this file format will never return |
| // Status::Incomplete() from this method. |
| // * If a corrupt record is encountered, returns Status::Corruption. |
| // * On success, stores the result in '*msg' and returns OK. |
| Status ReadNextPB(google::protobuf::Message* msg); |
| |
| // Dumps any unread protobuf messages in the container to 'os', rendering each |
| // one as selected by 'format' (text via DebugString(), or JSON; see Format). |
| // File must be open. |
| enum class Format { |
| // Print each message on multiple lines, with intervening headers. |
| DEFAULT, |
| // Same as DEFAULT but includes additional metadata information. |
| DEBUG, |
| // Print each message on its own line. |
| ONELINE, |
| // Dump in JSON compact format. |
| JSON, |
| // Dump in JSON pretty format. |
| JSON_PRETTY |
| }; |
| Status Dump(std::ostream* os, Format format); |
| |
| // Closes the container. |
| Status Close(); |
| |
| // Expected PB type and schema for each message to be read. |
| // |
| // Only valid after a successful call to Open(). |
| const std::string& pb_type() const { return pb_type_; } |
| const google::protobuf::FileDescriptorSet* protos() const { |
| return protos_.get(); |
| } |
| |
| // Get the prototype instance for the type of messages stored in this |
| // file. The returned Message is owned by this ReadablePBContainerFile instance. |
| Status GetPrototype(const google::protobuf::Message** prototype); |
| |
| // Return the protobuf container file format version. |
| // File must be open. |
| int version() const; |
| |
| // Return current read offset. |
| // File must be open. |
| uint64_t offset() const; |
| |
| private: |
| FileState state_; |
| int version_; |
| uint64_t offset_; |
| |
| // The size of the file we are reading, or std::nullopt if it hasn't yet been |
| // read. |
| std::optional<uint64_t> cached_file_size_; |
| |
| // The fully-qualified PB type name of the messages in the container. |
| std::string pb_type_; |
| |
| // Wrapped in a unique_ptr so that clients need not include PB headers. |
| std::unique_ptr<google::protobuf::FileDescriptorSet> protos_; |
| |
| // Protobuf infrastructure which owns the message prototype 'prototype_'. |
| std::unique_ptr<google::protobuf::SimpleDescriptorDatabase> db_; |
| std::unique_ptr<google::protobuf::DescriptorPool> descriptor_pool_; |
| std::unique_ptr<google::protobuf::MessageFactory> message_factory_; |
| const google::protobuf::Message* prototype_ = nullptr; |
| |
| std::shared_ptr<RandomAccessFile> reader_; |
| }; |
| |
| // Convenience functions for protobuf containers holding just one record. |
| |
| // Load a "containerized" protobuf from 'path' into 'msg'. If the file does not exist, returns |
| // Status::NotFound(). Otherwise, may return other Status error codes such as Status::IOError. |
| // NOTE(review): the effect of 'sensitivity_mode' is not documented here; confirm in the .cc. |
| Status ReadPBContainerFromPath(Env* env, const std::string& path, |
| google::protobuf::Message* msg, |
| SensitivityMode sensitivity_mode); |
| |
| // Serialize 'msg' as a "containerized" protobuf to the given path. |
| // If create == NO_OVERWRITE and 'path' already exists, the function will fail. |
| // If sync == SYNC, the newly created file will be fsynced before returning. |
| // NOTE(review): the effect of 'sensitivity_mode' is not documented here; confirm in the .cc. |
| Status WritePBContainerToPath(Env* env, const std::string& path, |
| const google::protobuf::Message& msg, |
| CreateMode create, |
| SyncMode sync, |
| SensitivityMode sensitivity_mode); |
| |
| // Wrapper for a protobuf message which lazily converts to JSON when the trace buffer is dumped. |
| // |
| // When tracing, an instance of this class can be associated with a given trace, instead of a |
| // stringified PB, thus avoiding doing stringification inline and moving that work to the |
| // tracing process. |
| // |
| // NOTE(review): kMaxFieldLengthToTrace presumably caps traced field lengths; confirm in the .cc. |
| // |
| // Example usage: |
| // TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, |
| // "response", pb_util::PbTracer::TracePb(*response_pb_), |
| // ...); |
| class PbTracer : public debug::ConvertableToTraceFormat { |
| public: |
| enum { |
| kMaxFieldLengthToTrace = 100 |
| }; |
| |
| // Static helper to be called when adding a stringified PB to a trace. |
| // This does not actually stringify 'msg', that will be done later |
| // when/if AppendAsTraceFormat() is called on the returned object. |
| static scoped_refptr<debug::ConvertableToTraceFormat> TracePb( |
| const google::protobuf::Message& msg); |
| |
| explicit PbTracer(const google::protobuf::Message& msg); |
| |
| // Actually stringifies the PB and appends the string to 'out'. |
| void AppendAsTraceFormat(std::string* out) const override; |
| private: |
| const std::unique_ptr<google::protobuf::Message> msg_; |
| }; |
| |
| } // namespace pb_util |
| |
| } // namespace kudu |