| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tablet/tablet.h" |
| |
| #include <algorithm> |
| #include <ctime> |
| #include <functional> |
| #include <iterator> |
| #include <memory> |
| #include <mutex> |
| #include <optional> |
| #include <ostream> |
| #include <random> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/arena.h> |
| |
| #include "kudu/clock/clock.h" |
| #include "kudu/clock/hybrid_clock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/encoded_key.h" |
| #include "kudu/common/generic_iterators.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/row.h" |
| #include "kudu/common/row_changelist.h" |
| #include "kudu/common/row_operations.h" |
| #include "kudu/common/row_operations.pb.h" |
| #include "kudu/common/rowid.h" |
| #include "kudu/common/scan_spec.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/consensus/log_anchor_registry.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/fs/io_context.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/human_readable.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/threading/thread_collision_warner.h" |
| #include "kudu/tablet/compaction.h" |
| #include "kudu/tablet/compaction_policy.h" |
| #include "kudu/tablet/delta_tracker.h" |
| #include "kudu/tablet/diskrowset.h" |
| #include "kudu/tablet/memrowset.h" |
| #include "kudu/tablet/ops/alter_schema_op.h" |
| #include "kudu/tablet/ops/participant_op.h" |
| #include "kudu/tablet/ops/write_op.h" |
| #include "kudu/tablet/row_op.h" |
| #include "kudu/tablet/rowset_info.h" |
| #include "kudu/tablet/rowset_tree.h" |
| #include "kudu/tablet/svg_dump.h" |
| #include "kudu/tablet/tablet.pb.h" |
| #include "kudu/tablet/tablet_metrics.h" |
| #include "kudu/tablet/tablet_mm_ops.h" |
| #include "kudu/tablet/txn_metadata.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/util/bitmap.h" |
| #include "kudu/util/bloom_filter.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/fault_injection.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/memory/arena.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/process_memory.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status_callback.h" |
| #include "kudu/util/throttler.h" |
| #include "kudu/util/trace.h" |
| #include "kudu/util/url-coding.h" |
| |
| namespace kudu { |
| namespace tablet { |
| class RowSetMetadata; |
| } // namespace tablet |
| } // namespace kudu |
| |
// ---------------------------------------------------------------------------
// Testing-only safety valves.
// ---------------------------------------------------------------------------
DEFINE_bool(prevent_kudu_2233_corruption, true,
            "Whether or not to prevent KUDU-2233 corruptions. Used for testing only!");
TAG_FLAG(prevent_kudu_2233_corruption, unsafe);

// ---------------------------------------------------------------------------
// Compaction and bloom-filter tuning.
// ---------------------------------------------------------------------------
DEFINE_int32(tablet_compaction_budget_mb, 128,
             "Budget for a single compaction");
TAG_FLAG(tablet_compaction_budget_mb, experimental);

DEFINE_int32(tablet_bloom_block_size, 4096,
             "Block size of the bloom filters used for tablet keys.");
TAG_FLAG(tablet_bloom_block_size, advanced);

DEFINE_double(tablet_bloom_target_fp_rate, 0.0001f,
              "Target false-positive rate (between 0 and 1) to size tablet key bloom filters. "
              "A lower false positive rate may reduce the number of disk seeks required "
              "in heavy insert workloads, at the expense of more space and RAM "
              "required for bloom filters.");
TAG_FLAG(tablet_bloom_target_fp_rate, advanced);

// ---------------------------------------------------------------------------
// Fault-injection points (testing only).
// ---------------------------------------------------------------------------
DEFINE_double(fault_crash_before_flush_tablet_meta_after_compaction, 0.0,
              "Fraction of the time, during compaction, to crash before flushing metadata");
TAG_FLAG(fault_crash_before_flush_tablet_meta_after_compaction, unsafe);

DEFINE_double(fault_crash_before_flush_tablet_meta_after_flush_mrs, 0.0,
              "Fraction of the time, while flushing an MRS, to crash before flushing metadata");
TAG_FLAG(fault_crash_before_flush_tablet_meta_after_flush_mrs, unsafe);

// ---------------------------------------------------------------------------
// Per-tablet write throttling (see Throttler construction in Tablet::Tablet).
// ---------------------------------------------------------------------------
DEFINE_int64(tablet_throttler_rpc_per_sec, 0,
             "Maximum write RPC rate (op/s) allowed for a tablet, write RPC exceeding this "
             "limit will be throttled. 0 means no limit.");
TAG_FLAG(tablet_throttler_rpc_per_sec, experimental);

DEFINE_int64(tablet_throttler_bytes_per_sec, 0,
             "Maximum write RPC IO rate (byte/s) allowed for a tablet, write RPC exceeding "
             "this limit will be throttled. 0 means no limit.");
TAG_FLAG(tablet_throttler_bytes_per_sec, experimental);

DEFINE_double(tablet_throttler_burst_factor, 1.0f,
              "Burst factor for write RPC throttling. The maximum rate the throttler "
              "allows within a token refill period (100ms) equals burst factor multiply "
              "base rate.");
TAG_FLAG(tablet_throttler_burst_factor, experimental);

// ---------------------------------------------------------------------------
// History retention and key-size limits.
// ---------------------------------------------------------------------------
DEFINE_int32(tablet_history_max_age_sec, 60 * 60 * 24 * 7,
             "Number of seconds to retain tablet history, including history "
             "required to perform diff scans and incremental backups. Reads "
             "initiated at a snapshot that is older than this age will be "
             "rejected. To disable history removal, set to -1.");
TAG_FLAG(tablet_history_max_age_sec, advanced);
TAG_FLAG(tablet_history_max_age_sec, stable);

// Large encoded keys cause problems because we store the min/max encoded key in the
// CFile footer for the composite key column. The footer has a max length of 64K, so
// the default here comfortably fits two of them with room for other metadata.
DEFINE_int32(max_encoded_key_size_bytes, 16 * 1024,
             "The maximum size of a row's encoded composite primary key. This length is "
             "approximately the sum of the sizes of the component columns, though it can "
             "be larger in cases where the components contain embedded NULL bytes. "
             "Attempting to insert a row with a larger encoded composite key will "
             "result in an error.");
TAG_FLAG(max_encoded_key_size_bytes, unsafe);

// ---------------------------------------------------------------------------
// Workload-scoring knobs used to prioritize maintenance ops for "hot" tablets.
// ---------------------------------------------------------------------------
DEFINE_int32(workload_stats_rate_collection_min_interval_ms, 60 * 1000,
             "The minimal interval in milliseconds at which we collect read/write rates.");
TAG_FLAG(workload_stats_rate_collection_min_interval_ms, experimental);
TAG_FLAG(workload_stats_rate_collection_min_interval_ms, runtime);

DEFINE_int32(workload_stats_metric_collection_interval_ms, 5 * 60 * 1000,
             "The interval in milliseconds at which we collect workload metrics.");
TAG_FLAG(workload_stats_metric_collection_interval_ms, experimental);
TAG_FLAG(workload_stats_metric_collection_interval_ms, runtime);

DEFINE_double(workload_score_upper_bound, 1.0, "Upper bound for workload score.");
TAG_FLAG(workload_score_upper_bound, experimental);
TAG_FLAG(workload_score_upper_bound, runtime);

DEFINE_int32(scans_started_per_sec_for_hot_tablets, 1,
             "Minimum read rate for tablets to be considered 'hot' (scans/sec). If a tablet's "
             "read rate exceeds this value, flush/compaction ops for it will be assigned the highest "
             "possible workload score, which is defined by --workload_score_upper_bound.");
TAG_FLAG(scans_started_per_sec_for_hot_tablets, experimental);
TAG_FLAG(scans_started_per_sec_for_hot_tablets, runtime);

// NOTE: the flag name contains a typo ("writed"); it is kept as-is because the
// name is part of the public command-line interface.
DEFINE_int32(rows_writed_per_sec_for_hot_tablets, 1000,
             "Minimum write rate for tablets to be considered 'hot' (rows/sec). If a tablet's "
             "write rate exceeds this value, compaction ops for it will be assigned the highest "
             "possible workload score, which is defined by --workload_score_upper_bound.");
TAG_FLAG(rows_writed_per_sec_for_hot_tablets, experimental);
TAG_FLAG(rows_writed_per_sec_for_hot_tablets, runtime);

// ---------------------------------------------------------------------------
// Per-tablet metric entity and gauges (instantiated in Tablet::Tablet).
// ---------------------------------------------------------------------------
METRIC_DEFINE_entity(tablet);
METRIC_DEFINE_gauge_size(tablet, memrowset_size, "MemRowSet Memory Usage",
                         kudu::MetricUnit::kBytes,
                         "Size of this tablet's memrowset",
                         kudu::MetricLevel::kInfo);
METRIC_DEFINE_gauge_size(tablet, on_disk_data_size, "Tablet Data Size On Disk",
                         kudu::MetricUnit::kBytes,
                         "Space used by this tablet's data blocks.",
                         kudu::MetricLevel::kInfo);
METRIC_DEFINE_gauge_size(tablet, num_rowsets_on_disk, "Tablet Number of Rowsets on Disk",
                         kudu::MetricUnit::kUnits,
                         "Number of diskrowsets in this tablet",
                         kudu::MetricLevel::kInfo);
METRIC_DEFINE_gauge_uint64(tablet, last_read_elapsed_time, "Seconds Since Last Read",
                           kudu::MetricUnit::kSeconds,
                           "The elapsed time, in seconds, since the last read operation on this "
                           "tablet, or since this Tablet object was created on current tserver if "
                           "it hasn't been read since then.",
                           kudu::MetricLevel::kDebug);
METRIC_DEFINE_gauge_uint64(tablet, last_write_elapsed_time, "Seconds Since Last Write",
                           kudu::MetricUnit::kSeconds,
                           "The elapsed time, in seconds, since the last write operation on this "
                           "tablet, or since this Tablet object was created on current tserver if "
                           "it hasn't been written to since then.",
                           kudu::MetricLevel::kDebug);
| |
| using kudu::MaintenanceManager; |
| using kudu::clock::HybridClock; |
| using kudu::consensus::OpId; |
| using kudu::fs::IOContext; |
| using kudu::log::LogAnchorRegistry; |
| using kudu::log::MinLogIndexAnchorer; |
| using std::endl; |
| using std::make_shared; |
| using std::nullopt; |
| using std::optional; |
| using std::ostream; |
| using std::pair; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| class RowBlock; |
| struct IteratorStats; |
| |
| namespace tablet { |
| |
| static CompactionPolicy *CreateCompactionPolicy() { |
| return new BudgetedCompactionPolicy(FLAGS_tablet_compaction_budget_mb); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // TabletComponents |
| //////////////////////////////////////////////////////////// |
| |
// Builds an immutable snapshot of the tablet's row-storing components:
// the main MemRowSet ('mrs'), the MemRowSets of uncommitted transactions
// ('txn_mrss'), and the tree of on-disk rowsets ('rs_tree'). Instances are
// published atomically under Tablet::component_lock_ (see Tablet::Open()).
TabletComponents::TabletComponents(shared_ptr<MemRowSet> mrs,
                                   std::vector<std::shared_ptr<MemRowSet>> txn_mrss,
                                   shared_ptr<RowSetTree> rs_tree)
    : memrowset(std::move(mrs)),
      txn_memrowsets(std::move(txn_mrss)),
      rowsets(std::move(rs_tree)) {}
| |
| //////////////////////////////////////////////////////////// |
| // Tablet |
| //////////////////////////////////////////////////////////// |
| |
// Constructs a Tablet in the kInitialized state; callers must still run
// Open() (and bootstrapping) before the tablet serves reads or writes.
//
// 'clock' is borrowed and must outlive this Tablet. 'metric_registry' may be
// nullptr, in which case no metric entity or gauges are created. If either
// throttler flag is positive, a per-tablet write Throttler is installed.
Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
               clock::Clock* clock,
               shared_ptr<MemTracker> parent_mem_tracker,
               MetricRegistry* metric_registry,
               scoped_refptr<LogAnchorRegistry> log_anchor_registry)
    // NOTE: key_schema_ is initialized from 'metadata' before 'metadata' is
    // moved into metadata_; the member-init order below must preserve that.
    : key_schema_(metadata->schema()->CreateKeyProjection()),
      metadata_(std::move(metadata)),
      log_anchor_registry_(std::move(log_anchor_registry)),
      mem_trackers_(tablet_id(), std::move(parent_mem_tracker)),
      next_mrs_id_(0),
      clock_(clock),
      txn_participant_(metadata_),
      rowsets_flush_sem_(1),
      state_(kInitialized),
      last_write_time_(MonoTime::Now()),
      last_read_time_(MonoTime::Now()),
      last_update_workload_stats_time_(MonoTime::Now()),
      last_scans_started_(0),
      last_rows_mutated_(0),
      last_read_score_(0.0),
      last_write_score_(0.0) {
  CHECK(schema()->has_column_ids());
  compaction_policy_.reset(CreateCompactionPolicy());

  if (metric_registry) {
    MetricEntity::AttributeMap attrs;
    attrs["table_id"] = metadata_->table_id();
    attrs["table_name"] = metadata_->table_name();
    attrs["partition"] = metadata_->partition_schema().PartitionDebugString(metadata_->partition(),
                                                                            *schema());
    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(metric_registry, tablet_id(), attrs);
    metrics_.reset(new TabletMetrics(metric_entity_));
    // The function gauges below capture a raw 'this'; AutoDetach() registers
    // them with metric_detacher_ so they are unhooked when this Tablet goes away.
    METRIC_memrowset_size.InstantiateFunctionGauge(
        metric_entity_, [this]() { return this->MemRowSetSize(); })
        ->AutoDetach(&metric_detacher_);
    METRIC_on_disk_data_size.InstantiateFunctionGauge(
        metric_entity_, [this]() { return this->OnDiskDataSize(); })
        ->AutoDetach(&metric_detacher_);
    METRIC_num_rowsets_on_disk.InstantiateFunctionGauge(
        metric_entity_, [this]() { return this->num_rowsets(); })
        ->AutoDetach(&metric_detacher_);
    METRIC_last_read_elapsed_time.InstantiateFunctionGauge(
        metric_entity_, [this]() { return this->LastReadElapsedSeconds(); },
        MergeType::kMin)
        ->AutoDetach(&metric_detacher_);
    METRIC_last_write_elapsed_time.InstantiateFunctionGauge(
        metric_entity_, [this]() { return this->LastWriteElapsedSeconds(); },
        MergeType::kMin)
        ->AutoDetach(&metric_detacher_);
  }

  // Install a write throttler only when at least one limit is configured;
  // both flags default to 0 (unlimited).
  if (FLAGS_tablet_throttler_rpc_per_sec > 0 || FLAGS_tablet_throttler_bytes_per_sec > 0) {
    throttler_.reset(new Throttler(MonoTime::Now(),
                                   FLAGS_tablet_throttler_rpc_per_sec,
                                   FLAGS_tablet_throttler_bytes_per_sec,
                                   FLAGS_tablet_throttler_burst_factor));
  }
}
| |
Tablet::~Tablet() {
  // Tear down maintenance ops, MVCC, and components before the members are
  // destroyed; Shutdown() also unpublishes the metric entity.
  Shutdown();
}
| |
// Returns an error if the Tablet has been stopped, i.e. is 'kStopped' or
// 'kShutdown', and otherwise checks that 'expected_state' matches 'state_'.
// Any non-stopped state other than 'expected_state' is a programming error
// and crashes via CHECK_EQ.
#define RETURN_IF_STOPPED_OR_CHECK_STATE(expected_state) do { \
  State _local_state; \
  RETURN_NOT_OK(CheckHasNotBeenStopped(&_local_state)); \
  CHECK_EQ(expected_state, _local_state); \
} while (0)
| |
// Opens the tablet: re-creates in-flight transaction participants, opens all
// on-disk rowsets listed in the metadata, creates a fresh MemRowSet (plus one
// per transaction in 'txn_ids_with_mrs'), and atomically publishes the new
// components. On success the tablet transitions kInitialized -> kBootstrapping.
//
// 'in_flight_txn_ids': transactions whose state was persisted as in-flight;
//     they are re-opened as kOpen participants here.
// 'txn_ids_with_mrs': transactions that need their own MemRowSet; must be a
//     subset of the transaction IDs known by the metadata.
// Returns a non-OK status if any rowset fails to open, if MemRowSet creation
// fails, or if the tablet was stopped concurrently.
Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
                    const unordered_set<int64_t>& txn_ids_with_mrs) {
  TRACE_EVENT0("tablet", "Tablet::Open");
  RETURN_IF_STOPPED_OR_CHECK_STATE(kInitialized);

  CHECK(schema()->has_column_ids());

  // MRS ids continue from wherever the last durable MRS left off.
  next_mrs_id_ = metadata_->last_durable_mrs_id() + 1;

  RowSetVector rowsets_opened;

  // If we persisted the state of any transaction IDs before shutting down,
  // initialize those that were in-flight here as kOpen. If there were any ops
  // applied that didn't get persisted to the tablet metadata, the bootstrap
  // process will replay those ops.
  for (const auto& txn_id : in_flight_txn_ids) {
    txn_participant_.CreateOpenTransaction(txn_id, log_anchor_registry_.get());
  }

  fs::IOContext io_context({ tablet_id() });
  // open the tablet row-sets
  for (const shared_ptr<RowSetMetadata>& rowset_meta : metadata_->rowsets()) {
    shared_ptr<DiskRowSet> rowset;
    Status s = DiskRowSet::Open(rowset_meta,
                                log_anchor_registry_.get(),
                                mem_trackers_,
                                &io_context,
                                &rowset);
    if (!s.ok()) {
      LOG_WITH_PREFIX(ERROR) << "Failed to open rowset " << rowset_meta->ToString() << ": "
                             << s.ToString();
      return s;
    }

    rowsets_opened.push_back(rowset);
  }

  {
    auto new_rowset_tree(make_shared<RowSetTree>());
    CHECK_OK(new_rowset_tree->Reset(rowsets_opened));

    // Now that the current state is loaded, create the new MemRowSet with the next id.
    shared_ptr<MemRowSet> new_mrs;
    const SchemaPtr schema_ptr = schema();
    RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema_ptr,
                                    log_anchor_registry_.get(),
                                    mem_trackers_.tablet_tracker,
                                    &new_mrs));

    // Create MRSs for any in-flight transactions there might be.
    // NOTE: we may also have to create MRSs for committed transactions; that
    // will happen upon bootstrapping.
    std::unordered_map<int64_t, scoped_refptr<TxnRowSets>> uncommitted_rs_by_txn_id;
    const auto txn_meta_by_id = metadata_->GetTxnMetadata();
    for (const auto& txn_id : txn_ids_with_mrs) {
      shared_ptr<MemRowSet> txn_mrs;
      // NOTE: we are able to FindOrDie() on these IDs because
      // 'txn_ids_with_mrs' is a subset of the transaction IDs known by the
      // metadata.
      RETURN_NOT_OK(MemRowSet::Create(0, *schema_ptr, txn_id, FindOrDie(txn_meta_by_id, txn_id),
                                      log_anchor_registry_.get(),
                                      mem_trackers_.tablet_tracker,
                                      &txn_mrs));
      EmplaceOrDie(&uncommitted_rs_by_txn_id, txn_id, new TxnRowSets(std::move(txn_mrs)));
    }
    // Publish the fully-built components in one shot under component_lock_ so
    // readers never observe a partially-initialized state.
    std::lock_guard<rw_spinlock> lock(component_lock_);
    components_.reset(new TabletComponents(
        std::move(new_mrs), {}, std::move(new_rowset_tree)));
    uncommitted_rowsets_by_txn_id_ = std::move(uncommitted_rs_by_txn_id);
  }

  // Compute the initial average rowset height.
  UpdateAverageRowsetHeight();

  {
    std::lock_guard<simple_spinlock> l(state_lock_);
    if (state_ != kInitialized) {
      // The tablet may have been stopped while we were opening rowsets.
      DCHECK(state_ == kStopped || state_ == kShutdown);
      return Status::IllegalState("Expected the Tablet to be initialized");
    }
    set_state_unlocked(kBootstrapping);
  }
  return Status::OK();
}
| |
| void Tablet::Stop() { |
| { |
| std::lock_guard<simple_spinlock> l(state_lock_); |
| if (state_ == kStopped || state_ == kShutdown) { |
| return; |
| } |
| set_state_unlocked(kStopped); |
| } |
| |
| // Close MVCC so Applying ops will not complete and will not be waited on. |
| // This prevents further snapshotting of the tablet. |
| mvcc_.Close(); |
| |
| // Stop tablet ops from being scheduled by the maintenance manager. |
| CancelMaintenanceOps(); |
| } |
| |
| Status Tablet::MarkFinishedBootstrapping() { |
| std::lock_guard<simple_spinlock> l(state_lock_); |
| if (state_ != kBootstrapping) { |
| DCHECK(state_ == kStopped || state_ == kShutdown); |
| return Status::IllegalState("The tablet has been stopped"); |
| } |
| set_state_unlocked(kOpen); |
| return Status::OK(); |
| } |
| |
// Fully shuts the tablet down: stops it, unregisters maintenance ops, drops
// the in-memory components, marks the state kShutdown, and unpublishes
// metrics. Safe to call from the destructor.
void Tablet::Shutdown() {
  Stop();
  UnregisterMaintenanceOps();

  // NOTE: component_lock_ is held for the remainder of this function so no
  // reader can observe the components being torn down.
  std::lock_guard<rw_spinlock> lock(component_lock_);
  components_ = nullptr;
  {
    std::lock_guard<simple_spinlock> l(state_lock_);
    set_state_unlocked(kShutdown);
  }
  if (metric_entity_) {
    metric_entity_->Unpublish();
  }

  // In the case of deleting a tablet, we still keep the metadata around after
  // ShutDown(), and need to flush the metadata to indicate that the tablet is deleted.
  // During that flush, we don't want metadata to call back into the Tablet, so we
  // have to unregister the pre-flush callback.
  metadata_->SetPreFlushCallback(&DoNothingStatusClosure);
}
| |
| Status Tablet::GetMappedReadProjection(const Schema& projection, |
| Schema *mapped_projection) const { |
| const SchemaPtr cur_schema = schema(); |
| return cur_schema->GetMappedReadProjection(projection, mapped_projection); |
| } |
| |
| BloomFilterSizing Tablet::DefaultBloomSizing() { |
| return BloomFilterSizing::BySizeAndFPRate(FLAGS_tablet_bloom_block_size, |
| FLAGS_tablet_bloom_target_fp_rate); |
| } |
| |
| void Tablet::SplitKeyRange(const EncodedKey* start_key, |
| const EncodedKey* stop_key, |
| const std::vector<ColumnId>& column_ids, |
| uint64 target_chunk_size, |
| std::vector<KeyRange>* key_range_info) { |
| shared_ptr<RowSetTree> rowsets_copy; |
| { |
| shared_lock<rw_spinlock> l(component_lock_); |
| rowsets_copy = components_->rowsets; |
| } |
| |
| Slice start, stop; |
| if (start_key != nullptr) { |
| start = start_key->encoded_key(); |
| } |
| if (stop_key != nullptr) { |
| stop = stop_key->encoded_key(); |
| } |
| RowSetInfo::SplitKeyRange(*rowsets_copy, start, stop, |
| column_ids, target_chunk_size, key_range_info); |
| } |
| |
| Status Tablet::NewRowIterator(const Schema& projection, |
| unique_ptr<RowwiseIterator>* iter) const { |
| RowIteratorOptions opts; |
| // Yield current rows. |
| opts.snap_to_include = MvccSnapshot(mvcc_); |
| opts.projection = &projection; |
| return NewRowIterator(std::move(opts), iter); |
| } |
| |
| Status Tablet::NewOrderedRowIterator(const Schema& projection, |
| unique_ptr<RowwiseIterator>* iter) const { |
| RowIteratorOptions opts; |
| // Yield current rows. |
| opts.snap_to_include = MvccSnapshot(mvcc_); |
| opts.projection = &projection; |
| opts.order = ORDERED; |
| return NewRowIterator(std::move(opts), iter); |
| } |
| |
// Creates a row iterator configured by 'opts'. Fails if the tablet is not in
// the kOpen state. Increments the scans_started metric when metrics are
// enabled.
Status Tablet::NewRowIterator(RowIteratorOptions opts,
                              unique_ptr<RowwiseIterator>* iter) const {
  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
  if (metrics_) {
    metrics_->scans_started->Increment();
  }

  // NOTE: VLOG evaluates its operands lazily, so the snapshot stringification
  // below only runs when verbose logging is enabled.
  VLOG_WITH_PREFIX(2) << "Created new Iterator for snapshot range: ("
                      << (opts.snap_to_exclude ? opts.snap_to_exclude->ToString() : "-Inf")
                      << ", " << opts.snap_to_include.ToString() << ")";
  iter->reset(new Iterator(this, std::move(opts)));
  return Status::OK();
}
| |
| Status Tablet::DecodeWriteOperations(const Schema* client_schema, |
| WriteOpState* op_state) { |
| TRACE_EVENT0("tablet", "Tablet::DecodeWriteOperations"); |
| |
| DCHECK(op_state->row_ops().empty()); |
| |
| // Acquire the schema lock in shared mode, so that the schema doesn't change |
| // while this op is in-flight. |
| op_state->AcquireSchemaLock(&schema_lock_); |
| |
| // The Schema needs to be held constant while any ops are between PREPARE and |
| // APPLY stages |
| TRACE("Decoding operations"); |
| vector<DecodedRowOperation> ops; |
| |
| SchemaPtr schema_ptr = schema(); |
| // Decode the ops |
| RowOperationsPBDecoder dec(&op_state->request()->row_operations(), |
| client_schema, |
| schema_ptr.get(), |
| op_state->arena()); |
| RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops)); |
| TRACE_COUNTER_INCREMENT("num_ops", ops.size()); |
| |
| // Important to set the schema before the ops -- we need the |
| // schema in order to stringify the ops. |
| op_state->set_schema_at_decode_time(schema_ptr); |
| op_state->SetRowOps(std::move(ops)); |
| |
| return Status::OK(); |
| } |
| |
| Status Tablet::AcquireRowLocks(WriteOpState* op_state) { |
| TRACE_EVENT1("tablet", "Tablet::AcquireRowLocks", |
| "num_locks", op_state->row_ops().size()); |
| TRACE("Acquiring locks for $0 operations", op_state->row_ops().size()); |
| |
| for (RowOp* op : op_state->row_ops()) { |
| if (op->has_result()) continue; |
| |
| ConstContiguousRow row_key(&key_schema_, op->decoded_op.row_data); |
| Arena* arena = op_state->arena(); |
| op->key_probe = arena->NewObject<RowSetKeyProbe>(row_key, arena); |
| if (PREDICT_FALSE(!ValidateOpOrMarkFailed(op))) { |
| continue; |
| } |
| RETURN_NOT_OK(CheckRowInTablet(row_key)); |
| } |
| |
| op_state->AcquireRowLocks(&lock_manager_); |
| |
| TRACE("Row locks acquired"); |
| return Status::OK(); |
| } |
| |
// Thin wrapper: asks 'op_state' to take this tablet's partition lock from the
// lock manager using the requested wait mode.
Status Tablet::AcquirePartitionLock(WriteOpState* op_state,
                                    LockManager::LockWaitMode wait_mode) {
  return op_state->AcquirePartitionLock(&lock_manager_, wait_mode);
}
| |
| Status Tablet::AcquireTxnLock(int64_t txn_id, WriteOpState* op_state) { |
| auto txn = txn_participant_.GetTransaction(txn_id); |
| if (!txn) { |
| // While we might not have an in-flight transaction, we still might have a |
| // finished transaction that has metadata. |
| if (metadata_->HasTxnMetadata(txn_id)) { |
| // TODO(awong): IllegalState or Aborted seem more intuitive, but clients |
| // currently retry on both of those errors, assuming the error comes from |
| // Raft followers complaining about the lack of leadership. |
| return Status::InvalidArgument(Substitute("txn $0 is not open", txn_id)); |
| } |
| return Status::NotFound(Substitute("txn $0 not found on tablet $1", txn_id, tablet_id())); |
| } |
| return op_state->AcquireTxnLockCheckOpen(std::move(txn)); |
| } |
| |
| Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const { |
| const auto& ps = metadata_->partition_schema(); |
| if (PREDICT_FALSE(!ps.PartitionContainsRow(metadata_->partition(), row))) { |
| return Status::NotFound( |
| Substitute("Row not in tablet partition. Partition: '$0', row: '$1'.", |
| ps.PartitionDebugString(metadata_->partition(), *schema().get()), |
| ps.PartitionKeyDebugString(row))); |
| } |
| return Status::OK(); |
| } |
| |
// Test-only helper: assigns a timestamp from the local clock to 'op_state'
// and starts it with MVCC in one atomic step (guarded by
// test_start_op_lock_). COMMIT_WAIT consistency is not supported here.
void Tablet::AssignTimestampAndStartOpForTests(WriteOpState* op_state) {
  CHECK(!op_state->has_timestamp());
  // Don't support COMMIT_WAIT for tests that don't boot a tablet server.
  CHECK_NE(op_state->external_consistency_mode(), COMMIT_WAIT);

  // Make sure timestamp assignment and op start are atomic, for tests.
  //
  // This is to make sure that when test ops advance safe time later, we don't have
  // any op in-flight between getting a timestamp and being started. Otherwise we
  // might run the risk of assigning a timestamp to op1, and have another op
  // get a timestamp/start/advance safe time before op1 starts making op1's timestamp
  // invalid on start.
  {
    std::lock_guard<simple_spinlock> l(test_start_op_lock_);
    op_state->set_timestamp(clock_->Now());
    StartOp(op_state);
  }
}
| |
| void Tablet::StartOp(WriteOpState* op_state) { |
| unique_ptr<ScopedOp> mvcc_op; |
| DCHECK(op_state->has_timestamp()); |
| mvcc_op.reset(new ScopedOp(&mvcc_, op_state->timestamp())); |
| op_state->SetMvccOp(std::move(mvcc_op)); |
| } |
| |
| void Tablet::StartOp(ParticipantOpState* op_state) { |
| if (op_state->request()->op().type() == tserver::ParticipantOpPB::BEGIN_COMMIT) { |
| DCHECK(op_state->has_timestamp()); |
| unique_ptr<ScopedOp> mvcc_op(new ScopedOp(&mvcc_, op_state->timestamp())); |
| op_state->SetMvccOp(std::move(mvcc_op)); |
| } |
| } |
| |
| bool Tablet::ValidateOpOrMarkFailed(RowOp* op) { |
| if (op->valid) return true; |
| if (PREDICT_FALSE(op->has_result())) { |
| DCHECK(op->result->has_failed_status()); |
| return false; |
| } |
| |
| Status s = ValidateOp(*op); |
| if (PREDICT_FALSE(!s.ok())) { |
| // TODO(todd): add a metric tracking the number of invalid ops. |
| op->SetFailed(s); |
| return false; |
| } |
| op->valid = true; |
| return true; |
| } |
| |
// Dispatches per-type validation for a decoded row operation. Crashes on an
// unknown operation type, which the decoder should never produce.
Status Tablet::ValidateOp(const RowOp& op) {
  switch (op.decoded_op.type) {
    // Key-bearing operations: validate the encoded key size.
    case RowOperationsPB::INSERT:
    case RowOperationsPB::INSERT_IGNORE:
    case RowOperationsPB::UPSERT:
    case RowOperationsPB::UPSERT_IGNORE:
      return ValidateInsertOrUpsertUnlocked(op);

    // Mutations: validate the changelist.
    case RowOperationsPB::UPDATE:
    case RowOperationsPB::UPDATE_IGNORE:
    case RowOperationsPB::DELETE:
    case RowOperationsPB::DELETE_IGNORE:
      return ValidateMutateUnlocked(op);

    default:
      LOG(FATAL) << RowOperationsPB::Type_Name(op.decoded_op.type);
  }
  // All switch arms either return or abort.
  __builtin_unreachable();
}
| |
| Status Tablet::ValidateInsertOrUpsertUnlocked(const RowOp& op) { |
| // Check that the encoded key is not longer than the maximum. |
| auto enc_key_size = op.key_probe->encoded_key_slice().size(); |
| if (PREDICT_FALSE(enc_key_size > FLAGS_max_encoded_key_size_bytes)) { |
| return Status::InvalidArgument(Substitute( |
| "encoded primary key too large ($0 bytes, maximum is $1 bytes)", |
| enc_key_size, FLAGS_max_encoded_key_size_bytes)); |
| } |
| return Status::OK(); |
| } |
| |
| Status Tablet::ValidateMutateUnlocked(const RowOp& op) { |
| RowChangeListDecoder rcl_decoder(op.decoded_op.changelist); |
| RETURN_NOT_OK(rcl_decoder.Init()); |
| if (PREDICT_FALSE(rcl_decoder.is_reinsert())) { |
| // REINSERT mutations are the byproduct of an INSERT on top of a ghost |
| // row, not something the user is allowed to specify on their own. |
| return Status::InvalidArgument("User may not specify REINSERT mutations"); |
| } |
| |
| if (rcl_decoder.is_delete()) { |
| // Don't validate the composite key length on delete. This is important to allow users |
| // to delete a row if a row with a too-large key was inserted on a previous version |
| // that had no limits. |
| return Status::OK(); |
| } |
| |
| DCHECK(rcl_decoder.is_update()); |
| return Status::OK(); |
| } |
| |
// Applies one validated INSERT/INSERT_IGNORE/UPSERT/UPSERT_IGNORE to the
// tablet's in-memory state. Requires that the presence check against disk
// rowsets and committed txn memrowsets already ran ('op->checked_present').
// The outcome (success, ignored error, or failure) is always recorded on
// 'op' in addition to being returned. 'stats' accumulates probe statistics.
Status Tablet::InsertOrUpsertUnlocked(const IOContext* io_context,
                                      WriteOpState *op_state,
                                      RowOp* op,
                                      ProbeStats* stats) {
  DCHECK(op->checked_present);
  DCHECK(op->valid);

  RowOperationsPB_Type op_type = op->decoded_op.type;
  const TabletComponents* comps = DCHECK_NOTNULL(op_state->tablet_components());

  // Case 1: the key already exists in some rowset found by the presence check.
  if (op->present_in_rowset) {
    switch (op_type) {
      case RowOperationsPB::UPSERT:
      case RowOperationsPB::UPSERT_IGNORE:
        // An UPSERT over an existing row becomes an UPDATE of that rowset.
        return ApplyUpsertAsUpdate(io_context, op_state, op, op->present_in_rowset, stats);
      case RowOperationsPB::INSERT_IGNORE:
        op->SetErrorIgnored();
        return Status::OK();
      case RowOperationsPB::INSERT: {
        Status s = Status::AlreadyPresent("key already present");
        if (metrics_) {
          metrics_->insertions_failed_dup_key->Increment();
        }
        op->SetFailed(s);
        return s;
      }
      default:
        LOG(FATAL) << "Unknown operation type: " << op_type;
    }
  }

  DCHECK(!op->present_in_rowset);
  Timestamp ts = op_state->timestamp();
  ConstContiguousRow row(schema().get(), op->decoded_op.row_data);

  // TODO(todd): the Insert() call below will re-encode the key, which is a
  // waste. Should pass through the KeyProbe structure perhaps.

  // Case 2: transactional write — insert into the transaction's own MRS.
  const auto& txn_id = op_state->txn_id();
  if (txn_id) {
    // Only inserts are supported -- this is guaranteed in the prepare phase.
    DCHECK(op_type == RowOperationsPB::INSERT ||
           op_type == RowOperationsPB::INSERT_IGNORE);
    // The previous presence checks only checked disk rowsets and committed txn
    // memrowsets. Before inserting into this transaction's MRS, ensure the
    // row doesn't already exist in the main MRS.
    bool present_in_main_mrs = false;
    RETURN_NOT_OK(comps->memrowset->CheckRowPresent(*op->key_probe, io_context,
                                                    &present_in_main_mrs, stats));
    if (present_in_main_mrs) {
      if (op_type == RowOperationsPB::INSERT_IGNORE) {
        op->SetErrorIgnored();
        return Status::OK();
      }
      Status s = Status::AlreadyPresent("key already present");
      if (metrics_) {
        metrics_->insertions_failed_dup_key->Increment();
      }
      op->SetFailed(s);
      return s;
    }
    const auto* txn_rowsets = DCHECK_NOTNULL(op_state->txn_rowsets());
    Status s = txn_rowsets->memrowset->Insert(ts, row, op_state->op_id());
    // TODO(awong): once we support transactional updates, update this to check
    // for AlreadyPresent statuses.
    if (PREDICT_TRUE(s.ok())) {
      op->SetInsertSucceeded(*txn_id, txn_rowsets->memrowset->mrs_id());
    } else if (s.IsAlreadyPresent() && op_type == RowOperationsPB::INSERT_IGNORE) {
      op->SetErrorIgnored();
      return Status::OK();
    } else {
      op->SetFailed(s);
    }
    return s;
  }

  // Case 3: non-transactional write — insert into the main memrowset. The
  // memrowset itself will return AlreadyPresent if it has already been
  // inserted there.
  Status s = comps->memrowset->Insert(ts, row, op_state->op_id());
  if (s.ok()) {
    op->SetInsertSucceeded(comps->memrowset->mrs_id());
  } else {
    if (s.IsAlreadyPresent()) {
      switch (op_type) {
        case RowOperationsPB::UPSERT:
        case RowOperationsPB::UPSERT_IGNORE:
          // Row lives in the main MRS: apply the upsert as an update there.
          return ApplyUpsertAsUpdate(io_context, op_state, op, comps->memrowset.get(), stats);
        case RowOperationsPB::INSERT_IGNORE:
          op->SetErrorIgnored();
          return Status::OK();
        case RowOperationsPB::INSERT:
          if (metrics_) {
            metrics_->insertions_failed_dup_key->Increment();
          }
          break;
        default:
          LOG(FATAL) << "Unknown operation type: " << op_type;
      }
    }
    op->SetFailed(s);
  }
  return s;
}
| |
// Applies an UPSERT/UPSERT_IGNORE whose key was already found present in
// 'rowset' by converting the explicitly-set non-key columns into an UPDATE
// mutation against that rowset. On success the op's result is recorded via
// SetMutateSucceeded(); failures are recorded via SetFailed() and returned.
Status Tablet::ApplyUpsertAsUpdate(const IOContext* io_context,
                                   WriteOpState* op_state,
                                   RowOp* upsert,
                                   RowSet* rowset,
                                   ProbeStats* stats) {
  const auto op_type = upsert->decoded_op.type;
  const auto* schema = this->schema().get();
  ConstContiguousRow row(schema, upsert->decoded_op.row_data);
  faststring buf;
  RowChangeListEncoder enc(&buf);
  // Encode an update cell for every non-key column the client explicitly set.
  for (int i = schema->num_key_columns(); i < schema->num_columns(); i++) {
    // If the user didn't explicitly set this column in the UPSERT, then we should
    // not turn it into an UPDATE. This prevents the UPSERT from updating
    // values back to their defaults when unset.
    if (!BitmapTest(upsert->decoded_op.isset_bitmap, i)) continue;
    const auto& c = schema->column(i);
    if (c.is_immutable()) {
      // A plain UPSERT fails the whole op when it would touch an immutable
      // column; UPSERT_IGNORE skips just that one cell and keeps going.
      if (op_type == RowOperationsPB::UPSERT) {
        Status s = Status::Immutable("UPDATE not allowed for immutable column", c.ToString());
        upsert->SetFailed(s);
        return s;
      }
      DCHECK_EQ(op_type, RowOperationsPB::UPSERT_IGNORE);
      // Only set upsert->error_ignored flag instead of calling SetErrorIgnored() to avoid setting
      // upsert->result which can be set only once. Then the upsert operation can be continued to
      // mutate the other cells even if the current cell has been skipped, the upsert->result can be
      // set normally in the next steps.
      upsert->error_ignored = true;
      continue;
    }

    const void* val = c.is_nullable() ? row.nullable_cell_ptr(i) : row.cell_ptr(i);
    enc.AddColumnUpdate(c, schema->column_id(i), val);
  }

  // If the UPSERT just included the primary key columns, and the rest
  // were unset (eg because the table only _has_ primary keys, or because
  // the rest are intended to be set to their defaults), we need to
  // avoid doing anything.
  auto* result = google::protobuf::Arena::CreateMessage<OperationResultPB>(
      op_state->pb_arena());
  if (enc.is_empty()) {
    upsert->SetMutateSucceeded(result);
    return Status::OK();
  }

  RowChangeList rcl = enc.as_changelist();

  Status s = rowset->MutateRow(op_state->timestamp(),
                               *upsert->key_probe,
                               rcl,
                               op_state->op_id(),
                               io_context,
                               stats,
                               result);
  // The caller just found this key present in 'rowset', so the mutation must
  // not come back NotFound.
  CHECK(!s.IsNotFound());
  if (s.ok()) {
    if (metrics_) {
      metrics_->upserts_as_updates->Increment();
    }
    upsert->SetMutateSucceeded(result);
  } else {
    upsert->SetFailed(s);
  }
  return s;
}
| |
| vector<RowSet*> Tablet::FindRowSetsToCheck(const RowOp* op, |
| const TabletComponents* comps) { |
| vector<RowSet*> to_check; |
| for (const auto& txn_mrs : comps->txn_memrowsets) { |
| to_check.emplace_back(txn_mrs.get()); |
| uint64_t rows; |
| txn_mrs->CountLiveRows(&rows); |
| } |
| if (PREDICT_TRUE(!op->orig_result_from_log)) { |
| // TODO(yingchun): could iterate the rowsets in a smart order |
| // based on recent statistics - eg if a rowset is getting |
| // updated frequently, pick that one first. |
| comps->rowsets->FindRowSetsWithKeyInRange(op->key_probe->encoded_key_slice(), |
| &to_check); |
| #ifndef NDEBUG |
| // The order in which the rowset tree returns its results doesn't have semantic |
| // relevance. We've had bugs in the past (eg KUDU-1341) which were obscured by |
| // relying on the order of rowsets here. So, in debug builds, we shuffle the |
| // order to encourage finding such bugs more easily. |
| std::random_device rdev; |
| std::mt19937 gen(rdev()); |
| std::shuffle(to_check.begin(), to_check.end(), gen); |
| #endif |
| return to_check; |
| } |
| |
| // If we are replaying an operation during bootstrap, then we already have a |
| // COMMIT message which tells us specifically which memory store to apply it to. |
| for (const auto& store : op->orig_result_from_log->mutated_stores()) { |
| if (store.has_mrs_id()) { |
| to_check.push_back(comps->memrowset.get()); |
| } else { |
| DCHECK(store.has_rs_id()); |
| RowSet* drs = comps->rowsets->drs_by_id(store.rs_id()); |
| if (PREDICT_TRUE(drs)) { |
| to_check.push_back(drs); |
| } |
| |
| // If for some reason we didn't find any stores that the COMMIT message indicated, |
| // then 'to_check' will be empty at this point. That will result in a NotFound() |
| // status below, which the bootstrap code catches and propagates as a tablet |
| // corruption. |
| } |
| } |
| return to_check; |
| } |
| |
| Status Tablet::MutateRowUnlocked(const IOContext* io_context, |
| WriteOpState *op_state, |
| RowOp* op, |
| ProbeStats* stats) { |
| DCHECK(op->checked_present); |
| DCHECK(op->valid); |
| |
| auto* result = google::protobuf::Arena::CreateMessage<OperationResultPB>( |
| op_state->pb_arena()); |
| const TabletComponents* comps = DCHECK_NOTNULL(op_state->tablet_components()); |
| Timestamp ts = op_state->timestamp(); |
| |
| // If we found the row in any existing RowSet, mutate it there. Otherwise |
| // attempt to mutate in the MRS. |
| RowSet* rs_to_attempt = op->present_in_rowset ? |
| op->present_in_rowset : comps->memrowset.get(); |
| Status s = rs_to_attempt->MutateRow(ts, |
| *op->key_probe, |
| op->decoded_op.changelist, |
| op_state->op_id(), |
| io_context, |
| stats, |
| result); |
| if (PREDICT_TRUE(s.ok())) { |
| op->SetMutateSucceeded(result); |
| } else { |
| if (s.IsNotFound()) { |
| RowOperationsPB_Type op_type = op->decoded_op.type; |
| switch (op_type) { |
| case RowOperationsPB::UPDATE_IGNORE: |
| case RowOperationsPB::DELETE_IGNORE: |
| s = Status::OK(); |
| op->SetErrorIgnored(); |
| break; |
| case RowOperationsPB::UPDATE: |
| case RowOperationsPB::DELETE: |
| // Replace internal error messages with one more suitable for users. |
| s = Status::NotFound("key not found"); |
| op->SetFailed(s); |
| break; |
| default: |
| LOG(FATAL) << "Unknown operation type: " << op_type; |
| } |
| } else { |
| op->SetFailed(s); |
| } |
| } |
| return s; |
| } |
| |
| void Tablet::StartApplying(WriteOpState* op_state) { |
| shared_lock<rw_spinlock> l(component_lock_); |
| |
| const auto txn_id = op_state->txn_id(); |
| if (txn_id) { |
| auto txn_rowsets = FindPtrOrNull(uncommitted_rowsets_by_txn_id_, *txn_id); |
| // The provisional rowset for this transaction should have been created |
| // when applying the BEGIN_TXN op. |
| op_state->set_txn_rowsets(txn_rowsets); |
| } |
| op_state->StartApplying(); |
| op_state->set_tablet_components(components_); |
| } |
| |
| void Tablet::StartApplying(ParticipantOpState* op_state) { |
| const auto& op_type = op_state->request()->op().type(); |
| if (op_type == tserver::ParticipantOpPB::FINALIZE_COMMIT) { |
| // NOTE: we may not have an MVCC op if we are bootstrapping and did not |
| // replay a BEGIN_COMMIT op. |
| if (op_state->txn()->commit_op()) { |
| op_state->txn()->commit_op()->StartApplying(); |
| } |
| } |
| } |
| |
| void Tablet::CreateTxnRowSets(int64_t txn_id, scoped_refptr<TxnMetadata> txn_meta) { |
| shared_ptr<MemRowSet> new_mrs; |
| const SchemaPtr schema_ptr = schema(); |
| CHECK_OK(MemRowSet::Create(0, *schema_ptr, txn_id, std::move(txn_meta), |
| log_anchor_registry_.get(), |
| mem_trackers_.tablet_tracker, |
| &new_mrs)); |
| scoped_refptr<TxnRowSets> rowsets(new TxnRowSets(std::move(new_mrs))); |
| { |
| std::lock_guard<rw_spinlock> l(component_lock_); |
| // TODO(awong): can we ever get here? |
| if (ContainsKey(uncommitted_rowsets_by_txn_id_, txn_id)) { |
| return; |
| } |
| // We are guaranteed to succeed here because this is only ever called by |
| // the BEGIN_TXN op, which is only applied once per transaction |
| // participant. |
| EmplaceOrDie(&uncommitted_rowsets_by_txn_id_, txn_id, std::move(rowsets)); |
| } |
| } |
| |
| Status Tablet::BulkCheckPresence(const IOContext* io_context, WriteOpState* op_state) { |
| int num_ops = op_state->row_ops().size(); |
| |
| // TODO(todd) determine why we sometimes get empty writes! |
| if (PREDICT_FALSE(num_ops == 0)) return Status::OK(); |
| |
| // The compiler seems to be bad at hoisting this load out of the loops, |
| // so load it up top. |
| RowOp* const * row_ops_base = op_state->row_ops().data(); |
| |
| // Run all of the ops through the RowSetTree. |
| vector<pair<Slice, int>> keys_and_indexes; |
| keys_and_indexes.reserve(num_ops); |
| for (int i = 0; i < num_ops; i++) { |
| RowOp* op = row_ops_base[i]; |
| // If the op already failed in validation, or if we've got the original result |
| // filled in already during replay, then we don't need to consult the RowSetTree. |
| if (op->has_result() || op->orig_result_from_log) continue; |
| keys_and_indexes.emplace_back(op->key_probe->encoded_key_slice(), i); |
| } |
| |
| // Sort the query points by their probe keys, retaining the equivalent indexes. |
| // |
| // It's important to do a stable-sort here so that the 'unique' call |
| // below retains only the _first_ op the user specified, instead of |
| // an arbitrary one. |
| // |
| // TODO(todd): benchmark stable_sort vs using sort() and falling back to |
| // comparing 'a.second' when a.first == b.first. Some microbenchmarks |
| // seem to indicate stable_sort is actually faster. |
| // TODO(todd): could also consider weaving in a check in the loop above to |
| // see if the incoming batch is already totally-ordered and in that case |
| // skip this sort and std::unique call. |
| std::stable_sort(keys_and_indexes.begin(), keys_and_indexes.end(), |
| [](const pair<Slice, int>& a, |
| const pair<Slice, int>& b) { |
| return a.first < b.first; |
| }); |
| // If the batch has more than one operation for the same row, then we can't |
| // use the up-front presence optimization on those operations, since the |
| // first operation may change the result of the later presence-checks. |
| keys_and_indexes.erase(std::unique( |
| keys_and_indexes.begin(), keys_and_indexes.end(), |
| [](const pair<Slice, int>& a, |
| const pair<Slice, int>& b) { |
| return a.first == b.first; |
| }), keys_and_indexes.end()); |
| |
| // Unzip the keys into a separate array (since the RowSetTree API just wants a vector of |
| // Slices) |
| vector<Slice> keys(keys_and_indexes.size()); |
| for (int i = 0; i < keys.size(); i++) { |
| keys[i] = keys_and_indexes[i].first; |
| } |
| |
| // Check the presence in the committed transaction memrowsets. |
| // NOTE: we don't have to check whether the row is in the main memrowset; if |
| // it does exist there, we'll discover that when we try to insert into it. |
| const TabletComponents* comps = DCHECK_NOTNULL(op_state->tablet_components()); |
| for (auto& key_and_index : keys_and_indexes) { |
| int idx = key_and_index.second; |
| RowOp* op = row_ops_base[idx]; |
| if (op->present_in_rowset) { |
| continue; |
| } |
| bool present = false; |
| for (const auto& mrs : comps->txn_memrowsets) { |
| RETURN_NOT_OK_PREPEND(mrs->CheckRowPresent(*op->key_probe, io_context, &present, |
| op_state->mutable_op_stats(idx)), |
| Substitute("Tablet $0 failed to check row presence for op $1", |
| tablet_id(), op->ToString(key_schema_))); |
| if (present) { |
| op->present_in_rowset = mrs.get(); |
| break; |
| } |
| } |
| } |
| |
| // Perform the presence checks on the other rowsets. We use the "bulk query" |
| // functionality provided by RowSetTree::ForEachRowSetContainingKeys(), which |
| // yields results via a callback, with grouping guarantees that callbacks for |
| // the same RowSet will be grouped together with increasing query keys. |
| // |
| // We want to process each such "group" (set of subsequent calls for the same |
| // RowSet) one at a time. So, the callback itself aggregates results into |
| // 'pending_group' and then calls 'ProcessPendingGroup' when the next group |
| // begins. |
| vector<pair<RowSet*, int>> pending_group; |
| Status s; |
| const auto& ProcessPendingGroup = [&]() { |
| if (pending_group.empty() || !s.ok()) return; |
| // Check invariant of the batch RowSetTree query: within each output group |
| // we should have fully-sorted keys. |
| DCHECK(std::is_sorted(pending_group.begin(), pending_group.end(), |
| [&](const pair<RowSet*, int>& a, |
| const pair<RowSet*, int>& b) { |
| return keys[a.second] < keys[b.second]; |
| })); |
| RowSet* rs = pending_group[0].first; |
| for (auto it = pending_group.begin(); it != pending_group.end(); ++it) { |
| DCHECK_EQ(it->first, rs) << "All results within a group should be for the same RowSet"; |
| int op_idx = keys_and_indexes[it->second].second; |
| RowOp* op = row_ops_base[op_idx]; |
| if (op->present_in_rowset) { |
| // Already found this op present somewhere. |
| continue; |
| } |
| |
| bool present = false; |
| s = rs->CheckRowPresent(*op->key_probe, io_context, |
| &present, op_state->mutable_op_stats(op_idx)); |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG(WARNING) << Substitute("Tablet $0 failed to check row presence for op $1: $2", |
| tablet_id(), op->ToString(key_schema_), s.ToString()); |
| return; |
| } |
| if (present) { |
| op->present_in_rowset = rs; |
| } |
| } |
| pending_group.clear(); |
| }; |
| comps->rowsets->ForEachRowSetContainingKeys( |
| keys, |
| [&](RowSet* rs, int i) { |
| if (!pending_group.empty() && rs != pending_group.back().first) { |
| ProcessPendingGroup(); |
| } |
| pending_group.emplace_back(rs, i); |
| }); |
| // Process the last group. |
| ProcessPendingGroup(); |
| RETURN_NOT_OK_PREPEND(s, "Error while checking presence of rows"); |
| |
| // Mark all of the ops as having been checked. |
| // TODO(todd): this could potentially be weaved into the std::unique() call up |
| // above to avoid some cache misses. |
| for (auto& p : keys_and_indexes) { |
| row_ops_base[p.second]->checked_present = true; |
| } |
| return Status::OK(); |
| } |
| |
| bool Tablet::HasBeenStopped() const { |
| State s = state_.load(); |
| return s == kStopped || s == kShutdown; |
| } |
| |
| Status Tablet::CheckHasNotBeenStopped(State* cur_state) const { |
| State s = state_.load(); |
| if (PREDICT_FALSE(s == kStopped || s == kShutdown)) { |
| return Status::IllegalState("Tablet has been stopped"); |
| } |
| if (cur_state) { |
| *cur_state = s; |
| } |
| return Status::OK(); |
| } |
| |
| void Tablet::BeginTransaction(Txn* txn, |
| const OpId& op_id) { |
| unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(), |
| Substitute("BEGIN_TXN-$0-$1", txn->txn_id(), txn))); |
| anchor->AnchorIfMinimum(op_id.index()); |
| metadata_->AddTxnMetadata(txn->txn_id(), std::move(anchor)); |
| const auto& txn_id = txn->txn_id(); |
| CreateTxnRowSets(txn_id, FindOrDie(metadata_->GetTxnMetadata(), txn_id)); |
| txn->BeginTransaction(); |
| } |
| |
| void Tablet::BeginCommit(Txn* txn, Timestamp mvcc_op_ts, const OpId& op_id) { |
| unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(), |
| Substitute("BEGIN_COMMIT-$0-$1", txn->txn_id(), txn))); |
| anchor->AnchorIfMinimum(op_id.index()); |
| metadata_->BeginCommitTransaction(txn->txn_id(), mvcc_op_ts, std::move(anchor)); |
| txn->BeginCommit(op_id); |
| } |
| |
| void Tablet::CommitTransaction(Txn* txn, Timestamp commit_ts, const OpId& op_id) { |
| unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(), |
| Substitute("FINALIZE_COMMIT-$0-$1", txn->txn_id(), txn))); |
| anchor->AnchorIfMinimum(op_id.index()); |
| |
| const auto& txn_id = txn->txn_id(); |
| metadata_->AddCommitTimestamp(txn_id, commit_ts, std::move(anchor)); |
| CommitTxnRowSets(txn_id); |
| txn->FinalizeCommit(commit_ts.value()); |
| } |
| |
| void Tablet::CommitTxnRowSets(int64_t txn_id) { |
| std::lock_guard<rw_spinlock> lock(component_lock_); |
| auto txn_rowsets = EraseKeyReturnValuePtr(&uncommitted_rowsets_by_txn_id_, txn_id); |
| CHECK(txn_rowsets); |
| auto committed_mrss = components_->txn_memrowsets; |
| committed_mrss.emplace_back(txn_rowsets->memrowset); |
| components_ = new TabletComponents(components_->memrowset, |
| std::move(committed_mrss), |
| components_->rowsets); |
| } |
| |
| void Tablet::AbortTransaction(Txn* txn, const OpId& op_id) { |
| unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(), |
| Substitute("ABORT_TXN-$0-$1", txn->txn_id(), txn))); |
| anchor->AnchorIfMinimum(op_id.index()); |
| const auto& txn_id = txn->txn_id(); |
| metadata_->AbortTransaction(txn_id, std::move(anchor)); |
| { |
| std::lock_guard<rw_spinlock> lock(component_lock_); |
| uncommitted_rowsets_by_txn_id_.erase(txn_id); |
| } |
| txn->AbortTransaction(); |
| } |
| |
| Status Tablet::ApplyRowOperations(WriteOpState* op_state) { |
| int num_ops = op_state->row_ops().size(); |
| |
| StartApplying(op_state); |
| |
| IOContext io_context({ tablet_id() }); |
| RETURN_NOT_OK(BulkCheckPresence(&io_context, op_state)); |
| |
| // Actually apply the ops. |
| for (int op_idx = 0; op_idx < num_ops; op_idx++) { |
| RowOp* row_op = op_state->row_ops()[op_idx]; |
| if (row_op->has_result()) continue; |
| RETURN_NOT_OK(ApplyRowOperation(&io_context, op_state, row_op, |
| op_state->mutable_op_stats(op_idx))); |
| DCHECK(row_op->has_result()); |
| } |
| |
| { |
| std::lock_guard<rw_spinlock> l(last_rw_time_lock_); |
| last_write_time_ = MonoTime::Now(); |
| } |
| |
| if (metrics_ && num_ops > 0) { |
| metrics_->AddProbeStats(op_state->mutable_op_stats(0), num_ops, op_state->arena()); |
| } |
| return Status::OK(); |
| } |
| |
// Applies a single decoded row operation. Per-row failures (duplicate key,
// key not found, immutable column) are recorded on the RowOp itself and
// translated to Status::OK() here, so only tablet-level errors propagate to
// the caller and abort the batch.
Status Tablet::ApplyRowOperation(const IOContext* io_context,
                                 WriteOpState* op_state,
                                 RowOp* row_op,
                                 ProbeStats* stats) {
  if (!ValidateOpOrMarkFailed(row_op)) {
    return Status::OK();
  }

  {
    State s;
    RETURN_NOT_OK_PREPEND(CheckHasNotBeenStopped(&s),
                          Substitute("Apply of $0 exited early", op_state->ToString()));
    CHECK(s == kOpen || s == kBootstrapping);
  }
  DCHECK(op_state != nullptr) << "must have a WriteOpState";
  DCHECK(op_state->op_id().IsInitialized()) << "OpState OpId needed for anchoring";
  DCHECK_EQ(op_state->schema_at_decode_time(), schema().get());

  // If we were unable to check rowset presence in batch (e.g. because we are processing
  // a batch which contains some duplicate keys) we need to do so now.
  if (PREDICT_FALSE(!row_op->checked_present)) {
    vector<RowSet *> to_check = FindRowSetsToCheck(row_op,
                                                   op_state->tablet_components());
    for (RowSet *rowset : to_check) {
      bool present = false;
      RETURN_NOT_OK_PREPEND(rowset->CheckRowPresent(*row_op->key_probe, io_context,
                                                    &present, stats),
                            "Failed to check if row is present");
      if (present) {
        row_op->present_in_rowset = rowset;
        break;
      }
    }
    row_op->checked_present = true;
  }

  Status s;
  switch (row_op->decoded_op.type) {
    case RowOperationsPB::INSERT:
    case RowOperationsPB::INSERT_IGNORE:
    case RowOperationsPB::UPSERT:
    case RowOperationsPB::UPSERT_IGNORE:
      s = InsertOrUpsertUnlocked(io_context, op_state, row_op, stats);
      // Row-level failures have already been recorded on the op; don't fail
      // the whole batch for them.
      if (s.IsAlreadyPresent() || s.IsImmutable()) {
        return Status::OK();
      }
      return s;

    case RowOperationsPB::UPDATE:
    case RowOperationsPB::UPDATE_IGNORE:
    case RowOperationsPB::DELETE:
    case RowOperationsPB::DELETE_IGNORE:
      s = MutateRowUnlocked(io_context, op_state, row_op, stats);
      // As above: a missing key is a per-row failure, not a batch failure.
      if (s.IsNotFound()) {
        return Status::OK();
      }
      return s;

    default:
      LOG_WITH_PREFIX(FATAL) << RowOperationsPB::Type_Name(row_op->decoded_op.type);
  }
  // Unreachable: every case above returns, and the default aborts.
  return Status::OK();
}
| |
| void Tablet::ModifyRowSetTree(const RowSetTree& old_tree, |
| const RowSetVector& rowsets_to_remove, |
| const RowSetVector& rowsets_to_add, |
| RowSetTree* new_tree) { |
| RowSetVector post_swap; |
| |
| // O(n^2) diff algorithm to collect the set of rowsets excluding |
| // the rowsets that were included in the compaction |
| int num_removed = 0; |
| |
| for (const shared_ptr<RowSet> &rs : old_tree.all_rowsets()) { |
| // Determine if it should be removed |
| bool should_remove = false; |
| for (const shared_ptr<RowSet> &to_remove : rowsets_to_remove) { |
| if (to_remove == rs) { |
| should_remove = true; |
| num_removed++; |
| break; |
| } |
| } |
| if (!should_remove) { |
| post_swap.push_back(rs); |
| } |
| } |
| |
| CHECK_EQ(num_removed, rowsets_to_remove.size()); |
| |
| // Then push the new rowsets on the end of the new list |
| std::copy(rowsets_to_add.begin(), |
| rowsets_to_add.end(), |
| std::back_inserter(post_swap)); |
| |
| |
| CHECK_OK(new_tree->Reset(post_swap)); |
| } |
| |
| void Tablet::AtomicSwapRowSets(const RowSetVector &to_remove, |
| const RowSetVector &to_add) { |
| std::lock_guard<rw_spinlock> lock(component_lock_); |
| AtomicSwapRowSetsUnlocked(to_remove, to_add); |
| } |
| |
| void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove, |
| const RowSetVector &to_add) { |
| DCHECK(component_lock_.is_locked()); |
| |
| auto new_tree(make_shared<RowSetTree>()); |
| ModifyRowSetTree(*components_->rowsets, to_remove, to_add, new_tree.get()); |
| |
| components_ = new TabletComponents(components_->memrowset, |
| components_->txn_memrowsets, |
| std::move(new_tree)); |
| } |
| |
| Status Tablet::DoMajorDeltaCompaction(const vector<ColumnId>& col_ids, |
| const shared_ptr<RowSet>& input_rs, |
| const IOContext* io_context) { |
| RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen); |
| Status s = down_cast<DiskRowSet*>(input_rs.get()) |
| ->MajorCompactDeltaStoresWithColumnIds(col_ids, io_context, GetHistoryGcOpts()); |
| return s; |
| } |
| |
// Computes the "ancient history mark" (AHM): the timestamp before which
// mutation history may be garbage-collected. Returns false (and leaves the
// output untouched) when history GC is disabled — i.e. the clock has no
// physical component or the configured max age is negative.
bool Tablet::GetTabletAncientHistoryMark(Timestamp* ancient_history_mark) const {
  int32_t tablet_history_max_age_sec = FLAGS_tablet_history_max_age_sec;
  if (metadata_->extra_config() && metadata_->extra_config()->has_history_max_age_sec()) {
    // Override the global configuration with the configuration of the table
    tablet_history_max_age_sec = metadata_->extra_config()->history_max_age_sec();
  }
  // We currently only support history GC through a fully-instantiated tablet
  // when using the HybridClock, since we can calculate the age of a mutation.
  if (!clock_->HasPhysicalComponent() || tablet_history_max_age_sec < 0) {
    return false;
  }
  Timestamp now = clock_->Now();
  uint64_t now_micros = HybridClock::GetPhysicalValueMicros(now);
  // Seconds -> microseconds; the 64-bit unsigned multiply cannot overflow
  // for any non-negative int32 value.
  uint64_t max_age_micros = tablet_history_max_age_sec * 1000000ULL;
  // Ensure that the AHM calculation doesn't underflow when
  // '--tablet_history_max_age_sec' is set to a very high value.
  if (max_age_micros <= now_micros) {
    *ancient_history_mark =
        HybridClock::TimestampFromMicrosecondsAndLogicalValue(
            now_micros - max_age_micros,
            HybridClock::GetLogicalValue(now));
  } else {
    // Max age exceeds current time: nothing is ancient, so the mark is the
    // earliest possible timestamp.
    *ancient_history_mark = Timestamp(0);
  }
  return true;
}
| |
| HistoryGcOpts Tablet::GetHistoryGcOpts() const { |
| Timestamp ancient_history_mark; |
| if (GetTabletAncientHistoryMark(&ancient_history_mark)) { |
| return HistoryGcOpts::Enabled(ancient_history_mark); |
| } |
| return HistoryGcOpts::Disabled(); |
| } |
| |
| Status Tablet::Flush() { |
| TRACE_EVENT1("tablet", "Tablet::Flush", "id", tablet_id()); |
| std::lock_guard<Semaphore> lock(rowsets_flush_sem_); |
| return FlushUnlocked(); |
| } |
| |
| Status Tablet::FlushUnlocked() { |
| TRACE_EVENT0("tablet", "Tablet::FlushUnlocked"); |
| RETURN_NOT_OK(CheckHasNotBeenStopped()); |
| RowSetsInCompaction input; |
| vector<shared_ptr<MemRowSet>> old_mrss; |
| { |
| // Create a new MRS with the latest schema. |
| std::lock_guard<rw_spinlock> lock(component_lock_); |
| RETURN_NOT_OK(ReplaceMemRowSetsUnlocked(&input, &old_mrss)); |
| DCHECK_GE(old_mrss.size(), 1); |
| } |
| |
| // Wait for any in-flight ops to finish against the old MRS before we flush |
| // it. |
| // |
| // This may fail if the tablet has been stopped. |
| RETURN_NOT_OK(mvcc_.WaitForApplyingOpsToApply()); |
| |
| { |
| State s; |
| RETURN_NOT_OK(CheckHasNotBeenStopped(&s)); |
| CHECK(s == kOpen || s == kBootstrapping); |
| } |
| |
| // Freeze the old memrowset by blocking readers and swapping it in as a new |
| // rowset, replacing it with an empty one. |
| // |
| // At this point, we have already swapped in a new empty rowset, and any new |
| // inserts are going into that one. 'old_ms' is effectively frozen -- no new |
| // inserts should arrive after this point. |
| // |
| // NOTE: updates and deletes may still arrive into 'old_ms' at this point. |
| // |
| // TODO(perf): there's a memrowset.Freeze() call which we might be able to |
| // use to improve iteration performance during the flush. The old design |
| // used this, but not certain whether it's still doable with the new design. |
| |
| uint64_t start_insert_count = 0; |
| // Keep track of the main MRS. |
| int64_t main_mrs_id = -1; |
| vector<TxnInfoBeingFlushed> txns_being_flushed; |
| for (const auto& old_mrs : old_mrss) { |
| start_insert_count += old_mrs->debug_insert_count(); |
| if (old_mrs->txn_id()) { |
| txns_being_flushed.emplace_back(*old_mrs->txn_id()); |
| } else { |
| DCHECK_EQ(-1, main_mrs_id); |
| main_mrs_id = old_mrs->mrs_id(); |
| } |
| } |
| DCHECK_NE(-1, main_mrs_id); |
| |
| if (old_mrss.size() == 1 && old_mrss[0]->empty()) { |
| // If we're flushing an empty RowSet, we can short circuit here rather than |
| // waiting until the check at the end of DoCompactionAndFlush(). This avoids |
| // the need to create cfiles and write their headers only to later delete |
| // them. |
| LOG_WITH_PREFIX(INFO) << "MemRowSet was empty: no flush needed."; |
| return HandleEmptyCompactionOrFlush(input.rowsets(), main_mrs_id, txns_being_flushed); |
| } |
| |
| if (flush_hooks_) { |
| RETURN_NOT_OK_PREPEND(flush_hooks_->PostSwapNewMemRowSet(), |
| "PostSwapNewMemRowSet hook failed"); |
| } |
| |
| if (VLOG_IS_ON(1)) { |
| size_t memory_footprint = 0; |
| for (const auto& old_mrs : old_mrss) { |
| memory_footprint += old_mrs->memory_footprint(); |
| } |
| VLOG_WITH_PREFIX(1) << Substitute("Flush: entering stage 1 (old memrowset" |
| "already frozen for inserts). Memstore" |
| "in-memory size: $0 bytes", |
| memory_footprint); |
| } |
| |
| RETURN_NOT_OK(DoMergeCompactionOrFlush(input, main_mrs_id, txns_being_flushed)); |
| |
| uint64_t end_insert_count = 0; |
| for (const auto& old_mrs : old_mrss) { |
| end_insert_count += old_mrs->debug_insert_count(); |
| } |
| // Sanity check that no insertions happened during our flush. |
| CHECK_EQ(start_insert_count, end_insert_count) |
| << "Sanity check failed: insertions continued in memrowset " |
| << "after flush was triggered! Aborting to prevent data loss."; |
| |
| return Status::OK(); |
| } |
| |
// Swaps the active memrowsets out for a fresh, empty MRS. The old main MRS
// and all committed transactional MRSs are returned in 'old_mrss', added to
// 'compaction' (with their compact/flush locks held), and inserted into the
// rowset tree so they remain readable until the flush completes. Caller
// must hold component_lock_ in write mode.
Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
                                         vector<shared_ptr<MemRowSet>>* old_mrss) {
  DCHECK(old_mrss->empty());
  old_mrss->emplace_back(components_->memrowset);
  for (const auto& committed_mrs : components_->txn_memrowsets) {
    old_mrss->emplace_back(committed_mrs);
  }
  // Mark the memrowsets as locked, so compactions won't consider it
  // for inclusion in any concurrent compactions.
  for (auto& mrs : *old_mrss) {
    // try_to_lock must succeed: nothing else should be flushing/compacting
    // these memrowsets.
    std::unique_lock<std::mutex> ms_lock(*mrs->compact_flush_lock(), std::try_to_lock);
    CHECK(ms_lock.owns_lock());
    compaction->AddRowSet(mrs, std::move(ms_lock));
  }

  // Create the replacement MRS with the latest schema.
  shared_ptr<MemRowSet> new_mrs;
  const SchemaPtr schema_ptr = schema();
  RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema_ptr,
                                  log_anchor_registry_.get(),
                                  mem_trackers_.tablet_tracker,
                                  &new_mrs));
  auto new_rst(make_shared<RowSetTree>());
  ModifyRowSetTree(*components_->rowsets,
                   RowSetVector(), // remove nothing
                   RowSetVector(old_mrss->begin(), old_mrss->end()), // add the old MRSs
                   new_rst.get());
  // Swap it in. The new components carry an empty txn_memrowsets list, since
  // all committed txn MRSs were just moved into the rowset tree.
  components_.reset(new TabletComponents(std::move(new_mrs), {}, std::move(new_rst)));
  return Status::OK();
}
| |
| Status Tablet::CreatePreparedAlterSchema(AlterSchemaOpState* op_state, |
| const SchemaPtr& schema) { |
| |
| if (!schema->has_column_ids()) { |
| // this probably means that the request is not from the Master |
| return Status::InvalidArgument("Missing Column IDs"); |
| } |
| |
| // Alter schema must run when no reads/writes are in progress. |
| // However, compactions and flushes can continue to run in parallel |
| // with the schema change, |
| op_state->AcquireSchemaLock(&schema_lock_); |
| |
| op_state->set_schema(schema); |
| return Status::OK(); |
| } |
| |
// Applies an alter-schema op: updates the tablet metadata (schema, optional
// table rename, optional extra config) and flushes. Stale requests (version
// not newer than the current one) are recorded as an error on the op but
// return OK so the batch proceeds.
Status Tablet::AlterSchema(AlterSchemaOpState* op_state) {
  DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(op_state->schema().get())))
      << "Schema keys cannot be altered(except name)";

  // Prevent any concurrent flushes. Otherwise, we run into issues where
  // we have an MRS in the rowset tree, and we can't alter its schema
  // in-place.
  std::lock_guard<Semaphore> lock(rowsets_flush_sem_);

  // If the current version >= new version, there is nothing to do.
  // NOTE: the schema comparison must happen before the version check below,
  // since it's used after the version check passes.
  const bool same_schema = (*schema() == *op_state->schema());
  if (metadata_->schema_version() >= op_state->schema_version()) {
    const string msg =
        Substitute("Skipping requested alter to schema version $0, tablet already "
                   "version $1", op_state->schema_version(), metadata_->schema_version());
    LOG_WITH_PREFIX(INFO) << msg;
    op_state->SetError(Status::InvalidArgument(msg));
    return Status::OK();
  }

  LOG_WITH_PREFIX(INFO) << "Alter schema from " << schema()->ToString()
                        << " version " << metadata_->schema_version()
                        << " to " << op_state->schema()->ToString()
                        << " version " << op_state->schema_version();
  DCHECK(schema_lock_.is_locked());
  metadata_->SetSchema(op_state->schema(), op_state->schema_version());
  if (op_state->has_new_table_name()) {
    metadata_->SetTableName(op_state->new_table_name());
    if (metric_entity_) {
      metric_entity_->SetAttribute("table_name", op_state->new_table_name());
    }
  }
  if (op_state->has_new_extra_config()) {
    metadata_->SetExtraConfig(op_state->new_extra_config());
  }

  // If the current schema and the new one are equal, there is nothing to do.
  // Only the version (and possibly name/config) changed, so a metadata flush
  // suffices.
  if (same_schema) {
    return metadata_->Flush();
  }

  // The schema itself changed: flush so the MRS is rebuilt with the new
  // schema (FlushUnlocked is safe since we hold rowsets_flush_sem_).
  return FlushUnlocked();
}
| |
// During bootstrap, resets the tablet's schema to 'new_schema' (an older
// version) before log replay begins, swapping in a fresh MRS built with that
// schema. Only legal while the MRS is still empty.
Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema,
                                        int64_t schema_version) {
  RETURN_IF_STOPPED_OR_CHECK_STATE(kBootstrapping);

  // We know that the MRS should be empty at this point, because we
  // rewind the schema before replaying any operations. So, we just
  // swap in a new one with the correct schema, rather than attempting
  // to flush.
  VLOG_WITH_PREFIX(1) << "Rewinding schema during bootstrap to " << new_schema.ToString();

  SchemaPtr schema = std::make_shared<Schema>(new_schema);
  metadata_->SetSchema(schema, schema_version);
  {
    std::lock_guard<rw_spinlock> lock(component_lock_);

    shared_ptr<MemRowSet> old_mrs = components_->memrowset;
    shared_ptr<RowSetTree> old_rowsets = components_->rowsets;
    // Invariant: nothing has been replayed yet, so no inserts can have
    // reached the old MRS.
    CHECK(old_mrs->empty());
    shared_ptr<MemRowSet> new_mrs;
    // Reuse the old MRS id, since the old (empty) MRS is being discarded,
    // not flushed.
    RETURN_NOT_OK(MemRowSet::Create(old_mrs->mrs_id(), *schema,
                                    log_anchor_registry_.get(),
                                    mem_trackers_.tablet_tracker,
                                    &new_mrs));
    components_ = new TabletComponents(new_mrs, components_->txn_memrowsets, old_rowsets);
  }
  return Status::OK();
}
| |
// Installs fault-injection hooks used by tests to exercise compaction
// failure paths.
void Tablet::SetCompactionHooksForTests(
    const shared_ptr<Tablet::CompactionFaultHooks> &hooks) {
  compaction_hooks_ = hooks;
}
| |
// Installs fault-injection hooks used by tests to exercise flush failure
// paths.
void Tablet::SetFlushHooksForTests(
    const shared_ptr<Tablet::FlushFaultHooks> &hooks) {
  flush_hooks_ = hooks;
}
| |
// Installs fault-injection hooks shared by the flush and compaction paths,
// used by tests.
void Tablet::SetFlushCompactCommonHooksForTests(
    const shared_ptr<Tablet::FlushCompactCommonHooks> &hooks) {
  common_hooks_ = hooks;
}
| |
// Test-only accessor: returns the id of the currently-active memrowset.
int32_t Tablet::CurrentMrsIdForTests() const {
  shared_lock<rw_spinlock> l(component_lock_);
  return components_->memrowset->mrs_id();
}
| |
| bool Tablet::ShouldThrottleAllow(int64_t bytes) { |
| if (!throttler_) { |
| return true; |
| } |
| return throttler_->Take(MonoTime::Now(), 1, bytes); |
| } |
| |
// Selects rowsets to compact and records them (with their compact_flush_locks
// held) in 'picked'.
//
// With FORCE_COMPACT_ALL, every rowset currently available for compaction is
// selected; otherwise the compaction policy chooses the set. Selection runs
// against a snapshot of the RowSetTree, and the selected rowsets are then
// re-validated against the live tree before their locks are taken.
Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
                                    CompactFlags flags) const {
  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
  // Grab a local reference to the current RowSetTree. This is to avoid
  // holding the component_lock_ for too long. See the comment on component_lock_
  // in tablet.h for details on why that would be bad.
  shared_ptr<RowSetTree> rowsets_copy;
  {
    shared_lock<rw_spinlock> l(component_lock_);
    rowsets_copy = components_->rowsets;
  }

  // Serialize selection against other concurrent compaction selections.
  std::lock_guard<std::mutex> compact_lock(compact_select_lock_);
  CHECK_EQ(picked->num_rowsets(), 0);

  unordered_set<const RowSet*> picked_set;

  if (flags & FORCE_COMPACT_ALL) {
    // Compact all rowsets, regardless of policy.
    for (const shared_ptr<RowSet>& rs : rowsets_copy->all_rowsets()) {
      if (rs->IsAvailableForCompaction()) {
        picked_set.insert(rs.get());
      }
    }
  } else {
    // Let the policy decide which rowsets to compact.
    double quality = 0.0;
    RETURN_NOT_OK(compaction_policy_->PickRowSets(*rowsets_copy,
                                                  &picked_set,
                                                  &quality,
                                                  /*log=*/nullptr));
    VLOG_WITH_PREFIX(2) << "Compaction quality: " << quality;
  }

  // Walk the *live* tree (not the snapshot above): a rowset selected from the
  // snapshot only ends up in 'picked' if it is still present here.
  shared_lock<rw_spinlock> l(component_lock_);
  for (const shared_ptr<RowSet>& rs : components_->rowsets->all_rowsets()) {
    if (picked_set.erase(rs.get()) == 0) {
      // Not picked.
      continue;
    }

    // For every rowset we pick, we have to take its compact_flush_lock. TSAN
    // disallows taking more than 64 locks in a single thread[1], so for large
    // compactions this can cause TSAN CHECK failures. To work around, limit the
    // number of rowsets picked in TSAN to 32.
    // [1]: https://github.com/google/sanitizers/issues/950
    // TODO(wdberkeley): Experiment with a compact_flush lock table instead of
    // a per-rowset compact_flush lock.
#if defined(THREAD_SANITIZER)
    constexpr auto kMaxPickedUnderTsan = 32;
    // NOTE(review): since the check uses '>' and runs before AddRowSet(), up
    // to kMaxPickedUnderTsan + 1 rowsets can actually be picked — confirm
    // whether the cap is meant to be inclusive.
    if (picked->num_rowsets() > kMaxPickedUnderTsan) {
      LOG(WARNING) << Substitute("Limiting compaction to $0 rowsets under TSAN",
                                 kMaxPickedUnderTsan);
      // Clear 'picked_set' to indicate there's no more rowsets we expect
      // to lock.
      picked_set.clear();
      break;
    }
#endif

    // Grab the compact_flush_lock: this prevents any other concurrent
    // compaction from selecting this same rowset, and also ensures that
    // we don't select a rowset which is currently in the middle of being
    // flushed.
    std::unique_lock<std::mutex> lock(*rs->compact_flush_lock(), std::try_to_lock);
    CHECK(lock.owns_lock()) << rs->ToString() << " appeared available for "
        "compaction when inputs were selected, but was unable to lock its "
        "compact_flush_lock to prepare for compaction.";

    // Push the lock on our scoped list, so we unlock when done.
    picked->AddRowSet(rs, std::move(lock));
  }

  // When we iterated through the current rowsets, we should have found all of
  // the rowsets that we picked. If we didn't, that implies that some other
  // thread swapped them out while we were making our selection decision --
  // that's not possible since we only picked rowsets that were marked as
  // available for compaction.
  if (!picked_set.empty()) {
    for (const RowSet* not_found : picked_set) {
      LOG_WITH_PREFIX(ERROR) << "Rowset selected for compaction but not available anymore: "
                             << not_found->ToString();
    }
    const char* msg = "Was unable to find all rowsets selected for compaction";
    LOG_WITH_PREFIX(DFATAL) << msg;
    return Status::RuntimeError(msg);
  }
  return Status::OK();
}
| |
| bool Tablet::disable_compaction() const { |
| if (metadata_->extra_config() && metadata_->extra_config()->has_disable_compaction()) { |
| return metadata_->extra_config()->disable_compaction(); |
| } |
| return false; |
| } |
| |
| void Tablet::GetRowSetsForTests(RowSetVector* out) { |
| shared_ptr<RowSetTree> rowsets_copy; |
| { |
| shared_lock<rw_spinlock> l(component_lock_); |
| rowsets_copy = components_->rowsets; |
| } |
| for (const shared_ptr<RowSet>& rs : rowsets_copy->all_rowsets()) { |
| out->push_back(rs); |
| } |
| } |
| |
| void Tablet::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) { |
| // This method must be externally synchronized to not coincide with other |
| // calls to it or to UnregisterMaintenanceOps. |
| DFAKE_SCOPED_LOCK(maintenance_registration_fake_lock_); |
| { |
| std::lock_guard<simple_spinlock> l(state_lock_); |
| if (state_ == kStopped || state_ == kShutdown) { |
| LOG(WARNING) << "Could not register maintenance ops"; |
| return; |
| } |
| CHECK_EQ(kOpen, state_); |
| DCHECK(maintenance_ops_.empty()); |
| } |
| |
| vector<MaintenanceOp*> maintenance_ops; |
| unique_ptr<MaintenanceOp> rs_compact_op(new CompactRowSetsOp(this)); |
| maint_mgr->RegisterOp(rs_compact_op.get()); |
| maintenance_ops.push_back(rs_compact_op.release()); |
| |
| unique_ptr<MaintenanceOp> minor_delta_compact_op(new MinorDeltaCompactionOp(this)); |
| maint_mgr->RegisterOp(minor_delta_compact_op.get()); |
| maintenance_ops.push_back(minor_delta_compact_op.release()); |
| |
| unique_ptr<MaintenanceOp> major_delta_compact_op(new MajorDeltaCompactionOp(this)); |
| maint_mgr->RegisterOp(major_delta_compact_op.get()); |
| maintenance_ops.push_back(major_delta_compact_op.release()); |
| |
| unique_ptr<MaintenanceOp> undo_delta_block_gc_op(new UndoDeltaBlockGCOp(this)); |
| maint_mgr->RegisterOp(undo_delta_block_gc_op.get()); |
| maintenance_ops.push_back(undo_delta_block_gc_op.release()); |
| |
| // The deleted rowset GC operation relies on live rowset counting. If this |
| // tablet doesn't support such counting, do not register the op. |
| if (metadata_->supports_live_row_count()) { |
| unique_ptr<MaintenanceOp> deleted_rowset_gc_op(new DeletedRowsetGCOp(this)); |
| maint_mgr->RegisterOp(deleted_rowset_gc_op.get()); |
| maintenance_ops.push_back(deleted_rowset_gc_op.release()); |
| } |
| |
| std::lock_guard<simple_spinlock> l(state_lock_); |
| maintenance_ops_.swap(maintenance_ops); |
| } |
| |
// Unregisters and destroys all maintenance ops previously created by
// RegisterMaintenanceOps(). Must be externally synchronized with
// RegisterMaintenanceOps() and other calls to this method.
void Tablet::UnregisterMaintenanceOps() {
  // This method must be externally synchronized to not coincide with other
  // calls to it or to RegisterMaintenanceOps.
  DFAKE_SCOPED_LOCK(maintenance_registration_fake_lock_);

  // First cancel all of the operations, so that while we're waiting for one
  // operation to finish in Unregister(), a different one can't get re-scheduled.
  CancelMaintenanceOps();

  // We don't lock here because unregistering ops may take a long time.
  // 'maintenance_registration_fake_lock_' is sufficient to ensure nothing else
  // is updating 'maintenance_ops_'.
  for (MaintenanceOp* op : maintenance_ops_) {
    op->Unregister();
  }

  // Finally, delete the ops under lock.
  std::lock_guard<simple_spinlock> l(state_lock_);
  STLDeleteElements(&maintenance_ops_);
}
| |
| void Tablet::CancelMaintenanceOps() { |
| std::lock_guard<simple_spinlock> l(state_lock_); |
| for (MaintenanceOp* op : maintenance_ops_) { |
| op->CancelAndDisable(); |
| } |
| } |
| |
| Status Tablet::FlushMetadata(const RowSetVector& to_remove, |
| const RowSetMetadataVector& to_add, |
| int64_t mrs_being_flushed, |
| const vector<TxnInfoBeingFlushed>& txns_being_flushed) { |
| RowSetMetadataIds to_remove_meta; |
| for (const shared_ptr<RowSet>& rowset : to_remove) { |
| // Skip MemRowSet & DuplicatingRowSets which don't have metadata. |
| if (rowset->metadata().get() == nullptr) { |
| continue; |
| } |
| to_remove_meta.insert(rowset->metadata()->id()); |
| } |
| |
| return metadata_->UpdateAndFlush(to_remove_meta, to_add, mrs_being_flushed, |
| txns_being_flushed); |
| } |
| |
// Shared implementation for rowset merge compaction and MRS flush
// ('mrs_being_flushed' == TabletMetadata::kNoMrsFlushed means compaction).
//
// Phase 1 snapshots MVCC and writes the merged input to new disk rowsets.
// Phase 2 swaps in a DuplicatingRowSet that mirrors concurrent updates into
// the output, re-applies updates missed during phase 1, flushes the new
// tablet metadata, and finally swaps the new rowsets in.
Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
                                        int64_t mrs_being_flushed,
                                        const vector<TxnInfoBeingFlushed>& txns_being_flushed) {
  const char *op_name =
      (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) ? "Compaction" : "Flush";
  TRACE_EVENT2("tablet", "Tablet::DoMergeCompactionOrFlush",
               "tablet_id", tablet_id(),
               "op", op_name);

  const auto& tid = tablet_id();
  const IOContext io_context({ tid });

  // Phase 1 snapshot: everything applied before this point will be written
  // out below; anything later is handled by phase 2.
  MvccSnapshot flush_snap(mvcc_);
  VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot). "
                                    "Phase 1 snapshot: $1",
                                    op_name, flush_snap.ToString());

  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostTakeMvccSnapshot(),
                          "PostTakeMvccSnapshot hook failed");
  }

  shared_ptr<CompactionInput> merge;
  const SchemaPtr schema_ptr = schema();
  RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema_ptr.get(), &io_context, &merge));

  RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), DefaultBloomSizing(),
                               compaction_policy_->target_rowset_size());
  RETURN_NOT_OK_PREPEND(drsw.Open(), "Failed to open DiskRowSet for flush");

  HistoryGcOpts history_gc_opts = GetHistoryGcOpts();
  RETURN_NOT_OK_PREPEND(
      FlushCompactionInput(
          tid, metadata_->fs_manager()->block_manager()->error_manager(),
          merge.get(), flush_snap, history_gc_opts, &drsw),
      "Flush to disk failed");
  RETURN_NOT_OK_PREPEND(drsw.Finish(), "Failed to finish DRS writer");

  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostWriteSnapshot(),
                          "PostWriteSnapshot hook failed");
  }

  // Though unlikely, it's possible that no rows were written because all of
  // the input rows were GCed in this compaction. In that case, we don't
  // actually want to reopen.
  if (drsw.rows_written_count() == 0) {
    LOG_WITH_PREFIX(INFO) <<  op_name << " resulted in no output rows (all input rows "
                          << "were GCed!)  Removing all input rowsets.";
    return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed,
                                        txns_being_flushed);
  }

  // The RollingDiskRowSet writer wrote out one or more RowSets as the
  // output. Open these into 'new_rowsets'.
  vector<shared_ptr<RowSet> > new_disk_rowsets;
  RowSetMetadataVector new_drs_metas;
  drsw.GetWrittenRowSetMetadata(&new_drs_metas);

  if (metrics_.get()) metrics_->bytes_flushed->IncrementBy(drsw.written_size());
  CHECK(!new_drs_metas.empty());
  {
    TRACE_EVENT0("tablet", "Opening compaction results");
    for (const shared_ptr<RowSetMetadata>& meta : new_drs_metas) {
      // TODO(awong): it'd be nice to plumb delta stats from the rowset writer
      // into the new deltafile readers opened here.
      shared_ptr<DiskRowSet> new_rowset;
      Status s = DiskRowSet::Open(meta,
                                  log_anchor_registry_.get(),
                                  mem_trackers_,
                                  &io_context,
                                  &new_rowset);
      if (!s.ok()) {
        LOG_WITH_PREFIX(WARNING) << "Unable to open snapshot " << op_name << " results "
                                 << meta->ToString() << ": " << s.ToString();
        return s;
      }
      new_disk_rowsets.push_back(new_rowset);
    }
  }

  // Setup for Phase 2: Start duplicating any new updates into the new on-disk
  // rowsets.
  //
  // During Phase 1, we may have missed some updates which came into the input
  // rowsets while we were writing. So, we can't immediately start reading from
  // the on-disk rowsets alone. Starting here, we continue to read from the
  // original rowset(s), but mirror updates to both the input and the output
  // data.
  //
  // It's crucial that, during the rest of the compaction, we do not allow the
  // output rowsets to flush their deltas to disk. This is to avoid the following
  // bug:
  // - during phase 1, timestamp 1 updates a flushed row. This is only reflected in the
  //   input rowset. (ie it is a "missed delta")
  // - during phase 2, timestamp 2 updates the same row. This is reflected in both the
  //   input and output, because of the DuplicatingRowSet.
  // - now suppose the output rowset were allowed to flush deltas. This would create the
  //   first DeltaFile for the output rowset, with only timestamp 2.
  // - Now we run the "ReupdateMissedDeltas", and copy over the first op to the
  //   output DMS, which later flushes.
  // The end result would be that redos[0] has timestamp 2, and redos[1] has timestamp 1.
  // This breaks an invariant that the redo files are time-ordered, and we would probably
  // reapply the deltas in the wrong order on the read path.
  //
  // The way that we avoid this case is that DuplicatingRowSet's FlushDeltas method is a
  // no-op.
  VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 2 (starting to "
                                    "duplicate updates in new rowsets)",
                                    op_name);
  shared_ptr<DuplicatingRowSet> inprogress_rowset(
      new DuplicatingRowSet(input.rowsets(), new_disk_rowsets));

  // The next step is to swap in the DuplicatingRowSet, and at the same time,
  // determine an MVCC snapshot which includes all of the ops that saw a
  // pre-DuplicatingRowSet version of components_.
  MvccSnapshot non_duplicated_ops_snap;
  vector<Timestamp> applying_during_swap;
  {
    TRACE_EVENT0("tablet", "Swapping DuplicatingRowSet");
    // Taking component_lock_ in write mode ensures that no new ops can
    // StartApplying() (or snapshot components_) during this block.
    std::lock_guard<rw_spinlock> lock(component_lock_);
    AtomicSwapRowSetsUnlocked(input.rowsets(), { inprogress_rowset });

    // NOTE: ops may *commit* in between these two lines.
    // We need to make sure all such ops end up in the 'applying_during_swap'
    // list, the 'non_duplicated_ops_snap' snapshot, or both. Thus it's crucial
    // that these next two lines are in this order!
    mvcc_.GetApplyingOpsTimestamps(&applying_during_swap);
    non_duplicated_ops_snap = MvccSnapshot(mvcc_);
  }

  // All ops committed in 'non_duplicated_ops_snap' saw the pre-swap
  // components_. Additionally, any ops that were APPLYING during the above
  // block by definition _started_ doing so before the swap. Hence those ops
  // also need to get included in non_duplicated_ops_snap. To do so, we wait
  // for them to commit, and then manually include them into our snapshot.
  if (VLOG_IS_ON(1) && !applying_during_swap.empty()) {
    VLOG_WITH_PREFIX(1) << "Waiting for " << applying_during_swap.size()
                        << " mid-APPLY ops to commit before finishing compaction...";
    for (const Timestamp& ts : applying_during_swap) {
      VLOG_WITH_PREFIX(1) << "  " << ts.value();
    }
  }

  // This wait is a little bit conservative - technically we only need to wait
  // for those ops in 'applying_during_swap', but MVCC doesn't implement the
  // ability to wait for a specific set. So instead we wait for all currently
  // applying -- a bit more than we need, but still correct.
  RETURN_NOT_OK(mvcc_.WaitForApplyingOpsToApply());

  // Then we want to consider all those ops that were in-flight when we did the
  // swap as committed in 'non_duplicated_ops_snap'.
  non_duplicated_ops_snap.AddAppliedTimestamps(applying_during_swap);

  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapInDuplicatingRowSet(),
                          "PostSwapInDuplicatingRowSet hook failed");
  }

  // Phase 2. Here we re-scan the compaction input, copying those missed updates into the
  // new rowset's DeltaTracker.
  VLOG_WITH_PREFIX(1) << Substitute("$0: Phase 2: carrying over any updates "
                                    "which arrived during Phase 1. Snapshot: $1",
                                    op_name, non_duplicated_ops_snap.ToString());
  const SchemaPtr schema_ptr2 = schema();
  RETURN_NOT_OK_PREPEND(
      input.CreateCompactionInput(non_duplicated_ops_snap, schema_ptr2.get(), &io_context, &merge),
      Substitute("Failed to create $0 inputs", op_name).c_str());

  // Update the output rowsets with the deltas that came in in phase 1, before we swapped
  // in the DuplicatingRowSets. This will perform a flush of the updated DeltaTrackers
  // in the end so that the data that is reported in the log as belonging to the input
  // rowsets is flushed.
  RETURN_NOT_OK_PREPEND(ReupdateMissedDeltas(&io_context,
                                             merge.get(),
                                             history_gc_opts,
                                             flush_snap,
                                             non_duplicated_ops_snap,
                                             new_disk_rowsets),
                        Substitute("Failed to re-update deltas missed during $0 phase 1",
                                   op_name).c_str());

  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostReupdateMissedDeltas(),
                          "PostReupdateMissedDeltas hook failed");
  }

  // ------------------------------
  // Flush was successful.

  // Run fault points used by some integration tests.
  if (input.num_rowsets() > 1) {
    MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_compaction);
  } else if (input.num_rowsets() == 1 &&
             input.rowsets()[0]->OnDiskBaseDataSizeWithRedos() == 0) {
    MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_flush_mrs);
  }

  // Write out the new Tablet Metadata and remove old rowsets.
  RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(), new_drs_metas, mrs_being_flushed,
                                      txns_being_flushed),
                        "Failed to flush new tablet metadata");

  // Now that we've completed the operation, mark any rowsets that have been
  // compacted, preventing them from being considered for future compactions.
  for (const auto& rs : input.rowsets()) {
    rs->set_has_been_compacted();
  }

  // Replace the compacted rowsets with the new on-disk rowsets, making them visible now that
  // their metadata was written to disk.
  AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets);
  UpdateAverageRowsetHeight();

  const auto rows_written = drsw.rows_written_count();
  const auto drs_written = drsw.drs_written_count();
  const auto bytes_written = drsw.written_size();
  TRACE_COUNTER_INCREMENT("rows_written", rows_written);
  TRACE_COUNTER_INCREMENT("drs_written", drs_written);
  TRACE_COUNTER_INCREMENT("bytes_written", bytes_written);
  VLOG_WITH_PREFIX(1) << Substitute("$0 successful on $1 rows ($2 rowsets, $3 bytes)",
                                    op_name,
                                    rows_written,
                                    drs_written,
                                    bytes_written);

  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapNewRowSet(),
                          "PostSwapNewRowSet hook failed");
  }

  return Status::OK();
}
| |
| Status Tablet::HandleEmptyCompactionOrFlush(const RowSetVector& rowsets, |
| int mrs_being_flushed, |
| const vector<TxnInfoBeingFlushed>& txns_being_flushed) { |
| // Write out the new Tablet Metadata and remove old rowsets. |
| RETURN_NOT_OK_PREPEND(FlushMetadata(rowsets, |
| RowSetMetadataVector(), |
| mrs_being_flushed, |
| txns_being_flushed), |
| "Failed to flush new tablet metadata"); |
| |
| AtomicSwapRowSets(rowsets, RowSetVector()); |
| UpdateAverageRowsetHeight(); |
| return Status::OK(); |
| } |
| |
| void Tablet::UpdateAverageRowsetHeight() { |
| if (!metrics_) { |
| return; |
| } |
| // TODO(wdberkeley): We should be able to cache the computation of the CDF |
| // and average height and efficiently recompute it instead of doing it from |
| // scratch. |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| std::lock_guard<std::mutex> l(compact_select_lock_); |
| double rowset_total_height, rowset_total_width; |
| RowSetInfo::ComputeCdfAndCollectOrdered(*comps->rowsets, |
| &rowset_total_height, |
| &rowset_total_width, |
| nullptr, |
| nullptr); |
| metrics_->average_diskrowset_height->set_value(rowset_total_height, rowset_total_width); |
| } |
| |
| Status Tablet::Compact(CompactFlags flags) { |
| RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen); |
| |
| RowSetsInCompaction input; |
| // Step 1. Capture the rowsets to be merged |
| RETURN_NOT_OK_PREPEND(PickRowSetsToCompact(&input, flags), |
| "Failed to pick rowsets to compact"); |
| const auto num_input_rowsets = input.num_rowsets(); |
| TRACE_COUNTER_INCREMENT("num_input_rowsets", num_input_rowsets); |
| VLOG_WITH_PREFIX(1) << Substitute("Compaction: stage 1 complete, picked $0 " |
| "rowsets to compact or flush", |
| num_input_rowsets); |
| if (compaction_hooks_) { |
| RETURN_NOT_OK_PREPEND(compaction_hooks_->PostSelectIterators(), |
| "PostSelectIterators hook failed"); |
| } |
| |
| if (VLOG_IS_ON(1)) { |
| input.DumpToLog(); |
| } |
| |
| return DoMergeCompactionOrFlush(input, TabletMetadata::kNoMrsFlushed, {}); |
| } |
| |
| void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) { |
| |
| if (mvcc_.GetCleanTimestamp() == Timestamp::kInitialTimestamp && |
| PREDICT_TRUE(FLAGS_prevent_kudu_2233_corruption)) { |
| KLOG_EVERY_N_SECS(WARNING, 30) << LogPrefix() << "Can't schedule compaction. Clean time has " |
| << "not been advanced past its initial value."; |
| stats->set_runnable(false); |
| return; |
| } |
| |
| double quality = 0; |
| unordered_set<const RowSet*> picked_set_ignored; |
| |
| shared_ptr<RowSetTree> rowsets_copy; |
| { |
| shared_lock<rw_spinlock> l(component_lock_); |
| rowsets_copy = components_->rowsets; |
| } |
| |
| { |
| std::lock_guard<std::mutex> compact_lock(compact_select_lock_); |
| WARN_NOT_OK(compaction_policy_->PickRowSets(*rowsets_copy, &picked_set_ignored, &quality, NULL), |
| Substitute("Couldn't determine compaction quality for $0", tablet_id())); |
| } |
| |
| VLOG_WITH_PREFIX(1) << "Best compaction for " << tablet_id() << ": " << quality; |
| |
| stats->set_runnable(quality >= 0); |
| stats->set_perf_improvement(quality); |
| } |
| |
| |
| Status Tablet::DebugDump(vector<string> *lines) { |
| shared_lock<rw_spinlock> l(component_lock_); |
| |
| LOG_STRING(INFO, lines) << "Dumping tablet:"; |
| LOG_STRING(INFO, lines) << "---------------------------"; |
| |
| LOG_STRING(INFO, lines) << "MRS " << components_->memrowset->ToString() << ":"; |
| RETURN_NOT_OK(components_->memrowset->DebugDump(lines)); |
| |
| for (const shared_ptr<RowSet> &rs : components_->rowsets->all_rowsets()) { |
| LOG_STRING(INFO, lines) << "RowSet " << rs->ToString() << ":"; |
| RETURN_NOT_OK(rs->DebugDump(lines)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status Tablet::CaptureConsistentIterators( |
| const RowIteratorOptions& opts, |
| const ScanSpec* spec, |
| vector<IterWithBounds>* iters) const { |
| |
| shared_lock<rw_spinlock> l(component_lock_); |
| RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen); |
| |
| // Construct all the iterators locally first, so that if we fail |
| // in the middle, we don't modify the output arguments. |
| vector<IterWithBounds> ret; |
| |
| // Grab the memrowset iterator. |
| { |
| unique_ptr<RowwiseIterator> ms_iter; |
| RETURN_NOT_OK(components_->memrowset->NewRowIterator(opts, &ms_iter)); |
| IterWithBounds mrs_iwb; |
| mrs_iwb.iter = std::move(ms_iter); |
| ret.emplace_back(std::move(mrs_iwb)); |
| } |
| |
| // Capture any iterators for memrowsets whose inserts were added as a part of |
| // committed transactions. |
| for (const auto& txn_mrs : components_->txn_memrowsets) { |
| unique_ptr<RowwiseIterator> txn_ms_iter; |
| RETURN_NOT_OK(txn_mrs->NewRowIterator(opts, &txn_ms_iter)); |
| IterWithBounds txn_mrs_iwb; |
| txn_mrs_iwb.iter = std::move(txn_ms_iter); |
| ret.emplace_back(std::move(txn_mrs_iwb)); |
| } |
| |
| // Cull row-sets in the case of key-range queries. |
| if (spec != nullptr && (spec->lower_bound_key() || spec->exclusive_upper_bound_key())) { |
| optional<Slice> lower_bound = spec->lower_bound_key() ? |
| optional<Slice>(spec->lower_bound_key()->encoded_key()) : nullopt; |
| optional<Slice> upper_bound = spec->exclusive_upper_bound_key() ? |
| optional<Slice>(spec->exclusive_upper_bound_key()->encoded_key()) : nullopt; |
| vector<RowSet*> interval_sets; |
| components_->rowsets->FindRowSetsIntersectingInterval(lower_bound, upper_bound, &interval_sets); |
| for (const auto* rs : interval_sets) { |
| IterWithBounds iwb; |
| RETURN_NOT_OK_PREPEND(rs->NewRowIteratorWithBounds(opts, &iwb), |
| Substitute("Could not create iterator for rowset $0", |
| rs->ToString())); |
| ret.emplace_back(std::move(iwb)); |
| } |
| *iters = std::move(ret); |
| return Status::OK(); |
| } |
| |
| // If there are no encoded predicates of the primary keys, then |
| // fall back to grabbing all rowset iterators. |
| for (const shared_ptr<RowSet>& rs : components_->rowsets->all_rowsets()) { |
| IterWithBounds iwb; |
| RETURN_NOT_OK_PREPEND(rs->NewRowIteratorWithBounds(opts, &iwb), |
| Substitute("Could not create iterator for rowset $0", |
| rs->ToString())); |
| ret.emplace_back(std::move(iwb)); |
| } |
| |
| // Swap results into the parameters. |
| *iters = std::move(ret); |
| return Status::OK(); |
| } |
| |
| Status Tablet::CountRows(uint64_t *count) const { |
| // First grab a consistent view of the components of the tablet. |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| |
| // Now sum up the counts. |
| IOContext io_context({ tablet_id() }); |
| *count = comps->memrowset->entry_count(); |
| for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) { |
| rowid_t l_count; |
| RETURN_NOT_OK(rowset->CountRows(&io_context, &l_count)); |
| *count += l_count; |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status Tablet::CountLiveRows(uint64_t* count) const { |
| if (!metadata_->supports_live_row_count()) { |
| return Status::NotSupported("This tablet doesn't support live row counting"); |
| } |
| |
| scoped_refptr<TabletComponents> comps; |
| GetComponentsOrNull(&comps); |
| if (!comps) { |
| return Status::RuntimeError("The tablet has been shut down"); |
| } |
| |
| uint64_t ret = 0; |
| uint64_t tmp = 0; |
| RETURN_NOT_OK(comps->memrowset->CountLiveRows(&ret)); |
| for (const auto& mrs : comps->txn_memrowsets) { |
| RETURN_NOT_OK(mrs->CountLiveRows(&tmp)); |
| ret += tmp; |
| } |
| for (const shared_ptr<RowSet>& rowset : comps->rowsets->all_rowsets()) { |
| RETURN_NOT_OK(rowset->CountLiveRows(&tmp)); |
| ret += tmp; |
| } |
| *count = ret; |
| return Status::OK(); |
| } |
| |
| size_t Tablet::MemRowSetSize() const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponentsOrNull(&comps); |
| |
| size_t ret = 0; |
| if (comps) { |
| for (const auto& mrs : comps->txn_memrowsets) { |
| ret += mrs->memory_footprint(); |
| } |
| ret += comps->memrowset->memory_footprint(); |
| } |
| return ret; |
| } |
| |
| bool Tablet::MemRowSetEmpty() const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| |
| const auto& txn_mrss = comps->txn_memrowsets; |
| return comps->memrowset->empty() && std::all_of(txn_mrss.begin(), txn_mrss.end(), |
| [] (const shared_ptr<MemRowSet>& mrs) { return mrs->empty(); }); |
| } |
| |
| size_t Tablet::MemRowSetLogReplaySize(const ReplaySizeMap& replay_size_map) const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| auto min_index = comps->memrowset->MinUnflushedLogIndex(); |
| for (const auto& mrs : comps->txn_memrowsets) { |
| const auto& mrs_min_index = mrs->MinUnflushedLogIndex(); |
| // If the current min isn't valid, set it. |
| if (min_index == -1) { |
| min_index = mrs_min_index; |
| continue; |
| } |
| // If the transaction MRS's min is valid and lower than the current, valid |
| // min, set it. |
| if (mrs_min_index != -1) { |
| min_index = std::min(mrs_min_index, min_index); |
| } |
| } |
| |
| return GetReplaySizeForIndex(min_index, replay_size_map); |
| } |
| |
| size_t Tablet::OnDiskSize() const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponentsOrNull(&comps); |
| |
| if (!comps) return 0; |
| |
| size_t ret = metadata()->on_disk_size(); |
| for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) { |
| ret += rowset->OnDiskSize(); |
| } |
| |
| return ret; |
| } |
| |
| size_t Tablet::OnDiskDataSize() const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponentsOrNull(&comps); |
| |
| if (!comps) return 0; |
| |
| size_t ret = 0; |
| for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) { |
| ret += rowset->OnDiskBaseDataSize(); |
| } |
| return ret; |
| } |
| |
| uint64_t Tablet::LastReadElapsedSeconds() const { |
| shared_lock<rw_spinlock> l(last_rw_time_lock_); |
| DCHECK(last_read_time_.Initialized()); |
| return static_cast<uint64_t>((MonoTime::Now() - last_read_time_).ToSeconds()); |
| } |
| |
| void Tablet::UpdateLastReadTime() const { |
| std::lock_guard<rw_spinlock> l(last_rw_time_lock_); |
| last_read_time_ = MonoTime::Now(); |
| } |
| |
| uint64_t Tablet::LastWriteElapsedSeconds() const { |
| shared_lock<rw_spinlock> l(last_rw_time_lock_); |
| DCHECK(last_write_time_.Initialized()); |
| return static_cast<uint64_t>((MonoTime::Now() - last_write_time_).ToSeconds()); |
| } |
| |
// Computes a workload score in [0, FLAGS_workload_score_upper_bound] for a
// maintenance op of the given 'type', based on how "hot" the tablet is for
// reads and/or writes, and periodically refreshes the cached rate counters.
//
// Rates are recomputed only when at least
// FLAGS_workload_stats_rate_collection_min_interval_ms have elapsed since the
// last refresh; the baseline counters themselves are advanced on a separate
// (longer) FLAGS_workload_stats_metric_collection_interval_ms cadence.
double Tablet::CollectAndUpdateWorkloadStats(MaintenanceOp::PerfImprovementOpType type) {
  DCHECK(last_update_workload_stats_time_.Initialized());
  double workload_score = 0;
  MonoDelta elapse = MonoTime::Now() - last_update_workload_stats_time_;
  if (metrics_) {
    int64_t scans_started = metrics_->scans_started->value();
    // "Mutated" aggregates all write paths: inserts, upserts, updates, deletes.
    int64_t rows_mutated = metrics_->rows_inserted->value() +
                           metrics_->rows_upserted->value() +
                           metrics_->rows_updated->value() +
                           metrics_->rows_deleted->value();
    if (elapse.ToMilliseconds() > FLAGS_workload_stats_rate_collection_min_interval_ms) {
      // Scale each rate against its "hot tablet" threshold flag and cap the
      // resulting score at FLAGS_workload_score_upper_bound.
      double last_read_rate =
          static_cast<double>(scans_started - last_scans_started_) / elapse.ToSeconds();
      last_read_score_ =
          std::min(1.0, last_read_rate / FLAGS_scans_started_per_sec_for_hot_tablets) *
              FLAGS_workload_score_upper_bound;
      double last_write_rate =
          static_cast<double>(rows_mutated - last_rows_mutated_) / elapse.ToSeconds();
      last_write_score_ =
          std::min(1.0, last_write_rate / FLAGS_rows_writed_per_sec_for_hot_tablets) *
              FLAGS_workload_score_upper_bound;
    }
    if (elapse.ToMilliseconds() > FLAGS_workload_stats_metric_collection_interval_ms) {
      // Advance the baseline used for the next rate computation.
      last_update_workload_stats_time_ = MonoTime::Now();
      last_scans_started_ = metrics_->scans_started->value();
      last_rows_mutated_ = rows_mutated;
    }
  }
  if (type == MaintenanceOp::FLUSH_OP) {
    // Flush ops are already scored based on how hot the tablet is
    // for writes, so we'll only adjust the workload score based on
    // how hot the tablet is for reads.
    workload_score = last_read_score_;
  } else if (type == MaintenanceOp::COMPACT_OP) {
    // Since compactions may improve both read and write performance, increase
    // the workload score based on the read and write rate to the tablet.
    workload_score = std::min(FLAGS_workload_score_upper_bound,
                              last_read_score_ + last_write_score_);
  }
  return workload_score;
}
| |
| size_t Tablet::DeltaMemStoresSize() const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| |
| size_t ret = 0; |
| for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) { |
| ret += rowset->DeltaMemStoreSize(); |
| } |
| |
| return ret; |
| } |
| |
| bool Tablet::DeltaMemRowSetEmpty() const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| |
| for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) { |
| if (!rowset->DeltaMemStoreEmpty()) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| Status Tablet::FlushBestDMS(const ReplaySizeMap &replay_size_map) const { |
| RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen); |
| shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map); |
| if (rowset) { |
| IOContext io_context({ tablet_id() }); |
| return rowset->FlushDeltas(&io_context); |
| } |
| return Status::OK(); |
| } |
| |
// Picks the rowset whose delta memstore is the best candidate to flush,
// scoring each DMS by a blend of its memory footprint and the WAL bytes its
// flush would release. Optionally reports the winner's DMS size, replay
// size, and the earliest DMS creation time across all rowsets.
//
// Returns nullptr when no rowset currently has a DMS.
shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const ReplaySizeMap& replay_size_map,
                                              int64_t* mem_size, int64_t* replay_size,
                                              MonoTime* earliest_dms_time) const {
  scoped_refptr<TabletComponents> comps;
  GetComponents(&comps);
  int64_t max_dms_size = 0;
  double max_score = 0;
  double mem_weight = 0;
  int64_t dms_replay_size = 0;
  MonoTime earliest_creation_time = MonoTime::Max();
  // If system is under memory pressure, we use the percentage of the hard limit consumed
  // as mem_weight, so the tighter memory, the higher weight. Otherwise just left the
  // mem_weight to 0.
  // NOTE(review): the scoring below assumes mem_weight is a percentage in
  // [0, 100] (see the '100 - mem_weight' term) — confirm against
  // process_memory::UnderMemoryPressure().
  process_memory::UnderMemoryPressure(&mem_weight);

  shared_ptr<RowSet> best_dms;
  for (const shared_ptr<RowSet>& rowset : comps->rowsets->all_rowsets()) {
    size_t dms_size_bytes;
    MonoTime creation_time;
    // Rowsets with no DMS are skipped entirely.
    if (!rowset->DeltaMemStoreInfo(&dms_size_bytes, &creation_time)) {
      continue;
    }
    earliest_creation_time = std::min(earliest_creation_time, creation_time);
    int64_t replay_size_bytes = GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
                                                      replay_size_map);
    // To facilitate memory-based flushing when under memory pressure, we
    // define a score that's part memory and part WAL retention bytes.
    double score = dms_size_bytes * mem_weight + replay_size_bytes * (100 - mem_weight);
    if ((score > max_score) ||
        // If the score is close to the max, as a tie-breaker, just look at the
        // DMS size.
        (score > max_score - 1 && dms_size_bytes > max_dms_size)) {
      max_score = score;
      max_dms_size = dms_size_bytes;
      dms_replay_size = replay_size_bytes;
      best_dms = rowset;
    }
  }
  // Populate the optional out-parameters.
  if (earliest_dms_time) {
    *earliest_dms_time = earliest_creation_time;
  }
  if (mem_size) {
    *mem_size = max_dms_size;
  }
  if (replay_size) {
    *replay_size = dms_replay_size;
  }
  return best_dms;
}
| |
| int64_t Tablet::GetReplaySizeForIndex(int64_t min_log_index, |
| const ReplaySizeMap& size_map) { |
| // If min_log_index is -1, that indicates that there is no anchor held |
| // for the tablet, and therefore no logs would need to be replayed. |
| if (size_map.empty() || min_log_index == -1) { |
| return 0; |
| } |
| |
| const auto& it = size_map.lower_bound(min_log_index); |
| if (it == size_map.end()) { |
| return 0; |
| } |
| return it->second; |
| } |
| |
| Status Tablet::FlushBiggestDMS() { |
| RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen); |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| |
| int64_t max_size = -1; |
| shared_ptr<RowSet> biggest_drs; |
| for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) { |
| int64_t current = rowset->DeltaMemStoreSize(); |
| if (current > max_size) { |
| max_size = current; |
| biggest_drs = rowset; |
| } |
| } |
| return max_size > 0 ? biggest_drs->FlushDeltas(nullptr) : Status::OK(); |
| } |
| |
| Status Tablet::FlushAllDMSForTests() { |
| RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen); |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| for (const auto& rowset : comps->rowsets->all_rowsets()) { |
| RETURN_NOT_OK(rowset->FlushDeltas(nullptr)); |
| } |
| return Status::OK(); |
| } |
| |
// Test-only helper: run a major delta compaction on every DiskRowSet that
// is available for compaction, initializing its REDO delta stores first.
Status Tablet::MajorCompactAllDeltaStoresForTests() {
  LOG_WITH_PREFIX(INFO) << "Major compacting all delta stores, for tests";
  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
  scoped_refptr<TabletComponents> comps;
  GetComponents(&comps);
  IOContext io_context({ tablet_id() });
  for (const auto& rs : comps->rowsets->all_rowsets()) {
    if (!rs->IsAvailableForCompaction()) continue;
    // Only disk rowsets can reach this point: the memrowset is never
    // available for compaction, so the down_cast is safe.
    DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
    RETURN_NOT_OK(drs->delta_tracker()->InitAllDeltaStoresForTests(DeltaTracker::REDOS_ONLY));
    RETURN_NOT_OK_PREPEND(drs->MajorCompactDeltaStores(&io_context, GetHistoryGcOpts()),
                          "Failed major delta compaction on " + rs->ToString());
  }
  return Status::OK();
}
| |
// Picks the rowset with the highest delta-compaction perf-improvement score
// for the given compaction type and runs a minor or major delta compaction
// on it. A no-op when no rowset would benefit.
Status Tablet::CompactWorstDeltas(RowSet::DeltaCompactionType type) {
  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
  shared_ptr<RowSet> rs;

  // We're required to grab the rowset's compact_flush_lock under the compact_select_lock_.
  std::unique_lock<std::mutex> lock;
  double perf_improv;
  {
    // We only want to keep the selection lock during the time we look at rowsets to compact.
    // The returned rowset is guaranteed to be available to lock since locking must be done
    // under this lock.
    std::lock_guard<std::mutex> compact_lock(compact_select_lock_);
    perf_improv = GetPerfImprovementForBestDeltaCompactUnlocked(type, &rs);
    if (!rs) {
      return Status::OK();
    }
    // try_to_lock cannot fail here: selection happens under
    // compact_select_lock_, so nobody else can have grabbed this rowset's
    // compact_flush_lock in the meantime.
    lock = std::unique_lock<std::mutex>(*rs->compact_flush_lock(), std::try_to_lock);
    CHECK(lock.owns_lock());
  }

  // We just released compact_select_lock_ so other compactions can select and run, but the
  // rowset is ours.
  DCHECK(perf_improv != 0);
  IOContext io_context({ tablet_id() });
  if (type == RowSet::MINOR_DELTA_COMPACTION) {
    RETURN_NOT_OK_PREPEND(rs->MinorCompactDeltaStores(&io_context),
                          "Failed minor delta compaction on " + rs->ToString());
  } else if (type == RowSet::MAJOR_DELTA_COMPACTION) {
    RETURN_NOT_OK_PREPEND(
        down_cast<DiskRowSet*>(rs.get())->MajorCompactDeltaStores(&io_context, GetHistoryGcOpts()),
        "Failed major delta compaction on " + rs->ToString());
  }
  return Status::OK();
}
| |
// Locking wrapper around GetPerfImprovementForBestDeltaCompactUnlocked():
// acquires compact_select_lock_ for the duration of the scan.
double Tablet::GetPerfImprovementForBestDeltaCompact(RowSet::DeltaCompactionType type,
                                                     shared_ptr<RowSet>* rs) const {
  std::lock_guard<std::mutex> compact_lock(compact_select_lock_);
  return GetPerfImprovementForBestDeltaCompactUnlocked(type, rs);
}
| |
// Scans all rowsets available for compaction and returns the highest
// perf-improvement score for the given delta-compaction type. If 'rs' is
// non-null and some rowset has a positive score, sets *rs to that rowset.
// Caller must already hold compact_select_lock_.
double Tablet::GetPerfImprovementForBestDeltaCompactUnlocked(RowSet::DeltaCompactionType type,
                                                             shared_ptr<RowSet>* rs) const {
  // Verify the caller holds compact_select_lock_: a try_lock must fail.
  std::unique_lock<std::mutex> cs_lock(compact_select_lock_, std::try_to_lock);
  DCHECK(!cs_lock.owns_lock());
  scoped_refptr<TabletComponents> comps;
  GetComponents(&comps);
  double worst_delta_perf = 0;
  shared_ptr<RowSet> worst_rs;
  for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
    if (!rowset->IsAvailableForCompaction()) {
      continue;
    }
    double perf_improv = rowset->DeltaStoresCompactionPerfImprovementScore(type);
    // Track the maximum score seen; "worst" here means the rowset whose
    // deltas are in the worst shape, i.e. would benefit the most.
    if (perf_improv > worst_delta_perf) {
      worst_rs = rowset;
      worst_delta_perf = perf_improv;
    }
  }
  if (rs && worst_delta_perf > 0) {
    *rs = worst_rs;
  }
  return worst_delta_perf;
}
| |
| Status Tablet::EstimateBytesInPotentiallyAncientUndoDeltas(int64_t* bytes) { |
| DCHECK(bytes); |
| |
| Timestamp ancient_history_mark; |
| if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) { |
| VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. " |
| "The clock is likely not a hybrid clock"; |
| return Status::OK(); |
| } |
| |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| |
| int64_t tablet_bytes = 0; |
| for (const auto& rowset : comps->rowsets->all_rowsets()) { |
| int64_t rowset_bytes; |
| RETURN_NOT_OK(rowset->EstimateBytesInPotentiallyAncientUndoDeltas(ancient_history_mark, |
| &rowset_bytes)); |
| tablet_bytes += rowset_bytes; |
| } |
| |
| metrics_->undo_delta_block_estimated_retained_bytes->set_value(tablet_bytes); |
| *bytes = tablet_bytes; |
| return Status::OK(); |
| } |
| |
// Initializes (opens and reads metadata for) undo delta blocks that may be
// ancient, processing rowsets in descending order of estimated ancient-undo
// bytes so the biggest wins come first within the optional 'time_budget'.
// If 'bytes_in_ancient_undos' is non-null, reports the total bytes found.
Status Tablet::InitAncientUndoDeltas(MonoDelta time_budget, int64_t* bytes_in_ancient_undos) {
  MonoTime tablet_init_start = MonoTime::Now();

  IOContext io_context({ tablet_id() });
  Timestamp ancient_history_mark;
  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
                           "The clock is likely not a hybrid clock";
    return Status::OK();
  }

  scoped_refptr<TabletComponents> comps;
  GetComponents(&comps);

  RowSetVector rowsets = comps->rowsets->all_rowsets();

  // Estimate the size of the ancient undos in each rowset so that we can
  // initialize them greedily.
  vector<pair<size_t, int64_t>> rowset_ancient_undos_est_sizes; // index, bytes
  rowset_ancient_undos_est_sizes.reserve(rowsets.size());
  for (size_t i = 0; i < rowsets.size(); i++) {
    const auto& rowset = rowsets[i];
    int64_t bytes;
    RETURN_NOT_OK(rowset->EstimateBytesInPotentiallyAncientUndoDeltas(ancient_history_mark,
                                                                      &bytes));
    rowset_ancient_undos_est_sizes.emplace_back(i, bytes);
  }

  // Sort the rowsets in descending size order to optimize for the worst offenders.
  std::sort(rowset_ancient_undos_est_sizes.begin(), rowset_ancient_undos_est_sizes.end(),
            [&](const pair<size_t, int64_t>& a, const pair<size_t, int64_t>& b) {
              return a.second > b.second; // Descending order.
            });

  // Begin timeout / deadline countdown here in case the above takes some time.
  MonoTime deadline = time_budget.Initialized() ? MonoTime::Now() + time_budget : MonoTime();

  // Initialize the rowsets largest-first.
  int64_t tablet_bytes_in_ancient_undos = 0;
  for (const auto& rs_est_size : rowset_ancient_undos_est_sizes) {
    size_t index = rs_est_size.first;
    const auto& rowset = rowsets[index];
    int64_t rowset_blocks_initialized;
    int64_t rowset_bytes_in_ancient_undos;
    RETURN_NOT_OK(rowset->InitUndoDeltas(ancient_history_mark, deadline, &io_context,
                                         &rowset_blocks_initialized,
                                         &rowset_bytes_in_ancient_undos));
    tablet_bytes_in_ancient_undos += rowset_bytes_in_ancient_undos;
  }

  // Record how long the initialization pass took.
  MonoDelta tablet_init_duration = MonoTime::Now() - tablet_init_start;
  metrics_->undo_delta_block_gc_init_duration->Increment(
      tablet_init_duration.ToMilliseconds());

  VLOG_WITH_PREFIX(2) << Substitute("Bytes in ancient undos: $0. Init duration: $1",
                                    HumanReadableNumBytes::ToString(tablet_bytes_in_ancient_undos),
                                    tablet_init_duration.ToString());

  if (bytes_in_ancient_undos) *bytes_in_ancient_undos = tablet_bytes_in_ancient_undos;
  return Status::OK();
}
| |
// Computes the total on-disk size of rowsets that are fully deleted and
// entirely older than the ancient history mark (i.e. candidates for GC),
// publishes the figure to the tablet metrics, and returns it via the
// non-null out-parameter.
Status Tablet::GetBytesInAncientDeletedRowsets(int64_t* bytes_in_ancient_deleted_rowsets) {
  Timestamp ancient_history_mark;
  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
                           "The clock is likely not a hybrid clock";
    *bytes_in_ancient_deleted_rowsets = 0;
    return Status::OK();
  }

  scoped_refptr<TabletComponents> comps;
  GetComponents(&comps);
  int64_t bytes = 0;
  {
    // Hold the selection lock so rowsets aren't concurrently selected for
    // compaction while we examine them.
    std::lock_guard<std::mutex> csl(compact_select_lock_);
    for (const auto& rowset : comps->rowsets->all_rowsets()) {
      if (!rowset->IsAvailableForCompaction()) {
        continue;
      }
      bool deleted_and_ancient = false;
      RETURN_NOT_OK(rowset->IsDeletedAndFullyAncient(ancient_history_mark, &deleted_and_ancient));
      if (deleted_and_ancient) {
        bytes += rowset->OnDiskSize();
      }
    }
  }
  metrics_->deleted_rowset_estimated_retained_bytes->set_value(bytes);
  *bytes_in_ancient_deleted_rowsets = bytes;
  return Status::OK();
}
| |
// Garbage-collects rowsets that are fully deleted and entirely older than
// the ancient history mark, swapping them out of the tablet and updating
// the deleted-rowset GC metrics.
Status Tablet::DeleteAncientDeletedRowsets() {
  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
  const MonoTime start_time = MonoTime::Now();
  Timestamp ancient_history_mark;
  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
                           "The clock is likely not a hybrid clock";
    return Status::OK();
  }

  scoped_refptr<TabletComponents> comps;
  GetComponents(&comps);

  // We'll take the rowsets' locks to ensure we don't GC the rowsets while
  // they're being compacted.
  RowSetVector to_delete;
  int num_unavailable_for_delete = 0;
  vector<std::unique_lock<std::mutex>> rowset_locks;
  int64_t bytes_deleted = 0;
  {
    std::lock_guard<std::mutex> csl(compact_select_lock_);
    for (const auto& rowset : comps->rowsets->all_rowsets()) {
      // Check if this rowset has been locked by a compaction. If so, we
      // shouldn't attempt to delete it.
      if (!rowset->IsAvailableForCompaction()) {
        num_unavailable_for_delete++;
        continue;
      }
      bool deleted_and_empty = false;
      RETURN_NOT_OK(rowset->IsDeletedAndFullyAncient(ancient_history_mark, &deleted_and_empty));
      if (deleted_and_empty) {
        // If we intend on deleting the rowset, take its lock so concurrent
        // compactions don't try to select it for compactions.
        // try_to_lock must succeed since selection requires compact_select_lock_,
        // which we hold.
        std::unique_lock<std::mutex> l(*rowset->compact_flush_lock(), std::try_to_lock);
        CHECK(l.owns_lock());
        to_delete.emplace_back(rowset);
        rowset_locks.emplace_back(std::move(l));
        bytes_deleted += rowset->OnDiskSize();
      }
    }
  }
  if (to_delete.empty()) {
    return Status::OK();
  }
  // Atomically swap the selected rowsets out of the tablet with nothing in
  // their place.
  RETURN_NOT_OK(HandleEmptyCompactionOrFlush(
      to_delete, TabletMetadata::kNoMrsFlushed, {}));
  metrics_->deleted_rowset_gc_bytes_deleted->IncrementBy(bytes_deleted);
  metrics_->deleted_rowset_gc_duration->Increment((MonoTime::Now() - start_time).ToMilliseconds());
  return Status::OK();
}
| |
// Deletes UNDO delta blocks that are entirely older than the ancient
// history mark from every rowset, flushes the tablet metadata once at the
// end, and updates the undo-delta GC metrics. Optionally reports the number
// of blocks and bytes deleted.
Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_deleted) {
  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
  MonoTime tablet_delete_start = MonoTime::Now();

  Timestamp ancient_history_mark;
  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) return Status::OK();

  scoped_refptr<TabletComponents> comps;
  GetComponents(&comps);

  // We need to hold the compact_flush_lock for each rowset we GC undos from.
  RowSetVector rowsets_to_gc_undos;
  vector<std::unique_lock<std::mutex>> rowset_locks;
  {
    // We hold the selection lock so other threads will not attempt to select the
    // same rowsets for compaction while we delete old undos.
    std::lock_guard<std::mutex> compact_lock(compact_select_lock_);
    for (const auto& rowset : comps->rowsets->all_rowsets()) {
      if (!rowset->IsAvailableForCompaction()) {
        continue;
      }
      std::unique_lock<std::mutex> lock(*rowset->compact_flush_lock(), std::try_to_lock);
      CHECK(lock.owns_lock()) << rowset->ToString() << " unable to lock compact_flush_lock";
      rowsets_to_gc_undos.push_back(rowset);
      rowset_locks.push_back(std::move(lock));
    }
  }

  // Delete the ancient undos from each selected rowset, accumulating totals.
  int64_t tablet_blocks_deleted = 0;
  int64_t tablet_bytes_deleted = 0;
  fs::IOContext io_context({ tablet_id() });
  for (const auto& rowset : rowsets_to_gc_undos) {
    int64_t rowset_blocks_deleted;
    int64_t rowset_bytes_deleted;
    RETURN_NOT_OK(rowset->DeleteAncientUndoDeltas(ancient_history_mark, &io_context,
                                                  &rowset_blocks_deleted, &rowset_bytes_deleted));
    tablet_blocks_deleted += rowset_blocks_deleted;
    tablet_bytes_deleted += rowset_bytes_deleted;
  }
  // We flush the tablet metadata at the end because we don't flush per-RowSet
  // for performance reasons.
  if (tablet_blocks_deleted > 0) {
    RETURN_NOT_OK(metadata_->Flush());
  }

  MonoDelta tablet_delete_duration = MonoTime::Now() - tablet_delete_start;
  metrics_->undo_delta_block_gc_bytes_deleted->IncrementBy(tablet_bytes_deleted);
  metrics_->undo_delta_block_gc_delete_duration->Increment(
      tablet_delete_duration.ToMilliseconds());

  if (blocks_deleted) *blocks_deleted = tablet_blocks_deleted;
  if (bytes_deleted) *bytes_deleted = tablet_bytes_deleted;
  return Status::OK();
}
| |
| int64_t Tablet::CountUndoDeltasForTests() const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| int64_t sum = 0; |
| for (const auto& rowset : comps->rowsets->all_rowsets()) { |
| shared_ptr<RowSetMetadata> metadata = rowset->metadata(); |
| if (metadata) { |
| sum += metadata->undo_delta_blocks().size(); |
| } |
| } |
| return sum; |
| } |
| |
| int64_t Tablet::CountRedoDeltasForTests() const { |
| scoped_refptr<TabletComponents> comps; |
| GetComponents(&comps); |
| int64_t sum = 0; |
| for (const auto& rowset : comps->rowsets->all_rowsets()) { |
| shared_ptr<RowSetMetadata> metadata = rowset->metadata(); |
| if (metadata) { |
| sum += metadata->redo_delta_blocks().size(); |
| } |
| } |
| return sum; |
| } |
| |
| size_t Tablet::num_rowsets() const { |
| shared_lock<rw_spinlock> l(component_lock_); |
| return components_ ? components_->rowsets->all_rowsets().size() : 0; |
| } |
| |
// Renders an HTML report of the tablet's rowset layout to 'o': an SVG of
// the rowset key-space CDF (highlighting what the compaction policy would
// pick next), summary size statistics, and the compaction policy's log.
void Tablet::PrintRSLayout(ostream* o) {
  DCHECK(o);
  auto& out = *o;

  // Snapshot the rowset tree under the component lock so we can work on a
  // consistent copy without holding the lock.
  shared_ptr<RowSetTree> rowsets_copy;
  {
    shared_lock<rw_spinlock> l(component_lock_);
    rowsets_copy = components_->rowsets;
  }
  std::lock_guard<std::mutex> compact_lock(compact_select_lock_);
  // Run the compaction policy in order to get its log and highlight those
  // rowsets which would be compacted next.
  vector<string> log;
  unordered_set<const RowSet*> picked;
  double quality;
  Status s = compaction_policy_->PickRowSets(*rowsets_copy, &picked, &quality, &log);
  if (!s.ok()) {
    out << "<b>Error:</b> " << EscapeForHtmlToString(s.ToString());
    return;
  }

  if (!picked.empty()) {
    out << "<p>";
    out << "Highlighted rowsets indicate those that would be compacted next if a "
        << "compaction were to run on this tablet.";
    out << "</p>";
  }

  double rowset_total_height, rowset_total_width;
  vector<RowSetInfo> min, max;
  RowSetInfo::ComputeCdfAndCollectOrdered(*rowsets_copy,
                                          &rowset_total_height,
                                          &rowset_total_width,
                                          &min,
                                          &max);
  // Guard against division by zero when there are no rowsets.
  double average_rowset_height = rowset_total_width > 0
      ? rowset_total_height / rowset_total_width
      : 0.0;
  DumpCompactionSVG(min, picked, o, /*print_xml_header=*/false);

  // Compaction policy ignores rowsets unavailable for compaction. This is good,
  // except it causes the SVG to be potentially missing rowsets. It's hard to
  // take these presently-compacting rowsets into account because we are racing
  // against the compaction finishing, and at the end of the compaction the
  // rowsets might no longer exist (merge compaction) or their bounds may have
  // changed (major delta compaction). So, let's just disclose how many of these
  // rowsets there are.
  int num_rowsets_unavailable_for_compaction = std::count_if(
      rowsets_copy->all_rowsets().begin(),
      rowsets_copy->all_rowsets().end(),
      [](const shared_ptr<RowSet>& rowset) {
        // The first condition excludes the memrowset.
        return rowset->metadata() && !rowset->IsAvailableForCompaction();
      });
  out << Substitute("<div><p>In addition to the rowsets pictured and listed, "
                    "there are $0 rowset(s) currently undergoing compactions."
                    "</p></div>",
                    num_rowsets_unavailable_for_compaction)
      << endl;

  // Compute some summary statistics for the tablet's rowsets.
  const auto num_rowsets = min.size();
  if (num_rowsets > 0) {
    vector<int64_t> rowset_sizes;
    rowset_sizes.reserve(num_rowsets);
    for (const auto& rsi : min) {
      rowset_sizes.push_back(rsi.size_bytes());
    }
    out << "<table class=\"table tablet-striped table-hover\">" << endl;
    // Compute the stats quick'n'dirty by sorting and looking at approximately
    // the right spot.
    // TODO(wdberkeley): Could use an O(n) quickselect-based algorithm.
    // TODO(wdberkeley): A bona fide box-and-whisker plot would be nice.
    // d3.js can make really nice ones: https://bl.ocks.org/mbostock/4061502.
    std::sort(rowset_sizes.begin(), rowset_sizes.end());
    const auto size_bytes_min = rowset_sizes[0];
    const auto size_bytes_first_quartile = rowset_sizes[num_rowsets / 4];
    const auto size_bytes_median = rowset_sizes[num_rowsets / 2];
    const auto size_bytes_third_quartile = rowset_sizes[3 * num_rowsets / 4];
    const auto size_bytes_max = rowset_sizes[num_rowsets - 1];
    out << Substitute("<thead><tr>"
                      "  <th>Statistic</th>"
                      "  <th>Approximate Value</th>"
                      "<tr></thead>"
                      "<tbody>"
                      "  <tr><td>Count</td><td>$0</td></tr>"
                      "  <tr><td>Min</td><td>$1</td></tr>"
                      "  <tr><td>First quartile</td><td>$2</td></tr>"
                      "  <tr><td>Median</td><td>$3</td></tr>"
                      "  <tr><td>Third quartile</td><td>$4</td></tr>"
                      "  <tr><td>Max</td><td>$5</td></tr>"
                      "  <tr><td>Avg. Height</td><td>$6</td></tr>"
                      "<tbody>",
                      num_rowsets,
                      HumanReadableNumBytes::ToString(size_bytes_min),
                      HumanReadableNumBytes::ToString(size_bytes_first_quartile),
                      HumanReadableNumBytes::ToString(size_bytes_median),
                      HumanReadableNumBytes::ToString(size_bytes_third_quartile),
                      HumanReadableNumBytes::ToString(size_bytes_max),
                      average_rowset_height);
    out << "</table>" << endl;
  }

  // TODO(wdberkeley): Should we even display this? It's one line per rowset
  // and doesn't contain any useful information except each rowset's size.
  out << "<h2>Compaction policy log</h2>" << endl;

  out << "<pre>" << std::endl;
  for (const string& s : log) {
    out << EscapeForHtmlToString(s) << endl;
  }
  out << "</pre>" << endl;
}
| |
// Log-message prefix identifying this tablet and this peer's FS UUID.
string Tablet::LogPrefix() const {
  return Substitute("T $0 P $1: ", tablet_id(), metadata_->fs_manager()->uuid());
}
| |
| //////////////////////////////////////////////////////////// |
| // Tablet::Iterator |
| //////////////////////////////////////////////////////////// |
| |
// Copies the caller's projection and options, then re-points the stored
// options at this iterator's own copies so they remain valid for the
// iterator's lifetime regardless of the caller's.
Tablet::Iterator::Iterator(const Tablet* tablet,
                           RowIteratorOptions opts)
    : tablet_(tablet),
      io_context_({ tablet->tablet_id() }),
      projection_(*CHECK_NOTNULL(opts.projection)),
      opts_(std::move(opts)) {
  opts_.io_context = &io_context_;
  opts_.projection = &projection_;
}
| |
| Tablet::Iterator::~Iterator() {} |
| |
// Builds the underlying row iterator: maps the read projection, captures a
// consistent snapshot of per-rowset iterators, and combines them with a
// merge (ORDERED) or union (UNORDERED) iterator. Must be called exactly
// once before HasNext()/NextBlock().
Status Tablet::Iterator::Init(ScanSpec *spec) {
  RETURN_NOT_OK(tablet_->CheckHasNotBeenStopped());
  DCHECK(iter_.get() == nullptr);

  RETURN_NOT_OK(tablet_->GetMappedReadProjection(projection_, &projection_));

  vector<IterWithBounds> iters;
  RETURN_NOT_OK(tablet_->CaptureConsistentIterators(opts_, spec, &iters));
  TRACE_COUNTER_INCREMENT("rowset_iterators", iters.size());

  switch (opts_.order) {
    case ORDERED:
      iter_ = NewMergeIterator(MergeIteratorOptions(opts_.include_deleted_rows), std::move(iters));
      break;
    case UNORDERED:
    default:
      iter_ = NewUnionIterator(std::move(iters));
      break;
  }

  RETURN_NOT_OK(iter_->Init(spec));
  return Status::OK();
}
| |
// Returns whether more rows remain. Requires a successful Init() first.
bool Tablet::Iterator::HasNext() const {
  DCHECK(iter_.get() != nullptr) << "Not initialized!";
  return iter_->HasNext();
}
| |
// Fetches the next block of rows into 'dst'. Requires a successful Init().
Status Tablet::Iterator::NextBlock(RowBlock *dst) {
  DCHECK(iter_.get() != nullptr) << "Not initialized!";
  return iter_->NextBlock(dst);
}
| |
| string Tablet::Iterator::ToString() const { |
| string s; |
| s.append("tablet iterator: "); |
| if (iter_.get() == nullptr) { |
| s.append("NULL"); |
| } else { |
| s.append(iter_->ToString()); |
| } |
| return s; |
| } |
| |
// The projection schema rows are returned in (the iterator's own copy,
// possibly remapped by Init() via GetMappedReadProjection()).
const Schema& Tablet::Iterator::schema() const {
  return *opts_.projection;
}
| |
// Forwards iterator statistics from the underlying merge/union iterator.
void Tablet::Iterator::GetIteratorStats(vector<IteratorStats>* stats) const {
  iter_->GetIteratorStats(stats);
}
| |
| } // namespace tablet |
| } // namespace kudu |