[cfile] unsorted updates on cfile_reader.{h,cc}

This patch makes the code in cfile_reader.{h,cc} a bit more robust:
  * check more rigorously for invariants to protect against unsigned
    integer underflow, etc.
  * don't crash but instead return Status::Corruption() when possible
  * optimized the check on whether to verify checksums
  * added 'final' to CFileIterator and DefaultColumnValueIterator
    classes to provide more de-virtualisation opportunities for compiler
  * added PREDICT_{FALSE,TRUE} where appropriate
  * added the 'runtime' tag for the --cfile_verify_checksums flag
  * other minor updates

Change-Id: I25507963f87e08a6c3a8ba2ff1ca58836b713e18
Reviewed-on: http://gerrit.cloudera.org:8080/18965
Tested-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Abhishek Chennaka <achennaka@cloudera.com>
Reviewed-by: Yifan Zhang <chinazhangyifan@163.com>
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index bbd8779..5f1968d 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -78,6 +78,7 @@
 DEFINE_bool(cfile_verify_checksums, true,
             "Verify the checksum for each block on read if one exists");
 TAG_FLAG(cfile_verify_checksums, evolving);
+TAG_FLAG(cfile_verify_checksums, runtime);
 
 DEFINE_double(cfile_inject_corruption, 0,
               "Fraction of the time that read operations on CFiles will fail "
@@ -108,8 +109,8 @@
 static Status ParseMagicAndLength(const Slice& data,
                                   uint8_t* cfile_version,
                                   uint32_t* parsed_len) {
-  if (data.size() != kMagicAndLengthSize) {
-    return Status::Corruption("Bad size data");
+  if (PREDICT_FALSE(data.size() != kMagicAndLengthSize)) {
+    return Status::Corruption("invalid data size", to_string(data.size()));
   }
 
   uint8_t version;
@@ -122,7 +123,7 @@
   }
 
   uint32_t len = DecodeFixed32(data.data() + kMagicLength);
-  if (len > kMaxHeaderFooterPBSize) {
+  if (PREDICT_FALSE(len > kMaxHeaderFooterPBSize)) {
     return Status::Corruption("invalid data size for header", to_string(len));
   }
 
@@ -138,6 +139,7 @@
   block_(std::move(block)),
   file_size_(file_size),
   codec_(nullptr),
+  do_verify_checksum_(false),
   mem_consumption_(std::move(options.parent_mem_tracker),
                    memory_footprint()) {
 }
@@ -228,12 +230,11 @@
                         "failed to parse CFile pre-header");
 
   // Quick check to ensure the header size is reasonable.
-  if (header_size >= file_size_ - kMagicAndLengthSize) {
+  if (PREDICT_FALSE(file_size_ <= header_size + kMagicAndLengthSize)) {
     return Status::Corruption("invalid CFile header size", to_string(header_size));
   }
 
   // Setup the data slices.
-  uint64_t off = kMagicAndLengthSize;
   uint8_t header_scratch[header_size];
   Slice header(header_scratch, header_size);
   uint8_t checksum_scratch[kChecksumSize];
@@ -241,23 +242,22 @@
 
   // Read the header and checksum if needed.
   vector<Slice> results = { header };
-  if (has_checksums() && FLAGS_cfile_verify_checksums) {
+  if (do_verify_checksum()) {
     results.push_back(checksum);
   }
-  RETURN_NOT_OK(block_->ReadV(off, results));
+  RETURN_NOT_OK(block_->ReadV(kMagicAndLengthSize, results));
 
-  if (has_checksums() && FLAGS_cfile_verify_checksums) {
-    Slice slices[] = { mal, header };
+  if (do_verify_checksum()) {
+    Slice slices[]{ mal, header };
     RETURN_NOT_OK(VerifyChecksum(slices, checksum));
   }
 
   // Parse the protobuf header.
   header_.reset(new CFileHeaderPB());
-  if (!header_->ParseFromArray(header.data(), header.size())) {
+  if (PREDICT_FALSE(!header_->ParseFromArray(header.data(), header.size()))) {
     return Status::Corruption("invalid CFile pb header",
                               header.ToDebugString());
   }
-
   VLOG(2) << "Read header: " << SecureDebugString(*header_);
 
   return Status::OK();
@@ -267,7 +267,10 @@
   TRACE_EVENT1("io", "CFileReader::ReadAndParseFooter",
                "cfile", ToString());
   DCHECK(!init_once_.init_succeeded());
-  CHECK_GT(file_size_, kMagicAndLengthSize) << "file too short: " << file_size_;
+
+  if (PREDICT_FALSE(file_size_ <= kMagicAndLengthSize)) {
+    return Status::Corruption(Substitute("file too short: $0", file_size_));
+  }
 
   // First read and parse the "post-footer", which has magic
   // and the length of the actual protobuf footer.
@@ -278,7 +281,7 @@
   RETURN_NOT_OK(ParseMagicAndLength(mal, &cfile_version_, &footer_size));
 
   // Quick check to ensure the footer size is reasonable.
-  if (footer_size >= file_size_ - kMagicAndLengthSize) {
+  if (PREDICT_FALSE(file_size_ <= footer_size + kMagicAndLengthSize)) {
     return Status::Corruption(Substitute(
         "invalid CFile footer size $0 in block of size $1",
         footer_size, file_size_));
@@ -294,21 +297,31 @@
   // We read the checksum position in case one exists.
   // This is done to avoid the need for a follow up read call.
   Slice results[2] = {checksum, footer};
-  uint64_t off = file_size_ - kMagicAndLengthSize - footer_size - kChecksumSize;
-  RETURN_NOT_OK(block_->ReadV(off, results));
+  if (PREDICT_FALSE(file_size_ <
+                    kMagicAndLengthSize + footer_size + kChecksumSize)) {
+    return Status::Corruption(Substitute(
+        "unexpected CFile contents: total size $0, footer size $1",
+        file_size_, footer_size));
+  }
+  RETURN_NOT_OK(block_->ReadV(
+      file_size_ - kMagicAndLengthSize - footer_size - kChecksumSize, results));
 
   // Parse the protobuf footer.
   // This needs to be done before validating the checksum since the
   // incompatible_features flag tells us if a checksum exists at all.
   footer_.reset(new CFileFooterPB());
-  if (!footer_->ParseFromArray(footer.data(), footer.size())) {
+  if (PREDICT_FALSE(!footer_->ParseFromArray(footer.data(), footer.size()))) {
     return Status::Corruption("invalid CFile pb footer", footer.ToDebugString());
   }
 
+  // Remember the checksum verification choice.
+  do_verify_checksum_ =
+      PREDICT_TRUE(FLAGS_cfile_verify_checksums) && has_checksum();
+
   // Verify the footer checksum if needed.
-  if (has_checksums() && FLAGS_cfile_verify_checksums) {
+  if (do_verify_checksum()) {
     // If a checksum exists it was pre-read.
-    Slice slices[2] = {footer, mal};
+    Slice slices[]{ footer, mal };
     RETURN_NOT_OK(VerifyChecksum(slices, checksum));
   }
 
@@ -317,26 +330,21 @@
     RETURN_NOT_OK_PREPEND(GetCompressionCodec(footer_->compression(), &codec_),
                           "failed to load CFile compression codec");
   }
-
   VLOG(2) << "Read footer: " << SecureDebugString(*footer_);
 
   return Status::OK();
 }
 
-bool CFileReader::has_checksums() const {
-  return footer_->incompatible_features() & IncompatibleFeatures::CHECKSUM;
-}
-
-Status CFileReader::VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) const {
+Status CFileReader::VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) {
   uint32_t expected_checksum = DecodeFixed32(checksum.data());
   uint32_t checksum_value = 0;
-  for (auto& d : data) {
+  for (const auto& d : data) {
     checksum_value = crc::Crc32c(d.data(), d.size(), checksum_value);
   }
   if (PREDICT_FALSE(checksum_value != expected_checksum ||
                     MaybeTrue(FLAGS_cfile_inject_corruption))) {
     return Status::Corruption(
-        Substitute("Checksum does not match: $0 vs expected $1",
+        Substitute("checksum does not match: $0 vs expected $1",
                    checksum_value, expected_checksum));
   }
   return Status::OK();
@@ -361,7 +369,9 @@
  public:
   ScratchMemory() : ptr_(nullptr), size_(-1) {}
   ~ScratchMemory() {
-    if (!ptr_) return;
+    if (!ptr_) {
+      return;
+    }
     if (!from_cache_.valid()) {
       delete[] ptr_;
     }
@@ -372,14 +382,13 @@
   // to allocating from the heap. In that case, IsFromCache() will
   // return false.
   void TryAllocateFromCache(BlockCache* cache, const BlockCache::CacheKey& key, int size) {
+    DCHECK(!from_cache_.valid());
     DCHECK(!ptr_);
     from_cache_ = cache->Allocate(key, size);
     if (!from_cache_.valid()) {
-      AllocateFromHeap(size);
-      return;
-    } else {
-      ptr_ = from_cache_.val_ptr();
+      return AllocateFromHeap(size);
     }
+    ptr_ = from_cache_.val_ptr();
     size_ = size;
   }
 
@@ -434,8 +443,13 @@
                               CacheControl cache_control,
                               scoped_refptr<BlockHandle>* ret) const {
   DCHECK(init_once_.init_succeeded());
-  CHECK(ptr.offset() > 0 && ptr.offset() + ptr.size() < file_size_)
-      << Substitute("bad offset $0 in file of size $1", ptr.ToString(), file_size_);
+
+  if (PREDICT_FALSE(ptr.offset() == 0 ||
+                    file_size_ <= ptr.offset() + ptr.size())) {
+    return Status::Corruption(Substitute(
+        "bad offset $0 in file of size $1", ptr.ToString(), file_size_));
+  }
+
   BlockCacheHandle bc_handle;
   Cache::CacheBehavior cache_behavior = cache_control == CACHE_BLOCK ?
       Cache::EXPECT_IN_CACHE : Cache::NO_EXPECT_IN_CACHE;
@@ -459,8 +473,8 @@
   TRACE_COUNTER_INCREMENT(CFILE_CACHE_MISS_BYTES_METRIC_NAME, ptr.size());
 
   uint32_t data_size = ptr.size();
-  if (has_checksums()) {
-    if (PREDICT_FALSE(kChecksumSize > data_size)) {
+  if (has_checksum()) {
+    if (PREDICT_FALSE(data_size < kChecksumSize)) {
       return Status::Corruption("invalid data size for block pointer",
                                 ptr.ToString());
     }
@@ -483,15 +497,14 @@
 
   // Read the data and checksum if needed.
   Slice results_backing[] = { block, checksum };
-  bool read_checksum = has_checksums() && FLAGS_cfile_verify_checksums;
-  ArrayView<Slice> results(results_backing, read_checksum ? 2 : 1);
+  ArrayView<Slice> results(results_backing, do_verify_checksum() ? 2 : 1);
   RETURN_NOT_OK_PREPEND(block_->ReadV(ptr.offset(), results),
                         Substitute("failed to read CFile block $0 at $1",
                                    block_id().ToString(), ptr.ToString()));
 
-  if (has_checksums() && FLAGS_cfile_verify_checksums) {
-    Status s = VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum);
-    if (!s.ok()) {
+  if (do_verify_checksum()) {
+    if (auto s = VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum);
+        PREDICT_FALSE(!s.ok())) {
       RETURN_NOT_OK_HANDLE_CORRUPTION(
           s.CloneAndPrepend(Substitute("checksum error on CFile block $0 at $1",
                                        block_id().ToString(), ptr.ToString())),
@@ -503,11 +516,10 @@
   if (codec_ != nullptr) {
     // Init the decompressor and get the size required for the uncompressed buffer.
     CompressedBlockDecoder uncompressor(codec_, cfile_version_, block);
-    Status s = uncompressor.Init();
-    if (!s.ok()) {
-      LOG(WARNING) << "Unable to validate compressed block " << block_id().ToString()
-                   << " at " << ptr.offset() << " of size " << block.size() << ": "
-                   << s.ToString();
+    if (auto s = uncompressor.Init(); PREDICT_FALSE(!s.ok())) {
+      LOG(WARNING) << Substitute(
+          "unable to validate compressed block $0 of size $1 at offset $2: $3",
+          block_id().ToString(), block.size(), ptr.offset(), s.ToString());
       return s;
     }
     int uncompressed_size = uncompressor.uncompressed_size();
@@ -520,11 +532,11 @@
     } else {
       decompressed_scratch.AllocateFromHeap(uncompressed_size);
     }
-    s = uncompressor.UncompressIntoBuffer(decompressed_scratch.get());
-    if (!s.ok()) {
-      LOG(WARNING) << "Unable to uncompress block " << block_id().ToString()
-                   << " at " << ptr.offset()
-                   << " of size " <<  block.size() << ": " << s.ToString();
+    if (auto s = uncompressor.UncompressIntoBuffer(decompressed_scratch.get());
+        PREDICT_FALSE(!s.ok())) {
+      LOG(WARNING) << Substitute(
+          "unable to uncompress block $0 of size $1 at offset $2: $3",
+          block_id().ToString(), block.size(), ptr.offset(), s.ToString());
       return s;
     }
 
@@ -582,13 +594,6 @@
   return false;
 }
 
-void CFileReader::HandleCorruption(const fs::IOContext* io_context) const {
-  DCHECK(io_context);
-  LOG(ERROR) << "Encountered corrupted CFile in filesystem block: " << block_->id().ToString();
-  block_->block_manager()->error_manager()->RunErrorNotificationCb(
-      ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id);
-}
-
 Status CFileReader::NewIterator(unique_ptr<CFileIterator>* iter,
                                 CacheControl cache_control,
                                 const IOContext* io_context) {
@@ -596,6 +601,16 @@
   return Status::OK();
 }
 
+bool CFileReader::has_checksum() const {
+  DCHECK(footer_);
+  return footer_->incompatible_features() & IncompatibleFeatures::CHECKSUM;
+}
+
+bool CFileReader::do_verify_checksum() const {
+  DCHECK(footer_);
+  return do_verify_checksum_;
+}
+
 size_t CFileReader::memory_footprint() const {
   size_t size = kudu_malloc_usable_size(this);
   size += block_->memory_footprint();
@@ -613,6 +628,13 @@
   return size;
 }
 
+void CFileReader::HandleCorruption(const fs::IOContext* io_context) const {
+  DCHECK(io_context);
+  LOG(ERROR) << "Encountered corrupted CFile in filesystem block: " << block_->id().ToString();
+  block_->block_manager()->error_manager()->RunErrorNotificationCb(
+      ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id);
+}
+
 ////////////////////////////////////////////////////////////
 // Default Column Value Iterator
 ////////////////////////////////////////////////////////////
@@ -924,12 +946,12 @@
                       uint32_t* num_rows_in_block,
                       Slice* non_null_bitmap) {
   Slice data_block = (*data_block_handle)->data();
-  if (!GetVarint32(&data_block, num_rows_in_block)) {
+  if (PREDICT_FALSE(!GetVarint32(&data_block, num_rows_in_block))) {
     return Status::Corruption("bad null header, num elements in block");
   }
 
   uint32_t non_null_bitmap_size;
-  if (!GetVarint32(&data_block, &non_null_bitmap_size)) {
+  if (PREDICT_FALSE(!GetVarint32(&data_block, &non_null_bitmap_size))) {
     return Status::Corruption("bad null header, bitmap size");
   }
 
@@ -1012,10 +1034,10 @@
   // prepared_blocks_ queue.
   while (prepared_blocks_.back()->last_row_idx() < end_idx) {
     Status s = seeked_->Next();
-    if (PREDICT_FALSE(s.IsNotFound())) {
+    if (s.IsNotFound()) {
       VLOG(1) << "Reached EOF";
       break;
-    } else if (!s.ok()) {
+    } else if (PREDICT_FALSE(!s.ok())) {
       return s;
     }
     RETURN_NOT_OK(QueueCurrentDataBlock(*seeked_));
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index f0f406c..2f05ca4 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -176,19 +176,15 @@
     return BlockPointer(footer().validx_info().root_block());
   }
 
-  // Returns true if the file has checksums on the header, footer, and data blocks.
-  bool has_checksums() const;
-
   // Can be called before Init().
   std::string ToString() const { return block_->id().ToString(); }
 
-  // Handles a corruption error. Functions that may return due to a CFile
-  // corruption should call this method before returning.
-  void HandleCorruption(const fs::IOContext* io_context) const;
-
  private:
   DISALLOW_COPY_AND_ASSIGN(CFileReader);
 
+  static Status VerifyChecksum(ArrayView<const Slice> data,
+                               const Slice& checksum);
+
   CFileReader(ReaderOptions options,
               uint64_t file_size,
               std::unique_ptr<fs::ReadableBlock> block);
@@ -198,11 +194,21 @@
 
   Status ReadAndParseHeader();
   Status ReadAndParseFooter();
-  Status VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) const;
+
+  // Return true if the file has checksum on the header, footer, and data blocks.
+  bool has_checksum() const;
+
+  // Return true if has_checksum() returns true and the checksum verification
+  // is requested.
+  bool do_verify_checksum() const;
 
   // Returns the memory usage of the object including the object itself.
   size_t memory_footprint() const;
 
+  // Handles a corruption error. Functions that may return due to a CFile
+  // corruption should call this method before returning.
+  void HandleCorruption(const fs::IOContext* io_context) const;
+
   const std::unique_ptr<fs::ReadableBlock> block_;
   const uint64_t file_size_;
 
@@ -214,6 +220,8 @@
   const TypeInfo* type_info_;
   const TypeEncodingInfo* type_encoding_info_;
 
+  bool do_verify_checksum_;
+
   KuduOnceLambda init_once_;
 
   ScopedTrackedConsumption mem_consumption_;
@@ -282,7 +290,7 @@
 // Example:
 //    DefaultColumnValueIterator iter;
 //    iter.Scan(&column_block);
-class DefaultColumnValueIterator : public ColumnIterator {
+class DefaultColumnValueIterator final : public ColumnIterator {
  public:
   DefaultColumnValueIterator(const TypeInfo* typeinfo, const void* value)
       : typeinfo_(typeinfo),
@@ -312,7 +320,7 @@
 };
 
 
-class CFileIterator : public ColumnIterator {
+class CFileIterator final : public ColumnIterator {
  public:
   CFileIterator(CFileReader* reader,
                 CFileReader::CacheControl cache_control,