// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/tablet/tablet_metadata.h"

#include <algorithm>
#include <functional>
#include <mutex>
#include <optional>
#include <ostream>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>

#include <gflags/gflags.h>

#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/data_dirs.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/rowset_metadata.h"
#include "kudu/tablet/txn_metadata.h"
#include "kudu/tablet/txn_participant.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/trace.h"

DEFINE_bool(enable_tablet_orphaned_block_deletion, true,
            "Whether to enable deletion of orphaned blocks from disk. "
            "Note: This is only exposed for debugging purposes!");
TAG_FLAG(enable_tablet_orphaned_block_deletion, advanced);
TAG_FLAG(enable_tablet_orphaned_block_deletion, hidden);
TAG_FLAG(enable_tablet_orphaned_block_deletion, runtime);

DEFINE_int32(tablet_metadata_load_inject_latency_ms, 0,
             "Amount of latency in ms to inject when load tablet metadata file. "
             "Only for testing.");
TAG_FLAG(tablet_metadata_load_inject_latency_ms, hidden);

using base::subtle::Barrier_AtomicIncrement;
using kudu::consensus::MinimumOpId;
using kudu::consensus::OpId;
using kudu::fs::BlockDeletionTransaction;
using kudu::fs::BlockManager;
using kudu::log::MinLogIndexAnchorer;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using std::memory_order_relaxed;
using std::make_optional;
using std::nullopt;
using std::optional;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::unordered_set;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace tablet {

// ============================================================================
//  Tablet Metadata
// ============================================================================

Status TabletMetadata::CreateNew(FsManager* fs_manager,
                                 const string& tablet_id,
                                 const string& table_name,
                                 const string& table_id,
                                 const Schema& schema,
                                 const PartitionSchema& partition_schema,
                                 const Partition& partition,
                                 const TabletDataState& initial_tablet_data_state,
                                 optional<OpId> tombstone_last_logged_opid,
                                 bool supports_live_row_count,
                                 optional<TableExtraConfigPB> extra_config,
                                 optional<string> dimension_label,
                                 optional<TableTypePB> table_type,
                                 scoped_refptr<TabletMetadata>* metadata) {

  // Verify that no existing tablet exists with the same ID.
  if (fs_manager->env()->FileExists(fs_manager->GetTabletMetadataPath(tablet_id))) {
    return Status::AlreadyPresent("Tablet already exists", tablet_id);
  }

  RETURN_NOT_OK_PREPEND(fs_manager->dd_manager()->CreateDataDirGroup(tablet_id),
      "Failed to create TabletMetadata");
  auto dir_group_cleanup = MakeScopedCleanup([&]() {
    fs_manager->dd_manager()->DeleteDataDirGroup(tablet_id);
  });
  scoped_refptr<TabletMetadata> ret(new TabletMetadata(
      fs_manager,
      tablet_id,
      table_name,
      table_id,
      schema,
      partition_schema,
      partition,
      initial_tablet_data_state,
      std::move(tombstone_last_logged_opid),
      supports_live_row_count,
      std::move(extra_config),
      std::move(dimension_label),
      !table_type ||
          *table_type == TableTypePB::DEFAULT_TABLE ? nullopt
                                                    : std::move(table_type)));
  RETURN_NOT_OK(ret->Flush());
  dir_group_cleanup.cancel();

  metadata->swap(ret);
  return Status::OK();
}

Status TabletMetadata::Load(FsManager* fs_manager,
                            const string& tablet_id,
                            scoped_refptr<TabletMetadata>* metadata) {
  MAYBE_INJECT_FIXED_LATENCY(FLAGS_tablet_metadata_load_inject_latency_ms);
  scoped_refptr<TabletMetadata> ret(new TabletMetadata(fs_manager, tablet_id));
  RETURN_NOT_OK(ret->LoadFromDisk());
  metadata->swap(ret);
  return Status::OK();
}

Status TabletMetadata::LoadOrCreate(FsManager* fs_manager,
                                    const string& tablet_id,
                                    const string& table_name,
                                    const string& table_id,
                                    const Schema& schema,
                                    const PartitionSchema& partition_schema,
                                    const Partition& partition,
                                    const TabletDataState& initial_tablet_data_state,
                                    optional<OpId> tombstone_last_logged_opid,
                                    optional<TableExtraConfigPB> extra_config,
                                    optional<string> dimension_label,
                                    optional<TableTypePB> table_type,
                                    scoped_refptr<TabletMetadata>* metadata) {
  Status s = Load(fs_manager, tablet_id, metadata);
  if (s.ok()) {
    const SchemaPtr schema_ptr = (*metadata)->schema();
    if (*schema_ptr != schema) {
      return Status::Corruption(Substitute("Schema on disk ($0) does not "
        "match expected schema ($1)", schema_ptr->ToString(),
        schema.ToString()));
    }
    return Status::OK();
  }
  if (s.IsNotFound()) {
    return CreateNew(fs_manager, tablet_id, table_name, table_id, schema,
                     partition_schema, partition, initial_tablet_data_state,
                     std::move(tombstone_last_logged_opid),
                     /*supports_live_row_count=*/ true,
                     std::move(extra_config),
                     std::move(dimension_label),
                     std::move(table_type),
                     metadata);
  }
  return s;
}

vector<BlockIdPB> TabletMetadata::CollectBlockIdPBs(const TabletSuperBlockPB& superblock) {
  vector<BlockIdPB> block_ids;
  for (const RowSetDataPB& rowset : superblock.rowsets()) {
    for (const ColumnDataPB& column : rowset.columns()) {
      block_ids.push_back(column.block());
    }
    for (const DeltaDataPB& redo : rowset.redo_deltas()) {
      block_ids.push_back(redo.block());
    }
    for (const DeltaDataPB& undo : rowset.undo_deltas()) {
      block_ids.push_back(undo.block());
    }
    if (rowset.has_bloom_block()) {
      block_ids.push_back(rowset.bloom_block());
    }
    if (rowset.has_adhoc_index_block()) {
      block_ids.push_back(rowset.adhoc_index_block());
    }
  }
  return block_ids;
}

BlockIdContainer TabletMetadata::CollectBlockIds() const {
  BlockIdContainer block_ids;
  for (const auto& r : rowsets_) {
    BlockIdContainer rowset_block_ids = r->GetAllBlocks();
    block_ids.insert(block_ids.end(),
                     rowset_block_ids.begin(),
                     rowset_block_ids.end());
  }
  return block_ids;
}

BlockId TabletMetadata::GetMaxLiveBlockId() const {
  BlockId max_block_id;
  for (const auto& r : rowsets_) {
    max_block_id = std::max(max_block_id, r->GetMaxLiveBlockId());
  }
  return max_block_id;
}

Status TabletMetadata::DeleteTabletData(TabletDataState delete_type,
                                        const optional<OpId>& last_logged_opid) {
  DCHECK(!last_logged_opid || last_logged_opid->IsInitialized());
  CHECK(delete_type == TABLET_DATA_DELETED ||
        delete_type == TABLET_DATA_TOMBSTONED ||
        delete_type == TABLET_DATA_COPYING)
      << "DeleteTabletData() called with unsupported delete_type on tablet "
      << tablet_id_ << ": " << TabletDataState_Name(delete_type)
      << " (" << delete_type << ")";

  // First add all of our blocks to the orphan list
  // and clear our rowsets. This serves to erase all the data.
  //
  // We also set the state in our persisted metadata to indicate that
  // we have been deleted.
  {
    std::lock_guard<LockType> l(data_lock_);
    for (const shared_ptr<RowSetMetadata>& rsmd : rowsets_) {
      AddOrphanedBlocksUnlocked(rsmd->GetAllBlocks());
    }
    rowsets_.clear();
    tablet_data_state_ = delete_type;
    if (last_logged_opid) {
      tombstone_last_logged_opid_ = last_logged_opid;
    }
  }

  // Unregister the tablet's data dir group in memory (it is stored on disk in
  // the superblock). Even if we fail to flush below, the expectation is that
  // we will no longer be writing to the tablet, and therefore don't need its
  // data dir group.
  fs_manager_->dd_manager()->DeleteDataDirGroup(tablet_id_);

  // Flushing will sync the new tablet_data_state_ to disk and will now also
  // delete all the data.
  RETURN_NOT_OK(Flush());

  // Re-sync to disk one more time.
  // This call will typically re-sync with an empty orphaned blocks list
  // (unless deleting any orphans failed during the last Flush()), so that we
  // don't try to re-delete the deleted orphaned blocks on every startup.
  return Flush();
}

bool TabletMetadata::IsTombstonedWithNoBlocks() const {
  std::lock_guard<LockType> l(data_lock_);
  return tablet_data_state_ == TABLET_DATA_TOMBSTONED &&
      rowsets_.empty() &&
      orphaned_blocks_.empty();
}

Status TabletMetadata::DeleteSuperBlock() {
  std::lock_guard<LockType> l(data_lock_);
  if (!orphaned_blocks_.empty()) {
    return Status::InvalidArgument("The metadata for tablet " + tablet_id_ +
                                   " still references orphaned blocks. "
                                   "Call DeleteTabletData() first");
  }
  if (tablet_data_state_ != TABLET_DATA_DELETED) {
    return Status::IllegalState(
        Substitute("Tablet $0 is not in TABLET_DATA_DELETED state. "
                   "Call DeleteTabletData(TABLET_DATA_DELETED) first. "
                   "Tablet data state: $1 ($2)",
                   tablet_id_,
                   TabletDataState_Name(tablet_data_state_),
                   tablet_data_state_));
  }

  string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
  RETURN_NOT_OK_PREPEND(fs_manager_->env()->DeleteFile(path),
                        "Unable to delete superblock for tablet " + tablet_id_);
  return Status::OK();
}

TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
                               string table_name, string table_id,
                               const Schema& schema, PartitionSchema partition_schema,
                               Partition partition,
                               const TabletDataState& tablet_data_state,
                               optional<OpId> tombstone_last_logged_opid,
                               bool supports_live_row_count,
                               optional<TableExtraConfigPB> extra_config,
                               optional<string> dimension_label,
                               optional<TableTypePB> table_type)
    : state_(kNotWrittenYet),
      tablet_id_(std::move(tablet_id)),
      table_id_(std::move(table_id)),
      partition_(std::move(partition)),
      fs_manager_(fs_manager),
      log_prefix_(Substitute("T $0 P $1: ", tablet_id_, fs_manager_->uuid())),
      next_rowset_idx_(0),
      last_durable_mrs_id_(kNoDurableMemStore),
      schema_(std::make_shared<Schema>(schema)),
      schema_version_(0),
      table_name_(std::move(table_name)),
      partition_schema_(std::move(partition_schema)),
      tablet_data_state_(tablet_data_state),
      tombstone_last_logged_opid_(std::move(tombstone_last_logged_opid)),
      extra_config_(std::move(extra_config)),
      dimension_label_(std::move(dimension_label)),
      table_type_(std::move(table_type)),
      num_flush_pins_(0),
      needs_flush_(false),
      flush_count_for_tests_(0),
      pre_flush_callback_(&DoNothingStatusClosure),
      supports_live_row_count_(supports_live_row_count) {
  CHECK(schema_->has_column_ids());
  CHECK_GT(schema_->num_key_columns(), 0);
}

TabletMetadata::~TabletMetadata() {
}

TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id)
    : state_(kNotLoadedYet),
      tablet_id_(std::move(tablet_id)),
      fs_manager_(fs_manager),
      next_rowset_idx_(0),
      num_flush_pins_(0),
      needs_flush_(false),
      flush_count_for_tests_(0),
      pre_flush_callback_(&DoNothingStatusClosure),
      supports_live_row_count_(false) {}

Status TabletMetadata::LoadFromDisk() {
  TRACE_EVENT1("tablet", "TabletMetadata::LoadFromDisk",
               "tablet_id", tablet_id_);

  CHECK_EQ(state_, kNotLoadedYet);

  TabletSuperBlockPB superblock;
  RETURN_NOT_OK(ReadSuperBlockFromDisk(&superblock));
  RETURN_NOT_OK_PREPEND(LoadFromSuperBlock(superblock),
                        "Failed to load data from superblock protobuf");
  RETURN_NOT_OK(UpdateOnDiskSize());
  state_ = kInitialized;
  return Status::OK();
}

Status TabletMetadata::UpdateOnDiskSize() {
  string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
  uint64_t on_disk_size;
  RETURN_NOT_OK(fs_manager()->env()->GetFileSize(path, &on_disk_size));
  on_disk_size_.store(on_disk_size, memory_order_relaxed);
  return Status::OK();
}

Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock) {
  BlockIdContainer orphaned_blocks;

  VLOG(2) << "Loading TabletMetadata from SuperBlockPB:" << std::endl
          << SecureDebugString(superblock);

  {
    std::lock_guard<LockType> l(data_lock_);

    // Verify that the tablet id matches with the one in the protobuf
    if (superblock.tablet_id() != tablet_id_) {
      return Status::Corruption("Expected id=" + tablet_id_ +
                                " found " + superblock.tablet_id(),
                                SecureDebugString(superblock));
    }

    last_durable_mrs_id_ = superblock.last_durable_mrs_id();

    table_name_ = superblock.table_name();

    uint32_t schema_version = superblock.schema_version();
    SchemaPtr schema = std::make_shared<Schema>();
    RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock.schema(), schema.get()),
                          "Failed to parse Schema from superblock " +
                          SecureShortDebugString(superblock));
    {
      SchemaPtr old_schema;
      SwapSchemaUnlocked(schema, schema_version, &old_schema);
    }

    if (!superblock.has_partition()) {
      // KUDU-818: Possible backward compatibility issue with tables created
      // with version <= 0.5, throw warning.
      LOG_WITH_PREFIX(WARNING) << "Upgrading from Kudu 0.5.0 directly to this"
          << " version is not supported. Please upgrade to 0.6.0 before"
          << " moving to a higher version.";
      return Status::NotFound("Missing partition in superblock "+
                              SecureDebugString(superblock));
    }

    // Some metadata fields are assumed to be immutable and thus are
    // only read from the protobuf when the tablet metadata is loaded
    // for the very first time. See KUDU-1500 for more details.
    if (state_ == kNotLoadedYet) {
      table_id_ = superblock.table_id();
      RETURN_NOT_OK(PartitionSchema::FromPB(superblock.partition_schema(),
                                            *schema_, &partition_schema_));
      Partition::FromPB(superblock.partition(), &partition_);
    } else {
      CHECK_EQ(table_id_, superblock.table_id());
      PartitionSchema partition_schema;
      RETURN_NOT_OK(PartitionSchema::FromPB(superblock.partition_schema(),
                                            *schema_, &partition_schema));
      CHECK(partition_schema_ == partition_schema);

      Partition partition;
      Partition::FromPB(superblock.partition(), &partition);
      CHECK(partition_ == partition);
    }

    tablet_data_state_ = superblock.tablet_data_state();

    // This field should be parsed before parsing RowSetDataPB.
    supports_live_row_count_ = superblock.supports_live_row_count();

    rowsets_.clear();
    for (const RowSetDataPB& rowset_pb : superblock.rowsets()) {
      unique_ptr<RowSetMetadata> rowset_meta;
      RETURN_NOT_OK(RowSetMetadata::Load(this, rowset_pb, &rowset_meta));
      next_rowset_idx_ = std::max(next_rowset_idx_, rowset_meta->id() + 1);
      rowsets_.push_back(shared_ptr<RowSetMetadata>(rowset_meta.release()));
    }

    // Determine the largest block ID known to the tablet metadata so we can
    // notify the block manager of blocks it may have missed (e.g. if a data
    // directory failed and the blocks on it were not read).
    BlockId max_block_id = GetMaxLiveBlockId();

    for (const BlockIdPB& block_pb : superblock.orphaned_blocks()) {
      BlockId orphaned_block_id = BlockId::FromPB(block_pb);
      max_block_id = std::max(max_block_id, orphaned_block_id);
      orphaned_blocks.push_back(orphaned_block_id);
    }
    AddOrphanedBlocksUnlocked(orphaned_blocks);

    // Notify the block manager of the highest block ID seen.
    fs_manager()->block_manager()->NotifyBlockId(max_block_id);

    if (superblock.has_data_dir_group()) {
      // An error loading the data dir group is non-fatal, it just means the
      // tablet will fail to bootstrap later.
      WARN_NOT_OK(fs_manager_->dd_manager()->LoadDataDirGroupFromPB(
          tablet_id_, superblock.data_dir_group()),
          "failed to load DataDirGroup from superblock");
    } else if (tablet_data_state_ == TABLET_DATA_READY) {
      // If the superblock does not contain a DataDirGroup, this server has
      // likely been upgraded from before 1.5.0. Create a new DataDirGroup for
      // the tablet. If the data is not TABLET_DATA_READY, group creation is
      // pointless, as the tablet metadata will be deleted anyway.
      //
      // Since we don't know what directories the existing blocks are in, we
      // should assume the data is spread across all disks.
      RETURN_NOT_OK(fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_,
          fs::DataDirManager::DirDistributionMode::ACROSS_ALL_DIRS));
    }

    // Note: Previous versions of Kudu used MinimumOpId() as a "null" value on
    // disk for the last-logged opid, so we special-case it at load time and
    // consider it equal to "not present".
    if (superblock.has_tombstone_last_logged_opid() &&
        superblock.tombstone_last_logged_opid().IsInitialized() &&
        !OpIdEquals(MinimumOpId(), superblock.tombstone_last_logged_opid())) {
      tombstone_last_logged_opid_ = superblock.tombstone_last_logged_opid();
    } else {
      tombstone_last_logged_opid_.reset();
    }

    if (superblock.has_extra_config()) {
      extra_config_ = superblock.extra_config();
    } else {
      extra_config_.reset();
    }

    if (superblock.has_dimension_label()) {
      dimension_label_ = superblock.dimension_label();
    } else {
      dimension_label_.reset();
    }

    if (superblock.has_table_type() && superblock.table_type() != TableTypePB::DEFAULT_TABLE) {
      table_type_ = superblock.table_type();
    }

    std::unordered_map<int64_t, scoped_refptr<TxnMetadata>> txn_metas;
    for (const auto& txn_id_and_metadata : superblock.txn_metadata()) {
      const auto& txn_meta = txn_id_and_metadata.second;
      EmplaceOrDie(&txn_metas, txn_id_and_metadata.first,
          new TxnMetadata(
              txn_meta.has_aborted() && txn_meta.aborted(),
              txn_meta.has_commit_mvcc_op_timestamp() ?
                  make_optional(Timestamp(txn_meta.commit_mvcc_op_timestamp())) :
                  nullopt,
              txn_meta.has_commit_timestamp() ?
                  make_optional(Timestamp(txn_meta.commit_timestamp())) :
                  nullopt,
              txn_meta.has_flushed_committed_mrs() && txn_meta.flushed_committed_mrs()
          ));
    }
    txn_metadata_by_txn_id_ = std::move(txn_metas);
  }

  // Now is a good time to clean up any orphaned blocks that may have been
  // left behind from a crash just after replacing the superblock.
  if (!fs_manager()->read_only()) {
    DeleteOrphanedBlocks(orphaned_blocks);
  }

  return Status::OK();
}

Status TabletMetadata::UpdateAndFlush(const RowSetMetadataIds& to_remove,
                                      const RowSetMetadataVector& to_add,
                                      int64_t last_durable_mrs_id,
                                      const vector<TxnInfoBeingFlushed>& txns_being_flushed) {
  {
    std::lock_guard<LockType> l(data_lock_);
    RETURN_NOT_OK(UpdateUnlocked(to_remove, to_add, last_durable_mrs_id, txns_being_flushed));
  }
  return Flush();
}

void TabletMetadata::AddOrphanedBlocks(const BlockIdContainer& block_ids) {
  std::lock_guard<LockType> l(data_lock_);
  AddOrphanedBlocksUnlocked(block_ids);
}

void TabletMetadata::AddOrphanedBlocksUnlocked(const BlockIdContainer& block_ids) {
  DCHECK(data_lock_.is_locked());
  orphaned_blocks_.insert(block_ids.begin(), block_ids.end());
}

void TabletMetadata::DeleteOrphanedBlocks(const BlockIdContainer& blocks) {
  if (PREDICT_FALSE(!FLAGS_enable_tablet_orphaned_block_deletion)) {
    LOG_WITH_PREFIX(WARNING) << "Not deleting " << blocks.size()
        << " block(s) from disk. Block deletion disabled via "
        << "--enable_tablet_orphaned_block_deletion=false";
    return;
  }

  BlockManager* bm = fs_manager()->block_manager();
  shared_ptr<BlockDeletionTransaction> transaction = bm->NewDeletionTransaction();
  for (const BlockId& b : blocks) {
    transaction->AddDeletedBlock(b);
  }
  vector<BlockId> deleted;
  WARN_NOT_OK(transaction->CommitDeletedBlocks(&deleted),
              "not all orphaned blocks were deleted");

  // Regardless of whether we deleted all the blocks or not, remove them from
  // the orphaned blocks list. If we failed to delete the blocks due to
  // hardware issues, there's not much we can do and we assume the disk isn't
  // coming back. At worst, this leaves some untracked orphaned blocks.
  {
    std::lock_guard<LockType> l(data_lock_);
    for (const BlockId& b : blocks) {
      orphaned_blocks_.erase(b);
    }
  }
}

void TabletMetadata::PinFlush() {
  std::lock_guard<LockType> l(data_lock_);
  CHECK_GE(num_flush_pins_, 0);
  num_flush_pins_++;
  VLOG(1) << "Number of flush pins: " << num_flush_pins_;
}

Status TabletMetadata::UnPinFlush() {
  std::unique_lock<LockType> l(data_lock_);
  CHECK_GT(num_flush_pins_, 0);
  num_flush_pins_--;
  if (needs_flush_) {
    l.unlock();
    RETURN_NOT_OK(Flush());
  }
  return Status::OK();
}

Status TabletMetadata::Flush() {
  TRACE_EVENT1("tablet", "TabletMetadata::Flush",
               "tablet_id", tablet_id_);

  MutexLock l_flush(flush_lock_);
  BlockIdContainer orphaned;
  TabletSuperBlockPB pb;
  vector<unique_ptr<MinLogIndexAnchorer>> anchors_needing_flush;
  {
    std::lock_guard<LockType> l(data_lock_);
    CHECK_GE(num_flush_pins_, 0);
    if (num_flush_pins_ > 0) {
      needs_flush_ = true;
      LOG(INFO) << "Not flushing: waiting for " << num_flush_pins_ << " pins to be released.";
      return Status::OK();
    }
    needs_flush_ = false;

    RETURN_NOT_OK(ToSuperBlockUnlocked(&pb, rowsets_));

    // Make a copy of the orphaned blocks list which corresponds to the superblock
    // that we're writing. It's important to take this local copy to avoid a race
    // in which another thread may add new orphaned blocks to the 'orphaned_blocks_'
    // set while we're in the process of writing the new superblock to disk. We don't
    // want to accidentally delete those blocks before that next metadata update
    // is persisted. See KUDU-701 for details.
    orphaned.assign(orphaned_blocks_.begin(), orphaned_blocks_.end());
    anchors_needing_flush = std::move(anchors_needing_flush_);
  }
  pre_flush_callback_();
  RETURN_NOT_OK(ReplaceSuperBlockUnlocked(pb));
  TRACE("Metadata flushed");
  l_flush.Unlock();

  // Now that we've flushed, we can unanchor our WALs by destructing our
  // anchors.
  anchors_needing_flush.clear();

  // Now that the superblock is written, try to delete the orphaned blocks.
  //
  // If we crash just before the deletion, we'll retry when reloading from
  // disk; the orphaned blocks were persisted as part of the superblock.
  DeleteOrphanedBlocks(orphaned);

  return Status::OK();
}

Status TabletMetadata::UpdateUnlocked(
    const RowSetMetadataIds& to_remove,
    const RowSetMetadataVector& to_add,
    int64_t last_durable_mrs_id,
    const vector<TxnInfoBeingFlushed>& txns_being_flushed) {
  DCHECK(data_lock_.is_locked());
  CHECK_NE(state_, kNotLoadedYet);
  if (last_durable_mrs_id != kNoMrsFlushed) {
    DCHECK_GE(last_durable_mrs_id, last_durable_mrs_id_);
    last_durable_mrs_id_ = last_durable_mrs_id;
  }
  for (const auto& txn_id : txns_being_flushed) {
    auto txn_meta = FindOrDie(txn_metadata_by_txn_id_, txn_id);
    txn_meta->set_flushed_committed_mrs();
  }

  RowSetMetadataVector new_rowsets = rowsets_;
  auto it = new_rowsets.begin();
  while (it != new_rowsets.end()) {
    if (ContainsKey(to_remove, (*it)->id())) {
      AddOrphanedBlocksUnlocked((*it)->GetAllBlocks());
      it = new_rowsets.erase(it);
    } else {
      it++;
    }
  }

  for (const shared_ptr<RowSetMetadata>& meta : to_add) {
    new_rowsets.push_back(meta);
  }
  rowsets_ = new_rowsets;

  TRACE("TabletMetadata updated");
  return Status::OK();
}

Status TabletMetadata::ReplaceSuperBlock(const TabletSuperBlockPB &pb) {
  {
    MutexLock l(flush_lock_);
    RETURN_NOT_OK_PREPEND(ReplaceSuperBlockUnlocked(pb), "Unable to replace superblock");
    fs_manager_->dd_manager()->DeleteDataDirGroup(tablet_id_);
  }

  RETURN_NOT_OK_PREPEND(LoadFromSuperBlock(pb),
                        "Failed to load data from superblock protobuf");

  return Status::OK();
}

Status TabletMetadata::ReplaceSuperBlockUnlocked(const TabletSuperBlockPB &pb) {
  flush_lock_.AssertAcquired();

  string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
  RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath(
                            fs_manager_->env(), path, pb,
                            pb_util::OVERWRITE, pb_util::SYNC,
                            pb_util::SENSITIVE),
                        Substitute("Failed to write tablet metadata $0", tablet_id_));
  flush_count_for_tests_++;
  RETURN_NOT_OK(UpdateOnDiskSize());

  return Status::OK();
}

void TabletMetadata::SetPreFlushCallback(StatusClosure callback) {
  MutexLock l_flush(flush_lock_);
  pre_flush_callback_ = std::move(callback);
}

std::optional<consensus::OpId> TabletMetadata::tombstone_last_logged_opid() const {
  std::lock_guard<LockType> l(data_lock_);
  return tombstone_last_logged_opid_;
}

Status TabletMetadata::ReadSuperBlockFromDisk(TabletSuperBlockPB* superblock) const {
  string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
  RETURN_NOT_OK_PREPEND(
      pb_util::ReadPBContainerFromPath(fs_manager_->env(), path, superblock, pb_util::SENSITIVE),
      Substitute("Could not load tablet metadata from $0", path));
  return Status::OK();
}

Status TabletMetadata::ToSuperBlock(TabletSuperBlockPB* super_block) const {
  // acquire the lock so that rowsets_ doesn't get changed until we're finished.
  std::lock_guard<LockType> l(data_lock_);
  return ToSuperBlockUnlocked(super_block, rowsets_);
}

Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
                                            const RowSetMetadataVector& rowsets) const {
  DCHECK(data_lock_.is_locked());
  // Convert to protobuf
  TabletSuperBlockPB pb;
  pb.set_table_id(table_id_);
  pb.set_tablet_id(tablet_id_);
  partition_.ToPB(pb.mutable_partition());
  pb.set_last_durable_mrs_id(last_durable_mrs_id_);
  pb.set_schema_version(schema_version_);
  RETURN_NOT_OK(partition_schema_.ToPB(*schema_, pb.mutable_partition_schema()));
  pb.set_table_name(table_name_);

  for (const shared_ptr<RowSetMetadata>& meta : rowsets) {
    meta->ToProtobuf(pb.add_rowsets());
  }

  for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) {
    const auto& txn_meta = txn_id_and_metadata.second;
    TxnMetadataPB meta_pb = txn_meta->ToPB();
    InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, meta_pb);
  }

  DCHECK(schema_->has_column_ids());
  RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_, pb.mutable_schema()),
                        "Couldn't serialize schema into superblock");

  pb.set_tablet_data_state(tablet_data_state_);
  if (tombstone_last_logged_opid_ &&
      !OpIdEquals(MinimumOpId(), *tombstone_last_logged_opid_)) {
    *pb.mutable_tombstone_last_logged_opid() = *tombstone_last_logged_opid_;
  }

  for (const BlockId& block_id : orphaned_blocks_) {
    block_id.CopyToPB(pb.mutable_orphaned_blocks()->Add());
  }

  // Serialize the tablet's DataDirGroupPB if one exists. One may not exist if
  // this is called during a tablet deletion.
  DataDirGroupPB group_pb;
  if (fs_manager_->dd_manager()->GetDataDirGroupPB(tablet_id_, &group_pb).ok()) {
    pb.mutable_data_dir_group()->Swap(&group_pb);
  }

  pb.set_supports_live_row_count(supports_live_row_count_);

  if (extra_config_) {
    *pb.mutable_extra_config() = *extra_config_;
  }

  if (dimension_label_) {
    pb.set_dimension_label(*dimension_label_);
  }

  if (table_type_) {
    DCHECK_NE(TableTypePB::DEFAULT_TABLE, *table_type_);
    pb.set_table_type(*table_type_);
  }

  super_block->Swap(&pb);
  return Status::OK();
}

Status TabletMetadata::CreateRowSet(shared_ptr<RowSetMetadata>* rowset) {
  AtomicWord rowset_idx = Barrier_AtomicIncrement(&next_rowset_idx_, 1) - 1;
  unique_ptr<RowSetMetadata> scoped_rsm;
  RETURN_NOT_OK(RowSetMetadata::CreateNew(this, rowset_idx, &scoped_rsm));
  rowset->reset(DCHECK_NOTNULL(scoped_rsm.release()));
  return Status::OK();
}

void TabletMetadata::AddTxnMetadata(int64_t txn_id, unique_ptr<MinLogIndexAnchorer> log_anchor) {
  std::lock_guard<LockType> l(data_lock_);
  EmplaceOrDie(&txn_metadata_by_txn_id_, txn_id, new TxnMetadata());
  anchors_needing_flush_.emplace_back(std::move(log_anchor));
}

void TabletMetadata::BeginCommitTransaction(int64_t txn_id, Timestamp mvcc_op_timestamp,
                                            unique_ptr<MinLogIndexAnchorer> log_anchor) {
  std::lock_guard<LockType> l(data_lock_);
  auto txn_metadata = FindOrDie(txn_metadata_by_txn_id_, txn_id);
  // NOTE: we may already have an MVCC op timestamp if we are bootstrapping and
  // the timestamp was persisted already, in which case, we don't need to
  // anchor the WAL to ensure the timestamp's persistence in metadata.
  if (!txn_metadata->commit_mvcc_op_timestamp()) {
    txn_metadata->set_commit_mvcc_op_timestamp(mvcc_op_timestamp);
    anchors_needing_flush_.emplace_back(std::move(log_anchor));
  }
}

void TabletMetadata::AddCommitTimestamp(int64_t txn_id, Timestamp commit_timestamp,
                                        unique_ptr<MinLogIndexAnchorer> log_anchor) {
  std::lock_guard<LockType> l(data_lock_);
  auto txn_metadata = FindOrDie(txn_metadata_by_txn_id_, txn_id);
  txn_metadata->set_commit_timestamp(commit_timestamp);
  anchors_needing_flush_.emplace_back(std::move(log_anchor));
}

void TabletMetadata::AbortTransaction(int64_t txn_id, unique_ptr<MinLogIndexAnchorer> log_anchor) {
  std::lock_guard<LockType> l(data_lock_);
  // NOTE: we can't emplace with a raw pointer here; if the lookup succeeds, we
  // wouldn't use it and we'd have a memory leak, so use scoped_refptr.
  auto txn_metadata = LookupOrEmplace(&txn_metadata_by_txn_id_, txn_id,
                                      scoped_refptr<TxnMetadata>(new TxnMetadata));
  CHECK(txn_metadata);
  txn_metadata->set_aborted();
  anchors_needing_flush_.emplace_back(std::move(log_anchor));
}

bool TabletMetadata::HasTxnMetadata(int64_t txn_id, TxnState* state, Timestamp* timestamp) {
  std::lock_guard<LockType> l(data_lock_);
  auto txn_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
  if (txn_meta) {
    if (!state) return true;
    if (txn_meta->commit_timestamp()) {
      *state = kCommitted;
      if (timestamp) {
        DCHECK(txn_meta->commit_timestamp());
        *timestamp = *txn_meta->commit_timestamp();
      }
    } else if (txn_meta->aborted()) {
      *state = kAborted;
    } else if (txn_meta->commit_mvcc_op_timestamp()) {
      *state = kCommitInProgress;
      if (timestamp) {
        DCHECK(txn_meta->commit_mvcc_op_timestamp());
        *timestamp = *txn_meta->commit_mvcc_op_timestamp();
      }
    } else {
      *state = kOpen;
    }
    return true;
  }
  return false;
}

void TabletMetadata::GetTxnIds(unordered_set<int64_t>* in_flight_txn_ids,
                               unordered_set<int64_t>* terminal_txn_ids,
                               unordered_set<int64_t>* txn_ids_with_mrs) {
  std::unordered_set<int64_t> in_flights;
  std::unordered_set<int64_t> terminals;
  std::unordered_set<int64_t> needs_mrs;
  std::lock_guard<LockType> l(data_lock_);
  for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) {
    const auto& txn_id = txn_id_and_metadata.first;
    const auto& txn_meta = txn_id_and_metadata.second;
    if (txn_meta->commit_timestamp() || txn_meta->aborted()) {
      if (terminal_txn_ids) {
        EmplaceOrDie(&terminals, txn_id);
      }
    } else {
      EmplaceOrDie(&in_flights, txn_id);
    }
    // If we have not flushed the MRS after committing, the bootstrap process
    // will need to create an MRS for it, even if the transaction is committed.
    if (txn_ids_with_mrs &&
        !txn_meta->flushed_committed_mrs() &&
        !txn_meta->aborted()) {
      EmplaceOrDie(&needs_mrs, txn_id);
    }
  }
  *in_flight_txn_ids = std::move(in_flights);
  if (terminal_txn_ids) {
    *terminal_txn_ids = std::move(terminals);
  }
  if (txn_ids_with_mrs) {
    *txn_ids_with_mrs = std::move(needs_mrs);
  }
}

unordered_map<int64_t, scoped_refptr<TxnMetadata>> TabletMetadata::GetTxnMetadata() const {
  std::lock_guard<LockType> l(data_lock_);
  return txn_metadata_by_txn_id_;
}

bool TabletMetadata::GetTxnMetadataPB(int64_t txn_id, TxnMetadataPB* pb) const {
  std::lock_guard<LockType> l(data_lock_);
  auto txn_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
  if (!txn_meta) {
    return false;
  }
  *pb = txn_meta->ToPB();
  return true;
}

const RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) const {
  for (const shared_ptr<RowSetMetadata>& rowset_meta : rowsets_) {
    if (rowset_meta->id() == id) {
      return rowset_meta.get();
    }
  }
  return nullptr;
}

RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) {
  std::lock_guard<LockType> l(data_lock_);
  for (const shared_ptr<RowSetMetadata>& rowset_meta : rowsets_) {
    if (rowset_meta->id() == id) {
      return rowset_meta.get();
    }
  }
  return nullptr;
}

void TabletMetadata::SetSchema(const SchemaPtr& schema, uint32_t version) {
  // In case this is the last reference to the schema, destruct the pointer
  // outside the lock.
  SchemaPtr old_schema;
  {
    std::lock_guard<LockType> l(data_lock_);
    SwapSchemaUnlocked(schema, version, &old_schema);
  }
}

void TabletMetadata::SwapSchemaUnlocked(SchemaPtr schema, uint32_t version,
                                        SchemaPtr* old_schema) {
  DCHECK(schema->has_column_ids());
  *old_schema = std::move(schema_);
  schema_ = std::move(schema);
  schema_version_ = version;
}

void TabletMetadata::SetTableName(const string& table_name) {
  std::lock_guard<LockType> l(data_lock_);
  table_name_ = table_name;
}

string TabletMetadata::table_name() const {
  std::lock_guard<LockType> l(data_lock_);
  DCHECK_NE(state_, kNotLoadedYet);
  return table_name_;
}

uint32_t TabletMetadata::schema_version() const {
  std::lock_guard<LockType> l(data_lock_);
  DCHECK_NE(state_, kNotLoadedYet);
  return schema_version_;
}

void TabletMetadata::set_tablet_data_state(TabletDataState state) {
  std::lock_guard<LockType> l(data_lock_);
  if (state == TABLET_DATA_READY) {
    tombstone_last_logged_opid_.reset();
  }
  tablet_data_state_ = state;
}

TabletDataState TabletMetadata::tablet_data_state() const {
  std::lock_guard<LockType> l(data_lock_);
  return tablet_data_state_;
}

void TabletMetadata::SetExtraConfig(TableExtraConfigPB extra_config) {
  std::lock_guard<LockType> l(data_lock_);
  extra_config_ = std::move(extra_config);
}

optional<TableExtraConfigPB> TabletMetadata::extra_config() const {
  std::lock_guard<LockType> l(data_lock_);
  return extra_config_;
}

optional<string> TabletMetadata::dimension_label() const {
  std::lock_guard<LockType> l(data_lock_);
  return dimension_label_;
}

const optional<TableTypePB>& TabletMetadata::table_type() const {
  std::lock_guard<LockType> l(data_lock_);
  return table_type_;
}

} // namespace tablet
} // namespace kudu
