| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tablet/tablet_metadata.h" |
| |
| #include <algorithm> |
| #include <functional> |
| #include <mutex> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <utility> |
| |
| #include <gflags/gflags.h> |
| |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/log_anchor_registry.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/fs/block_id.h" |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/data_dirs.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/atomicops.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tablet/rowset_metadata.h" |
| #include "kudu/tablet/txn_metadata.h" |
| #include "kudu/tablet/txn_participant.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/fault_injection.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/trace.h" |
| |
| DEFINE_bool(enable_tablet_orphaned_block_deletion, true, |
| "Whether to enable deletion of orphaned blocks from disk. " |
| "Note: This is only exposed for debugging purposes!"); |
| TAG_FLAG(enable_tablet_orphaned_block_deletion, advanced); |
| TAG_FLAG(enable_tablet_orphaned_block_deletion, hidden); |
| TAG_FLAG(enable_tablet_orphaned_block_deletion, runtime); |
| |
| DEFINE_int32(tablet_metadata_load_inject_latency_ms, 0, |
| "Amount of latency in ms to inject when load tablet metadata file. " |
| "Only for testing."); |
| TAG_FLAG(tablet_metadata_load_inject_latency_ms, hidden); |
| |
| using base::subtle::Barrier_AtomicIncrement; |
| using kudu::consensus::MinimumOpId; |
| using kudu::consensus::OpId; |
| using kudu::fs::BlockDeletionTransaction; |
| using kudu::fs::BlockManager; |
| using kudu::log::MinLogIndexAnchorer; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using std::memory_order_relaxed; |
| using std::make_optional; |
| using std::nullopt; |
| using std::optional; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tablet { |
| |
| // ============================================================================ |
| // Tablet Metadata |
| // ============================================================================ |
| |
| Status TabletMetadata::CreateNew(FsManager* fs_manager, |
| const string& tablet_id, |
| const string& table_name, |
| const string& table_id, |
| const Schema& schema, |
| const PartitionSchema& partition_schema, |
| const Partition& partition, |
| const TabletDataState& initial_tablet_data_state, |
| optional<OpId> tombstone_last_logged_opid, |
| bool supports_live_row_count, |
| optional<TableExtraConfigPB> extra_config, |
| optional<string> dimension_label, |
| optional<TableTypePB> table_type, |
| scoped_refptr<TabletMetadata>* metadata) { |
| |
| // Verify that no existing tablet exists with the same ID. |
| if (fs_manager->env()->FileExists(fs_manager->GetTabletMetadataPath(tablet_id))) { |
| return Status::AlreadyPresent("Tablet already exists", tablet_id); |
| } |
| |
| RETURN_NOT_OK_PREPEND(fs_manager->dd_manager()->CreateDataDirGroup(tablet_id), |
| "Failed to create TabletMetadata"); |
| auto dir_group_cleanup = MakeScopedCleanup([&]() { |
| fs_manager->dd_manager()->DeleteDataDirGroup(tablet_id); |
| }); |
| scoped_refptr<TabletMetadata> ret(new TabletMetadata( |
| fs_manager, |
| tablet_id, |
| table_name, |
| table_id, |
| schema, |
| partition_schema, |
| partition, |
| initial_tablet_data_state, |
| std::move(tombstone_last_logged_opid), |
| supports_live_row_count, |
| std::move(extra_config), |
| std::move(dimension_label), |
| !table_type || |
| *table_type == TableTypePB::DEFAULT_TABLE ? nullopt |
| : std::move(table_type))); |
| RETURN_NOT_OK(ret->Flush()); |
| dir_group_cleanup.cancel(); |
| |
| metadata->swap(ret); |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::Load(FsManager* fs_manager, |
| const string& tablet_id, |
| scoped_refptr<TabletMetadata>* metadata) { |
| MAYBE_INJECT_FIXED_LATENCY(FLAGS_tablet_metadata_load_inject_latency_ms); |
| scoped_refptr<TabletMetadata> ret(new TabletMetadata(fs_manager, tablet_id)); |
| RETURN_NOT_OK(ret->LoadFromDisk()); |
| metadata->swap(ret); |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::LoadOrCreate(FsManager* fs_manager, |
| const string& tablet_id, |
| const string& table_name, |
| const string& table_id, |
| const Schema& schema, |
| const PartitionSchema& partition_schema, |
| const Partition& partition, |
| const TabletDataState& initial_tablet_data_state, |
| optional<OpId> tombstone_last_logged_opid, |
| optional<TableExtraConfigPB> extra_config, |
| optional<string> dimension_label, |
| optional<TableTypePB> table_type, |
| scoped_refptr<TabletMetadata>* metadata) { |
| Status s = Load(fs_manager, tablet_id, metadata); |
| if (s.ok()) { |
| const SchemaPtr schema_ptr = (*metadata)->schema(); |
| if (*schema_ptr != schema) { |
| return Status::Corruption(Substitute("Schema on disk ($0) does not " |
| "match expected schema ($1)", schema_ptr->ToString(), |
| schema.ToString())); |
| } |
| return Status::OK(); |
| } |
| if (s.IsNotFound()) { |
| return CreateNew(fs_manager, tablet_id, table_name, table_id, schema, |
| partition_schema, partition, initial_tablet_data_state, |
| std::move(tombstone_last_logged_opid), |
| /*supports_live_row_count=*/ true, |
| std::move(extra_config), |
| std::move(dimension_label), |
| std::move(table_type), |
| metadata); |
| } |
| return s; |
| } |
| |
| vector<BlockIdPB> TabletMetadata::CollectBlockIdPBs(const TabletSuperBlockPB& superblock) { |
| vector<BlockIdPB> block_ids; |
| for (const RowSetDataPB& rowset : superblock.rowsets()) { |
| for (const ColumnDataPB& column : rowset.columns()) { |
| block_ids.push_back(column.block()); |
| } |
| for (const DeltaDataPB& redo : rowset.redo_deltas()) { |
| block_ids.push_back(redo.block()); |
| } |
| for (const DeltaDataPB& undo : rowset.undo_deltas()) { |
| block_ids.push_back(undo.block()); |
| } |
| if (rowset.has_bloom_block()) { |
| block_ids.push_back(rowset.bloom_block()); |
| } |
| if (rowset.has_adhoc_index_block()) { |
| block_ids.push_back(rowset.adhoc_index_block()); |
| } |
| } |
| return block_ids; |
| } |
| |
| BlockIdContainer TabletMetadata::CollectBlockIds() const { |
| BlockIdContainer block_ids; |
| for (const auto& r : rowsets_) { |
| BlockIdContainer rowset_block_ids = r->GetAllBlocks(); |
| block_ids.insert(block_ids.end(), |
| rowset_block_ids.begin(), |
| rowset_block_ids.end()); |
| } |
| return block_ids; |
| } |
| |
| BlockId TabletMetadata::GetMaxLiveBlockId() const { |
| BlockId max_block_id; |
| for (const auto& r : rowsets_) { |
| max_block_id = std::max(max_block_id, r->GetMaxLiveBlockId()); |
| } |
| return max_block_id; |
| } |
| |
| Status TabletMetadata::DeleteTabletData(TabletDataState delete_type, |
| const optional<OpId>& last_logged_opid) { |
| DCHECK(!last_logged_opid || last_logged_opid->IsInitialized()); |
| CHECK(delete_type == TABLET_DATA_DELETED || |
| delete_type == TABLET_DATA_TOMBSTONED || |
| delete_type == TABLET_DATA_COPYING) |
| << "DeleteTabletData() called with unsupported delete_type on tablet " |
| << tablet_id_ << ": " << TabletDataState_Name(delete_type) |
| << " (" << delete_type << ")"; |
| |
| // First add all of our blocks to the orphan list |
| // and clear our rowsets. This serves to erase all the data. |
| // |
| // We also set the state in our persisted metadata to indicate that |
| // we have been deleted. |
| { |
| std::lock_guard<LockType> l(data_lock_); |
| for (const shared_ptr<RowSetMetadata>& rsmd : rowsets_) { |
| AddOrphanedBlocksUnlocked(rsmd->GetAllBlocks()); |
| } |
| rowsets_.clear(); |
| tablet_data_state_ = delete_type; |
| if (last_logged_opid) { |
| tombstone_last_logged_opid_ = last_logged_opid; |
| } |
| } |
| |
| // Unregister the tablet's data dir group in memory (it is stored on disk in |
| // the superblock). Even if we fail to flush below, the expectation is that |
| // we will no longer be writing to the tablet, and therefore don't need its |
| // data dir group. |
| fs_manager_->dd_manager()->DeleteDataDirGroup(tablet_id_); |
| |
| // Flushing will sync the new tablet_data_state_ to disk and will now also |
| // delete all the data. |
| RETURN_NOT_OK(Flush()); |
| |
| // Re-sync to disk one more time. |
| // This call will typically re-sync with an empty orphaned blocks list |
| // (unless deleting any orphans failed during the last Flush()), so that we |
| // don't try to re-delete the deleted orphaned blocks on every startup. |
| return Flush(); |
| } |
| |
| bool TabletMetadata::IsTombstonedWithNoBlocks() const { |
| std::lock_guard<LockType> l(data_lock_); |
| return tablet_data_state_ == TABLET_DATA_TOMBSTONED && |
| rowsets_.empty() && |
| orphaned_blocks_.empty(); |
| } |
| |
| Status TabletMetadata::DeleteSuperBlock() { |
| std::lock_guard<LockType> l(data_lock_); |
| if (!orphaned_blocks_.empty()) { |
| return Status::InvalidArgument("The metadata for tablet " + tablet_id_ + |
| " still references orphaned blocks. " |
| "Call DeleteTabletData() first"); |
| } |
| if (tablet_data_state_ != TABLET_DATA_DELETED) { |
| return Status::IllegalState( |
| Substitute("Tablet $0 is not in TABLET_DATA_DELETED state. " |
| "Call DeleteTabletData(TABLET_DATA_DELETED) first. " |
| "Tablet data state: $1 ($2)", |
| tablet_id_, |
| TabletDataState_Name(tablet_data_state_), |
| tablet_data_state_)); |
| } |
| |
| string path = fs_manager_->GetTabletMetadataPath(tablet_id_); |
| RETURN_NOT_OK_PREPEND(fs_manager_->env()->DeleteFile(path), |
| "Unable to delete superblock for tablet " + tablet_id_); |
| return Status::OK(); |
| } |
| |
| TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id, |
| string table_name, string table_id, |
| const Schema& schema, PartitionSchema partition_schema, |
| Partition partition, |
| const TabletDataState& tablet_data_state, |
| optional<OpId> tombstone_last_logged_opid, |
| bool supports_live_row_count, |
| optional<TableExtraConfigPB> extra_config, |
| optional<string> dimension_label, |
| optional<TableTypePB> table_type) |
| : state_(kNotWrittenYet), |
| tablet_id_(std::move(tablet_id)), |
| table_id_(std::move(table_id)), |
| partition_(std::move(partition)), |
| fs_manager_(fs_manager), |
| log_prefix_(Substitute("T $0 P $1: ", tablet_id_, fs_manager_->uuid())), |
| next_rowset_idx_(0), |
| last_durable_mrs_id_(kNoDurableMemStore), |
| schema_(std::make_shared<Schema>(schema)), |
| schema_version_(0), |
| table_name_(std::move(table_name)), |
| partition_schema_(std::move(partition_schema)), |
| tablet_data_state_(tablet_data_state), |
| tombstone_last_logged_opid_(std::move(tombstone_last_logged_opid)), |
| extra_config_(std::move(extra_config)), |
| dimension_label_(std::move(dimension_label)), |
| table_type_(std::move(table_type)), |
| num_flush_pins_(0), |
| needs_flush_(false), |
| flush_count_for_tests_(0), |
| pre_flush_callback_(&DoNothingStatusClosure), |
| supports_live_row_count_(supports_live_row_count) { |
| CHECK(schema_->has_column_ids()); |
| CHECK_GT(schema_->num_key_columns(), 0); |
| } |
| |
| TabletMetadata::~TabletMetadata() { |
| } |
| |
| TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id) |
| : state_(kNotLoadedYet), |
| tablet_id_(std::move(tablet_id)), |
| fs_manager_(fs_manager), |
| next_rowset_idx_(0), |
| num_flush_pins_(0), |
| needs_flush_(false), |
| flush_count_for_tests_(0), |
| pre_flush_callback_(&DoNothingStatusClosure), |
| supports_live_row_count_(false) {} |
| |
| Status TabletMetadata::LoadFromDisk() { |
| TRACE_EVENT1("tablet", "TabletMetadata::LoadFromDisk", |
| "tablet_id", tablet_id_); |
| |
| CHECK_EQ(state_, kNotLoadedYet); |
| |
| TabletSuperBlockPB superblock; |
| RETURN_NOT_OK(ReadSuperBlockFromDisk(&superblock)); |
| RETURN_NOT_OK_PREPEND(LoadFromSuperBlock(superblock), |
| "Failed to load data from superblock protobuf"); |
| RETURN_NOT_OK(UpdateOnDiskSize()); |
| state_ = kInitialized; |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::UpdateOnDiskSize() { |
| string path = fs_manager_->GetTabletMetadataPath(tablet_id_); |
| uint64_t on_disk_size; |
| RETURN_NOT_OK(fs_manager()->env()->GetFileSize(path, &on_disk_size)); |
| on_disk_size_.store(on_disk_size, memory_order_relaxed); |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock) { |
| BlockIdContainer orphaned_blocks; |
| |
| VLOG(2) << "Loading TabletMetadata from SuperBlockPB:" << std::endl |
| << SecureDebugString(superblock); |
| |
| { |
| std::lock_guard<LockType> l(data_lock_); |
| |
| // Verify that the tablet id matches with the one in the protobuf |
| if (superblock.tablet_id() != tablet_id_) { |
| return Status::Corruption("Expected id=" + tablet_id_ + |
| " found " + superblock.tablet_id(), |
| SecureDebugString(superblock)); |
| } |
| |
| last_durable_mrs_id_ = superblock.last_durable_mrs_id(); |
| |
| table_name_ = superblock.table_name(); |
| |
| uint32_t schema_version = superblock.schema_version(); |
| SchemaPtr schema = std::make_shared<Schema>(); |
| RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock.schema(), schema.get()), |
| "Failed to parse Schema from superblock " + |
| SecureShortDebugString(superblock)); |
| { |
| SchemaPtr old_schema; |
| SwapSchemaUnlocked(schema, schema_version, &old_schema); |
| } |
| |
| if (!superblock.has_partition()) { |
| // KUDU-818: Possible backward compatibility issue with tables created |
| // with version <= 0.5, throw warning. |
| LOG_WITH_PREFIX(WARNING) << "Upgrading from Kudu 0.5.0 directly to this" |
| << " version is not supported. Please upgrade to 0.6.0 before" |
| << " moving to a higher version."; |
| return Status::NotFound("Missing partition in superblock "+ |
| SecureDebugString(superblock)); |
| } |
| |
| // Some metadata fields are assumed to be immutable and thus are |
| // only read from the protobuf when the tablet metadata is loaded |
| // for the very first time. See KUDU-1500 for more details. |
| if (state_ == kNotLoadedYet) { |
| table_id_ = superblock.table_id(); |
| RETURN_NOT_OK(PartitionSchema::FromPB(superblock.partition_schema(), |
| *schema_, &partition_schema_)); |
| Partition::FromPB(superblock.partition(), &partition_); |
| } else { |
| CHECK_EQ(table_id_, superblock.table_id()); |
| PartitionSchema partition_schema; |
| RETURN_NOT_OK(PartitionSchema::FromPB(superblock.partition_schema(), |
| *schema_, &partition_schema)); |
| CHECK(partition_schema_ == partition_schema); |
| |
| Partition partition; |
| Partition::FromPB(superblock.partition(), &partition); |
| CHECK(partition_ == partition); |
| } |
| |
| tablet_data_state_ = superblock.tablet_data_state(); |
| |
| // This field should be parsed before parsing RowSetDataPB. |
| supports_live_row_count_ = superblock.supports_live_row_count(); |
| |
| rowsets_.clear(); |
| for (const RowSetDataPB& rowset_pb : superblock.rowsets()) { |
| unique_ptr<RowSetMetadata> rowset_meta; |
| RETURN_NOT_OK(RowSetMetadata::Load(this, rowset_pb, &rowset_meta)); |
| next_rowset_idx_ = std::max(next_rowset_idx_, rowset_meta->id() + 1); |
| rowsets_.push_back(shared_ptr<RowSetMetadata>(rowset_meta.release())); |
| } |
| |
| // Determine the largest block ID known to the tablet metadata so we can |
| // notify the block manager of blocks it may have missed (e.g. if a data |
| // directory failed and the blocks on it were not read). |
| BlockId max_block_id = GetMaxLiveBlockId(); |
| |
| for (const BlockIdPB& block_pb : superblock.orphaned_blocks()) { |
| BlockId orphaned_block_id = BlockId::FromPB(block_pb); |
| max_block_id = std::max(max_block_id, orphaned_block_id); |
| orphaned_blocks.push_back(orphaned_block_id); |
| } |
| AddOrphanedBlocksUnlocked(orphaned_blocks); |
| |
| // Notify the block manager of the highest block ID seen. |
| fs_manager()->block_manager()->NotifyBlockId(max_block_id); |
| |
| if (superblock.has_data_dir_group()) { |
| // An error loading the data dir group is non-fatal, it just means the |
| // tablet will fail to bootstrap later. |
| WARN_NOT_OK(fs_manager_->dd_manager()->LoadDataDirGroupFromPB( |
| tablet_id_, superblock.data_dir_group()), |
| "failed to load DataDirGroup from superblock"); |
| } else if (tablet_data_state_ == TABLET_DATA_READY) { |
| // If the superblock does not contain a DataDirGroup, this server has |
| // likely been upgraded from before 1.5.0. Create a new DataDirGroup for |
| // the tablet. If the data is not TABLET_DATA_READY, group creation is |
| // pointless, as the tablet metadata will be deleted anyway. |
| // |
| // Since we don't know what directories the existing blocks are in, we |
| // should assume the data is spread across all disks. |
| RETURN_NOT_OK(fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_, |
| fs::DataDirManager::DirDistributionMode::ACROSS_ALL_DIRS)); |
| } |
| |
| // Note: Previous versions of Kudu used MinimumOpId() as a "null" value on |
| // disk for the last-logged opid, so we special-case it at load time and |
| // consider it equal to "not present". |
| if (superblock.has_tombstone_last_logged_opid() && |
| superblock.tombstone_last_logged_opid().IsInitialized() && |
| !OpIdEquals(MinimumOpId(), superblock.tombstone_last_logged_opid())) { |
| tombstone_last_logged_opid_ = superblock.tombstone_last_logged_opid(); |
| } else { |
| tombstone_last_logged_opid_.reset(); |
| } |
| |
| if (superblock.has_extra_config()) { |
| extra_config_ = superblock.extra_config(); |
| } else { |
| extra_config_.reset(); |
| } |
| |
| if (superblock.has_dimension_label()) { |
| dimension_label_ = superblock.dimension_label(); |
| } else { |
| dimension_label_.reset(); |
| } |
| |
| if (superblock.has_table_type() && superblock.table_type() != TableTypePB::DEFAULT_TABLE) { |
| table_type_ = superblock.table_type(); |
| } |
| |
| std::unordered_map<int64_t, scoped_refptr<TxnMetadata>> txn_metas; |
| for (const auto& txn_id_and_metadata : superblock.txn_metadata()) { |
| const auto& txn_meta = txn_id_and_metadata.second; |
| EmplaceOrDie(&txn_metas, txn_id_and_metadata.first, |
| new TxnMetadata( |
| txn_meta.has_aborted() && txn_meta.aborted(), |
| txn_meta.has_commit_mvcc_op_timestamp() ? |
| make_optional(Timestamp(txn_meta.commit_mvcc_op_timestamp())) : |
| nullopt, |
| txn_meta.has_commit_timestamp() ? |
| make_optional(Timestamp(txn_meta.commit_timestamp())) : |
| nullopt, |
| txn_meta.has_flushed_committed_mrs() && txn_meta.flushed_committed_mrs() |
| )); |
| } |
| txn_metadata_by_txn_id_ = std::move(txn_metas); |
| } |
| |
| // Now is a good time to clean up any orphaned blocks that may have been |
| // left behind from a crash just after replacing the superblock. |
| if (!fs_manager()->read_only()) { |
| DeleteOrphanedBlocks(orphaned_blocks); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::UpdateAndFlush(const RowSetMetadataIds& to_remove, |
| const RowSetMetadataVector& to_add, |
| int64_t last_durable_mrs_id, |
| const vector<TxnInfoBeingFlushed>& txns_being_flushed) { |
| { |
| std::lock_guard<LockType> l(data_lock_); |
| RETURN_NOT_OK(UpdateUnlocked(to_remove, to_add, last_durable_mrs_id, txns_being_flushed)); |
| } |
| return Flush(); |
| } |
| |
| void TabletMetadata::AddOrphanedBlocks(const BlockIdContainer& block_ids) { |
| std::lock_guard<LockType> l(data_lock_); |
| AddOrphanedBlocksUnlocked(block_ids); |
| } |
| |
| void TabletMetadata::AddOrphanedBlocksUnlocked(const BlockIdContainer& block_ids) { |
| DCHECK(data_lock_.is_locked()); |
| orphaned_blocks_.insert(block_ids.begin(), block_ids.end()); |
| } |
| |
| void TabletMetadata::DeleteOrphanedBlocks(const BlockIdContainer& blocks) { |
| if (PREDICT_FALSE(!FLAGS_enable_tablet_orphaned_block_deletion)) { |
| LOG_WITH_PREFIX(WARNING) << "Not deleting " << blocks.size() |
| << " block(s) from disk. Block deletion disabled via " |
| << "--enable_tablet_orphaned_block_deletion=false"; |
| return; |
| } |
| |
| BlockManager* bm = fs_manager()->block_manager(); |
| shared_ptr<BlockDeletionTransaction> transaction = bm->NewDeletionTransaction(); |
| for (const BlockId& b : blocks) { |
| transaction->AddDeletedBlock(b); |
| } |
| vector<BlockId> deleted; |
| WARN_NOT_OK(transaction->CommitDeletedBlocks(&deleted), |
| "not all orphaned blocks were deleted"); |
| |
| // Regardless of whether we deleted all the blocks or not, remove them from |
| // the orphaned blocks list. If we failed to delete the blocks due to |
| // hardware issues, there's not much we can do and we assume the disk isn't |
| // coming back. At worst, this leaves some untracked orphaned blocks. |
| { |
| std::lock_guard<LockType> l(data_lock_); |
| for (const BlockId& b : blocks) { |
| orphaned_blocks_.erase(b); |
| } |
| } |
| } |
| |
| void TabletMetadata::PinFlush() { |
| std::lock_guard<LockType> l(data_lock_); |
| CHECK_GE(num_flush_pins_, 0); |
| num_flush_pins_++; |
| VLOG(1) << "Number of flush pins: " << num_flush_pins_; |
| } |
| |
| Status TabletMetadata::UnPinFlush() { |
| std::unique_lock<LockType> l(data_lock_); |
| CHECK_GT(num_flush_pins_, 0); |
| num_flush_pins_--; |
| if (needs_flush_) { |
| l.unlock(); |
| RETURN_NOT_OK(Flush()); |
| } |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::Flush() { |
| TRACE_EVENT1("tablet", "TabletMetadata::Flush", |
| "tablet_id", tablet_id_); |
| |
| MutexLock l_flush(flush_lock_); |
| BlockIdContainer orphaned; |
| TabletSuperBlockPB pb; |
| vector<unique_ptr<MinLogIndexAnchorer>> anchors_needing_flush; |
| { |
| std::lock_guard<LockType> l(data_lock_); |
| CHECK_GE(num_flush_pins_, 0); |
| if (num_flush_pins_ > 0) { |
| needs_flush_ = true; |
| LOG(INFO) << "Not flushing: waiting for " << num_flush_pins_ << " pins to be released."; |
| return Status::OK(); |
| } |
| needs_flush_ = false; |
| |
| RETURN_NOT_OK(ToSuperBlockUnlocked(&pb, rowsets_)); |
| |
| // Make a copy of the orphaned blocks list which corresponds to the superblock |
| // that we're writing. It's important to take this local copy to avoid a race |
| // in which another thread may add new orphaned blocks to the 'orphaned_blocks_' |
| // set while we're in the process of writing the new superblock to disk. We don't |
| // want to accidentally delete those blocks before that next metadata update |
| // is persisted. See KUDU-701 for details. |
| orphaned.assign(orphaned_blocks_.begin(), orphaned_blocks_.end()); |
| anchors_needing_flush = std::move(anchors_needing_flush_); |
| } |
| pre_flush_callback_(); |
| RETURN_NOT_OK(ReplaceSuperBlockUnlocked(pb)); |
| TRACE("Metadata flushed"); |
| l_flush.Unlock(); |
| |
| // Now that we've flushed, we can unanchor our WALs by destructing our |
| // anchors. |
| anchors_needing_flush.clear(); |
| |
| // Now that the superblock is written, try to delete the orphaned blocks. |
| // |
| // If we crash just before the deletion, we'll retry when reloading from |
| // disk; the orphaned blocks were persisted as part of the superblock. |
| DeleteOrphanedBlocks(orphaned); |
| |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::UpdateUnlocked( |
| const RowSetMetadataIds& to_remove, |
| const RowSetMetadataVector& to_add, |
| int64_t last_durable_mrs_id, |
| const vector<TxnInfoBeingFlushed>& txns_being_flushed) { |
| DCHECK(data_lock_.is_locked()); |
| CHECK_NE(state_, kNotLoadedYet); |
| if (last_durable_mrs_id != kNoMrsFlushed) { |
| DCHECK_GE(last_durable_mrs_id, last_durable_mrs_id_); |
| last_durable_mrs_id_ = last_durable_mrs_id; |
| } |
| for (const auto& txn_id : txns_being_flushed) { |
| auto txn_meta = FindOrDie(txn_metadata_by_txn_id_, txn_id); |
| txn_meta->set_flushed_committed_mrs(); |
| } |
| |
| RowSetMetadataVector new_rowsets = rowsets_; |
| auto it = new_rowsets.begin(); |
| while (it != new_rowsets.end()) { |
| if (ContainsKey(to_remove, (*it)->id())) { |
| AddOrphanedBlocksUnlocked((*it)->GetAllBlocks()); |
| it = new_rowsets.erase(it); |
| } else { |
| it++; |
| } |
| } |
| |
| for (const shared_ptr<RowSetMetadata>& meta : to_add) { |
| new_rowsets.push_back(meta); |
| } |
| rowsets_ = new_rowsets; |
| |
| TRACE("TabletMetadata updated"); |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::ReplaceSuperBlock(const TabletSuperBlockPB &pb) { |
| { |
| MutexLock l(flush_lock_); |
| RETURN_NOT_OK_PREPEND(ReplaceSuperBlockUnlocked(pb), "Unable to replace superblock"); |
| fs_manager_->dd_manager()->DeleteDataDirGroup(tablet_id_); |
| } |
| |
| RETURN_NOT_OK_PREPEND(LoadFromSuperBlock(pb), |
| "Failed to load data from superblock protobuf"); |
| |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::ReplaceSuperBlockUnlocked(const TabletSuperBlockPB &pb) { |
| flush_lock_.AssertAcquired(); |
| |
| string path = fs_manager_->GetTabletMetadataPath(tablet_id_); |
| RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath( |
| fs_manager_->env(), path, pb, |
| pb_util::OVERWRITE, pb_util::SYNC, |
| pb_util::SENSITIVE), |
| Substitute("Failed to write tablet metadata $0", tablet_id_)); |
| flush_count_for_tests_++; |
| RETURN_NOT_OK(UpdateOnDiskSize()); |
| |
| return Status::OK(); |
| } |
| |
| void TabletMetadata::SetPreFlushCallback(StatusClosure callback) { |
| MutexLock l_flush(flush_lock_); |
| pre_flush_callback_ = std::move(callback); |
| } |
| |
| std::optional<consensus::OpId> TabletMetadata::tombstone_last_logged_opid() const { |
| std::lock_guard<LockType> l(data_lock_); |
| return tombstone_last_logged_opid_; |
| } |
| |
| Status TabletMetadata::ReadSuperBlockFromDisk(TabletSuperBlockPB* superblock) const { |
| string path = fs_manager_->GetTabletMetadataPath(tablet_id_); |
| RETURN_NOT_OK_PREPEND( |
| pb_util::ReadPBContainerFromPath(fs_manager_->env(), path, superblock, pb_util::SENSITIVE), |
| Substitute("Could not load tablet metadata from $0", path)); |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::ToSuperBlock(TabletSuperBlockPB* super_block) const { |
| // acquire the lock so that rowsets_ doesn't get changed until we're finished. |
| std::lock_guard<LockType> l(data_lock_); |
| return ToSuperBlockUnlocked(super_block, rowsets_); |
| } |
| |
| Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block, |
| const RowSetMetadataVector& rowsets) const { |
| DCHECK(data_lock_.is_locked()); |
| // Convert to protobuf |
| TabletSuperBlockPB pb; |
| pb.set_table_id(table_id_); |
| pb.set_tablet_id(tablet_id_); |
| partition_.ToPB(pb.mutable_partition()); |
| pb.set_last_durable_mrs_id(last_durable_mrs_id_); |
| pb.set_schema_version(schema_version_); |
| RETURN_NOT_OK(partition_schema_.ToPB(*schema_, pb.mutable_partition_schema())); |
| pb.set_table_name(table_name_); |
| |
| for (const shared_ptr<RowSetMetadata>& meta : rowsets) { |
| meta->ToProtobuf(pb.add_rowsets()); |
| } |
| |
| for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) { |
| const auto& txn_meta = txn_id_and_metadata.second; |
| TxnMetadataPB meta_pb = txn_meta->ToPB(); |
| InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, meta_pb); |
| } |
| |
| DCHECK(schema_->has_column_ids()); |
| RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_, pb.mutable_schema()), |
| "Couldn't serialize schema into superblock"); |
| |
| pb.set_tablet_data_state(tablet_data_state_); |
| if (tombstone_last_logged_opid_ && |
| !OpIdEquals(MinimumOpId(), *tombstone_last_logged_opid_)) { |
| *pb.mutable_tombstone_last_logged_opid() = *tombstone_last_logged_opid_; |
| } |
| |
| for (const BlockId& block_id : orphaned_blocks_) { |
| block_id.CopyToPB(pb.mutable_orphaned_blocks()->Add()); |
| } |
| |
| // Serialize the tablet's DataDirGroupPB if one exists. One may not exist if |
| // this is called during a tablet deletion. |
| DataDirGroupPB group_pb; |
| if (fs_manager_->dd_manager()->GetDataDirGroupPB(tablet_id_, &group_pb).ok()) { |
| pb.mutable_data_dir_group()->Swap(&group_pb); |
| } |
| |
| pb.set_supports_live_row_count(supports_live_row_count_); |
| |
| if (extra_config_) { |
| *pb.mutable_extra_config() = *extra_config_; |
| } |
| |
| if (dimension_label_) { |
| pb.set_dimension_label(*dimension_label_); |
| } |
| |
| if (table_type_) { |
| DCHECK_NE(TableTypePB::DEFAULT_TABLE, *table_type_); |
| pb.set_table_type(*table_type_); |
| } |
| |
| super_block->Swap(&pb); |
| return Status::OK(); |
| } |
| |
| Status TabletMetadata::CreateRowSet(shared_ptr<RowSetMetadata>* rowset) { |
| AtomicWord rowset_idx = Barrier_AtomicIncrement(&next_rowset_idx_, 1) - 1; |
| unique_ptr<RowSetMetadata> scoped_rsm; |
| RETURN_NOT_OK(RowSetMetadata::CreateNew(this, rowset_idx, &scoped_rsm)); |
| rowset->reset(DCHECK_NOTNULL(scoped_rsm.release())); |
| return Status::OK(); |
| } |
| |
| void TabletMetadata::AddTxnMetadata(int64_t txn_id, unique_ptr<MinLogIndexAnchorer> log_anchor) { |
| std::lock_guard<LockType> l(data_lock_); |
| EmplaceOrDie(&txn_metadata_by_txn_id_, txn_id, new TxnMetadata()); |
| anchors_needing_flush_.emplace_back(std::move(log_anchor)); |
| } |
| |
| void TabletMetadata::BeginCommitTransaction(int64_t txn_id, Timestamp mvcc_op_timestamp, |
| unique_ptr<MinLogIndexAnchorer> log_anchor) { |
| std::lock_guard<LockType> l(data_lock_); |
| auto txn_metadata = FindOrDie(txn_metadata_by_txn_id_, txn_id); |
| // NOTE: we may already have an MVCC op timestamp if we are bootstrapping and |
| // the timestamp was persisted already, in which case, we don't need to |
| // anchor the WAL to ensure the timestamp's persistence in metadata. |
| if (!txn_metadata->commit_mvcc_op_timestamp()) { |
| txn_metadata->set_commit_mvcc_op_timestamp(mvcc_op_timestamp); |
| anchors_needing_flush_.emplace_back(std::move(log_anchor)); |
| } |
| } |
| |
| void TabletMetadata::AddCommitTimestamp(int64_t txn_id, Timestamp commit_timestamp, |
| unique_ptr<MinLogIndexAnchorer> log_anchor) { |
| std::lock_guard<LockType> l(data_lock_); |
| auto txn_metadata = FindOrDie(txn_metadata_by_txn_id_, txn_id); |
| txn_metadata->set_commit_timestamp(commit_timestamp); |
| anchors_needing_flush_.emplace_back(std::move(log_anchor)); |
| } |
| |
| void TabletMetadata::AbortTransaction(int64_t txn_id, unique_ptr<MinLogIndexAnchorer> log_anchor) { |
| std::lock_guard<LockType> l(data_lock_); |
| // NOTE: we can't emplace with a raw pointer here; if the lookup succeeds, we |
| // wouldn't use it and we'd have a memory leak, so use scoped_refptr. |
| auto txn_metadata = LookupOrEmplace(&txn_metadata_by_txn_id_, txn_id, |
| scoped_refptr<TxnMetadata>(new TxnMetadata)); |
| CHECK(txn_metadata); |
| txn_metadata->set_aborted(); |
| anchors_needing_flush_.emplace_back(std::move(log_anchor)); |
| } |
| |
| bool TabletMetadata::HasTxnMetadata(int64_t txn_id, TxnState* state, Timestamp* timestamp) { |
| std::lock_guard<LockType> l(data_lock_); |
| auto txn_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id); |
| if (txn_meta) { |
| if (!state) return true; |
| if (txn_meta->commit_timestamp()) { |
| *state = kCommitted; |
| if (timestamp) { |
| DCHECK(txn_meta->commit_timestamp()); |
| *timestamp = *txn_meta->commit_timestamp(); |
| } |
| } else if (txn_meta->aborted()) { |
| *state = kAborted; |
| } else if (txn_meta->commit_mvcc_op_timestamp()) { |
| *state = kCommitInProgress; |
| if (timestamp) { |
| DCHECK(txn_meta->commit_mvcc_op_timestamp()); |
| *timestamp = *txn_meta->commit_mvcc_op_timestamp(); |
| } |
| } else { |
| *state = kOpen; |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| void TabletMetadata::GetTxnIds(unordered_set<int64_t>* in_flight_txn_ids, |
| unordered_set<int64_t>* terminal_txn_ids, |
| unordered_set<int64_t>* txn_ids_with_mrs) { |
| std::unordered_set<int64_t> in_flights; |
| std::unordered_set<int64_t> terminals; |
| std::unordered_set<int64_t> needs_mrs; |
| std::lock_guard<LockType> l(data_lock_); |
| for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) { |
| const auto& txn_id = txn_id_and_metadata.first; |
| const auto& txn_meta = txn_id_and_metadata.second; |
| if (txn_meta->commit_timestamp() || txn_meta->aborted()) { |
| if (terminal_txn_ids) { |
| EmplaceOrDie(&terminals, txn_id); |
| } |
| } else { |
| EmplaceOrDie(&in_flights, txn_id); |
| } |
| // If we have not flushed the MRS after committing, the bootstrap process |
| // will need to create an MRS for it, even if the transaction is committed. |
| if (txn_ids_with_mrs && |
| !txn_meta->flushed_committed_mrs() && |
| !txn_meta->aborted()) { |
| EmplaceOrDie(&needs_mrs, txn_id); |
| } |
| } |
| *in_flight_txn_ids = std::move(in_flights); |
| if (terminal_txn_ids) { |
| *terminal_txn_ids = std::move(terminals); |
| } |
| if (txn_ids_with_mrs) { |
| *txn_ids_with_mrs = std::move(needs_mrs); |
| } |
| } |
| |
| unordered_map<int64_t, scoped_refptr<TxnMetadata>> TabletMetadata::GetTxnMetadata() const { |
| std::lock_guard<LockType> l(data_lock_); |
| return txn_metadata_by_txn_id_; |
| } |
| |
| bool TabletMetadata::GetTxnMetadataPB(int64_t txn_id, TxnMetadataPB* pb) const { |
| std::lock_guard<LockType> l(data_lock_); |
| auto txn_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id); |
| if (!txn_meta) { |
| return false; |
| } |
| *pb = txn_meta->ToPB(); |
| return true; |
| } |
| |
| const RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) const { |
| for (const shared_ptr<RowSetMetadata>& rowset_meta : rowsets_) { |
| if (rowset_meta->id() == id) { |
| return rowset_meta.get(); |
| } |
| } |
| return nullptr; |
| } |
| |
| RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) { |
| std::lock_guard<LockType> l(data_lock_); |
| for (const shared_ptr<RowSetMetadata>& rowset_meta : rowsets_) { |
| if (rowset_meta->id() == id) { |
| return rowset_meta.get(); |
| } |
| } |
| return nullptr; |
| } |
| |
| void TabletMetadata::SetSchema(const SchemaPtr& schema, uint32_t version) { |
| // In case this is the last reference to the schema, destruct the pointer |
| // outside the lock. |
| SchemaPtr old_schema; |
| { |
| std::lock_guard<LockType> l(data_lock_); |
| SwapSchemaUnlocked(schema, version, &old_schema); |
| } |
| } |
| |
| void TabletMetadata::SwapSchemaUnlocked(SchemaPtr schema, uint32_t version, |
| SchemaPtr* old_schema) { |
| DCHECK(schema->has_column_ids()); |
| *old_schema = std::move(schema_); |
| schema_ = std::move(schema); |
| schema_version_ = version; |
| } |
| |
| void TabletMetadata::SetTableName(const string& table_name) { |
| std::lock_guard<LockType> l(data_lock_); |
| table_name_ = table_name; |
| } |
| |
| string TabletMetadata::table_name() const { |
| std::lock_guard<LockType> l(data_lock_); |
| DCHECK_NE(state_, kNotLoadedYet); |
| return table_name_; |
| } |
| |
| uint32_t TabletMetadata::schema_version() const { |
| std::lock_guard<LockType> l(data_lock_); |
| DCHECK_NE(state_, kNotLoadedYet); |
| return schema_version_; |
| } |
| |
| void TabletMetadata::set_tablet_data_state(TabletDataState state) { |
| std::lock_guard<LockType> l(data_lock_); |
| if (state == TABLET_DATA_READY) { |
| tombstone_last_logged_opid_.reset(); |
| } |
| tablet_data_state_ = state; |
| } |
| |
| TabletDataState TabletMetadata::tablet_data_state() const { |
| std::lock_guard<LockType> l(data_lock_); |
| return tablet_data_state_; |
| } |
| |
| void TabletMetadata::SetExtraConfig(TableExtraConfigPB extra_config) { |
| std::lock_guard<LockType> l(data_lock_); |
| extra_config_ = std::move(extra_config); |
| } |
| |
| optional<TableExtraConfigPB> TabletMetadata::extra_config() const { |
| std::lock_guard<LockType> l(data_lock_); |
| return extra_config_; |
| } |
| |
| optional<string> TabletMetadata::dimension_label() const { |
| std::lock_guard<LockType> l(data_lock_); |
| return dimension_label_; |
| } |
| |
| const optional<TableTypePB>& TabletMetadata::table_type() const { |
| std::lock_guard<LockType> l(data_lock_); |
| return table_type_; |
| } |
| |
| } // namespace tablet |
| } // namespace kudu |