// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Utilities for dealing with protocol buffers.
// These are mostly just functions similar to what are found in the protobuf
// library itself, but using kudu::faststring instances instead of STL strings.
#pragma once

#include <cstdint>
#include <iosfwd>
#include <memory>
#include <optional>
#include <string>

#include <google/protobuf/message.h>
#include <gtest/gtest_prod.h>

#include "kudu/gutil/ref_counted.h"
#include "kudu/util/debug/trace_event_impl.h"
#include "kudu/util/mutex.h"

namespace google {
namespace protobuf {
class DescriptorPool;
class FileDescriptor;
class FileDescriptorSet;
class MessageLite;
class SimpleDescriptorDatabase;
} // namespace protobuf
} // namespace google

namespace kudu {

class Env;
class RWFile;
class RandomAccessFile;
class SequentialFile;
class Slice;
class Status;
class faststring;

namespace pb_util {

enum SyncMode {
  SYNC,
  NO_SYNC
};

enum CreateMode {
  OVERWRITE,
  NO_OVERWRITE
};

enum SensitivityMode {
  SENSITIVE,
  NOT_SENSITIVE
};

enum class FileState {
  NOT_INITIALIZED,
  OPEN,
  CLOSED
};

// The minimum valid length of a PBC file.
extern const int kPBContainerMinimumValidLength;

// See MessageLite::AppendToString
void AppendToString(const google::protobuf::MessageLite &msg, faststring *output);

// See MessageLite::AppendPartialToString
void AppendPartialToString(const google::protobuf::MessageLite &msg, faststring *output);

// See MessageLite::SerializeToString.
void SerializeToString(const google::protobuf::MessageLite &msg, faststring *output);

// See MessageLite::ParseFromZeroCopyStream
Status ParseFromSequentialFile(google::protobuf::MessageLite *msg, SequentialFile *rfile);

// Similar to MessageLite::ParseFromArray, with the difference that it returns
// Status::Corruption() if the message could not be parsed.
Status ParseFromArray(google::protobuf::MessageLite* msg, const uint8_t* data, uint32_t length);

// Load a protobuf from the given path.
Status ReadPBFromPath(Env* env, const std::string& path, google::protobuf::MessageLite* msg);

// Serialize a protobuf to the given path.
//
// If SyncMode SYNC is provided, ensures the changes are made durable.
Status WritePBToPath(Env* env, const std::string& path,
                     const google::protobuf::MessageLite& msg, SyncMode sync);

// Truncate any 'bytes' or 'string' fields of this message to max_len.
// The text "<truncated>" is appended to any such truncated fields.
void TruncateFields(google::protobuf::Message* message, int max_len);

// Redaction-sensitive variant of Message::DebugString.
//
// For most protobufs, this has identical output to Message::DebugString. However,
// a field with string or binary type may be tagged with the 'kudu.REDACT' option,
// available by importing 'pb_util.proto'. When such a field is encountered by this
// method, its contents will be redacted using the 'KUDU_REDACT' macro as documented
// in kudu/util/logging.h.
std::string SecureDebugString(const google::protobuf::Message& msg);

// Same as SecureDebugString() above, but equivalent to Message::ShortDebugString.
std::string SecureShortDebugString(const google::protobuf::Message& msg);

// A protobuf "container" has the following format (all integers in
// little-endian byte order).
//
// <file header>
// <1 or more records>
//
// Note: There are two versions (version 1 and version 2) of the record format.
// Version 2 of the file format differs from version 1 in the following ways:
//
//   * Version 2 has a file header checksum.
//   * Version 2 has separate checksums for the record length and record data
//     fields.
//
// File header format
// ------------------
//
// Each protobuf container file contains a file header identifying the file.
// This includes:
//
// magic number: 8 byte string identifying the file format.
//
//    Included so that we have a minimal guarantee that this file is of the
//    type we expect and that we are not just reading garbage.
//
// container_version: 4 byte unsigned integer indicating the "version" of the
//                    container format. May be set to 1 or 2.
//
//    Included so that this file format may be extended at some later date
//    while maintaining backwards compatibility.
//
// file_header_checksum (version 2+ only): 4 byte unsigned integer with a CRC32C
//                                         of the magic and version fields.
//
//    Included so that we can validate the container version number.
//
// The remaining container fields are considered part of a "record". There may
// be 1 or more records in a valid protobuf container file.
//
// Record format
// -------------
//
// data length: 4 byte unsigned integer indicating the size of the encoded data.
//
//    Included because PB messages aren't self-delimiting, and thus
//    writing a stream of messages to the same file requires
//    delimiting each with its size.
//
//    See https://developers.google.com/protocol-buffers/docs/techniques?hl=zh-cn#streaming
//    for more details.
//
// length checksum (version 2+ only): 4-byte unsigned integer containing the
//                                    CRC32C checksum of "data length".
//
//    Included so that we may discern the difference between a truncated file
//    and a corrupted length field.
//
// data: "size" bytes of protobuf data encoded according to the schema.
//
//    Our payload.
//
// data checksum: 4 byte unsigned integer containing the CRC32C checksum of "data".
//
//    Included to ensure validity of the data on-disk.
//    Note: In version 1 of the file format, this is a checksum of both the
//    "data length" and "data" fields. In version 2+, this is only a checksum
//    of the "data" field.
//
// Supplemental header
// -------------------
//
// A valid container must have at least one record, the first of
// which is known as the "supplemental header". The supplemental header
// contains additional container-level information, including the protobuf
// schema used for the records following it. See pb_util.proto for details. As
// a containerized PB message, the supplemental header is protected by a CRC32C
// checksum like any other message.
//
// Error detection and tolerance
// -----------------------------
//
// It is worth describing the kinds of errors that can be detected by the
// protobuf container and the kinds that cannot.
//
// The checksums in the container are independent, not rolling. As such,
// they won't detect the disappearance or reordering of entire protobuf
// messages, which can happen if a range of the file is collapsed (see
// man fallocate(2)) or if the file is otherwise manually manipulated.
//
// In version 1, the checksums do not protect against corruption in the data
// length field. However, version 2 of the format resolves that problem. The
// benefit is that version 2 files can tell the difference between a record
// with a corrupted length field and a record that was only partially written.
// See ReadablePBContainerFile::ReadNextPB() for discussion on how this
// difference is expressed via the API.
//
// In version 1 of the format, corruption of the version field in the file
// header is not detectable. However, version 2 of the format addresses that
// limitation as well.
//
// Corruption of the protobuf data itself is detected in all versions of the
// file format (subject to CRC32 limitations).
//
// The container does not include footers or periodic checkpoints. As such, it
// will not detect if entire records are truncated.
//
// The design and implementation relies on data ordering guarantees provided by
// the file system to ensure that bytes are written to a file before the file
// metadata (file size) is updated. A partially-written record (the result of a
// failed append) is identified by one of the following criteria:
// 1. Too-few bytes remain in the file to constitute a valid record. For
//    version 2, that would be fewer than 12 bytes (data len, data len
//    checksum, and data checksum), or
// 2. Assuming a record's data length field is valid, then fewer bytes remain
//    in the file than are specified in the data length field (plus enough for
//    checksums).
// In the above scenarios, it is assumed that the system faulted while in the
// middle of appending a record, and it is considered safe to truncate the file
// at the beginning of the partial record.
//
// If filesystem preallocation is used (at the time of this writing, the
// implementation does not support preallocation) then even version 2 of the
// format cannot safely support culling trailing partially-written records.
// This is because it is not possible to reliably tell the difference between a
// partially-written record that did not complete fsync (resulting in a bad
// checksum) vs. a record that successfully was written to disk but then fell
// victim to bit-level disk corruption. See also KUDU-1414.
//
// These tradeoffs in error detection are reasonable given the failure
// environment that Kudu operates within. We tolerate failures such as
// "kill -9" of the Kudu process, machine power loss, or fsync/fdatasync
// failure, but not failures like runaway processes mangling data files
// in arbitrary ways or attackers crafting malicious data files.
//
// In short, no version of the file format will detect truncation of entire
// protobuf records. Version 2 relies on ordered data flushing semantics for
// automatic recoverability from partial record writes. Version 1 of the file
// format cannot support automatic recoverability from partial record writes.
//
// For further reading on what files might look like following a normal
// filesystem failure or disk corruption, and the likelihood of various types
// of disk errors, see the following papers:
//
// https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf
// https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf

// Protobuf container file opened for writing. Can be built around an existing
// file or a completely new file.
//
// Every function is thread-safe unless indicated otherwise.
class WritablePBContainerFile {
 public:

  // Initializes the class instance; writer must be open.
  explicit WritablePBContainerFile(std::shared_ptr<RWFile> writer);

  // Closes the container if not already closed.
  ~WritablePBContainerFile();

  // Writes the file header to disk and initializes the write offset to the
  // byte after the file header. This method should NOT be called when opening
  // an existing file for append; use OpenExisting() for that.
  //
  // 'msg' need not be populated; its type is used to "lock" the container
  // to a particular protobuf message type in Append().
  //
  // Not thread-safe.
  Status CreateNew(const google::protobuf::Message& msg);

  // Opens an existing protobuf container file for append. The file must
  // already have a valid file header. To initialize a new blank file for
  // writing, use CreateNew() instead.
  //
  // The file header is read and the version specified there is used as the
  // format version. The length of the file is also read and is used as the
  // write offset for subsequent Append() calls. WritablePBContainerFile caches
  // the write offset instead of constantly calling stat() on the file each
  // time append is called.
  //
  // Not thread-safe.
  Status OpenExisting();

  // Writes a protobuf message to the container, beginning with its size
  // and ending with its CRC32 checksum. One of CreateNew() or OpenExisting()
  // must be called prior to calling Append(), i.e. the file must be open.
  Status Append(const google::protobuf::Message& msg);

  // Asynchronously flushes all dirty container data to the filesystem.
  // The file must be open.
  Status Flush();

  // Synchronizes all dirty container data to the filesystem.
  // The file must be open.
  //
  // Note: the parent directory is _not_ synchronized. Because the
  // container file was provided during construction, we don't know whether
  // it was created or reopened, and parent directory synchronization is
  // only needed in the former case.
  Status Sync();

  // Get current offset of underlying file.
  uint64_t Offset() const;

  // Closes the container.
  //
  // Not thread-safe.
  Status Close();

  // Returns the path to the container's underlying file handle.
  const std::string& filename() const;

 private:
  friend class TestPBUtil;
  FRIEND_TEST(TestPBUtil, TestPopulateDescriptorSet);

  // Set the file format version. Only used for testing.
  // Must be called before CreateNew().
  Status SetVersionForTests(int version);

  // Write the protobuf schemas belonging to 'desc' and all of its
  // dependencies to 'output'.
  //
  // Schemas are written in dependency order (i.e. if A depends on B which
  // depends on C, the order is C, B, A).
  static void PopulateDescriptorSet(const google::protobuf::FileDescriptor* desc,
                                    google::protobuf::FileDescriptorSet* output);

  // Serialize the contents of 'msg' into 'buf' along with additional metadata
  // to aid in deserialization.
  Status AppendMsgToBuffer(const google::protobuf::Message& msg, faststring* buf);

  // Append bytes to the file.
  Status AppendBytes(const Slice& data);

  // State of the file.
  FileState state_;

  // Protects offset_.
  mutable Mutex offset_lock_;

  // Current write offset into the file.
  uint64_t offset_;

  // Protobuf container file version.
  int version_;

  // File writer.
  std::shared_ptr<RWFile> writer_;
};

// Protobuf container file opened for reading.
//
// Can be built around a file with existing contents or an empty file (in
// which case it's safe to interleave with WritablePBContainerFile).
class ReadablePBContainerFile {
 public:

  // Initializes the class instance; reader must be open.
  explicit ReadablePBContainerFile(std::shared_ptr<RandomAccessFile> reader);

  // Closes the file if not already closed.
  ~ReadablePBContainerFile();

  // Reads the header information from the container and validates it.
  // Must be called before any of the other methods.
  Status Open();

  // Reads a protobuf message from the container, validating its size and
  // data using a CRC32 checksum. File must be open.
  //
  // Return values:
  // * If there are no more records in the file, returns Status::EndOfFile.
  // * If there is a partial record, but it is not long enough to be a full
  //   record or the written length of the record is less than the remaining
  //   bytes in the file, returns Status::Incomplete. If Status::Incomplete
  //   is returned, calling offset() will return the point in the file where
  //   the invalid partial record begins. In order to append additional records
  //   to the file, the file must first be truncated at that offset.
  //   Note: Version 1 of this file format will never return
  //   Status::Incomplete() from this method.
  // * If a corrupt record is encountered, returns Status::Corruption.
  // * On success, stores the result in '*msg' and returns OK.
  Status ReadNextPB(google::protobuf::Message* msg);

  // Dumps any unread protobuf messages in the container to 'os'. Each
  // message's DebugString() method is invoked to produce its textual form.
  // File must be open.
  enum class Format {
    // Print each message on multiple lines, with intervening headers.
    DEFAULT,
    // Same as DEFAULT but includes additional metadata information.
    DEBUG,
    // Print each message on its own line.
    ONELINE,
    // Dump in JSON compact format.
    JSON,
    // Dump in JSON pretty format.
    JSON_PRETTY
  };
  Status Dump(std::ostream* os, Format format);

  // Closes the container.
  Status Close();

  // Expected PB type and schema for each message to be read.
  //
  // Only valid after a successful call to Open().
  const std::string& pb_type() const { return pb_type_; }
  const google::protobuf::FileDescriptorSet* protos() const {
    return protos_.get();
  }

  // Get the prototype instance for the type of messages stored in this
  // file. The returned Message is owned by this ReadablePBContainerFile instance.
  Status GetPrototype(const google::protobuf::Message** prototype);

  // Return the protobuf container file format version.
  // File must be open.
  int version() const;

  // Return current read offset.
  // File must be open.
  uint64_t offset() const;

 private:
  FileState state_;
  int version_;
  uint64_t offset_;

  // The size of the file we are reading, or 'none' if it hasn't yet been
  // read.
  std::optional<uint64_t> cached_file_size_;

  // The fully-qualified PB type name of the messages in the container.
  std::string pb_type_;

  // Wrapped in a unique_ptr so that clients need not include PB headers.
  std::unique_ptr<google::protobuf::FileDescriptorSet> protos_;

  // Protobuf infrastructure which owns the message prototype 'prototype_'.
  std::unique_ptr<google::protobuf::SimpleDescriptorDatabase> db_;
  std::unique_ptr<google::protobuf::DescriptorPool> descriptor_pool_;
  std::unique_ptr<google::protobuf::MessageFactory> message_factory_;
  const google::protobuf::Message* prototype_ = nullptr;

  std::shared_ptr<RandomAccessFile> reader_;
};

// Convenience functions for protobuf containers holding just one record.

// Load a "containerized" protobuf from the given path.
// If the file does not exist, returns Status::NotFound(). Otherwise, may
// return other Status error codes such as Status::IOError.
Status ReadPBContainerFromPath(Env* env, const std::string& path,
                               google::protobuf::Message* msg,
                               SensitivityMode sensitivity_mode);

// Serialize a "containerized" protobuf to the given path.
//
// If create == NO_OVERWRITE and 'path' already exists, the function will fail.
// If sync == SYNC, the newly created file will be fsynced before returning.
Status WritePBContainerToPath(Env* env, const std::string& path,
                              const google::protobuf::Message& msg,
                              CreateMode create,
                              SyncMode sync,
                              SensitivityMode sensitivity_mode);

// Wrapper for a protobuf message which lazily converts to JSON when
// the trace buffer is dumped.
//
// When tracing, an instance of this class can be associated with
// a given trace, instead of a stringified PB, thus avoiding doing
// stringification inline and moving that work to the tracing process.
//
// Example usage:
//  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
//                         "response", pb_util::PbTracer::TracePb(*response_pb_),
//                         ...);
//
class PbTracer : public debug::ConvertableToTraceFormat {
 public:
  enum {
    kMaxFieldLengthToTrace = 100
  };

  // Static helper to be called when adding a stringified PB to a trace.
  // This does not actually stringify 'msg', that will be done later
  // when/if AppendAsTraceFormat() is called on the returned object.
  static scoped_refptr<debug::ConvertableToTraceFormat> TracePb(
      const google::protobuf::Message& msg);

  explicit PbTracer(const google::protobuf::Message& msg);

  // Actually stringifies the PB and appends the string to 'out'.
  void AppendAsTraceFormat(std::string* out) const override;
 private:
  const std::unique_ptr<google::protobuf::Message> msg_;
};

} // namespace pb_util

} // namespace kudu
