// blob: e021ec829e297bc939d4ed2c293b79476a0b0f13 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <gtest/gtest_prod.h>
#include "kudu/common/rowid.h"
#include "kudu/fs/block_id.h"
#include "kudu/gutil/macros.h"
#include "kudu/tablet/cfile_set.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/tablet_mem_trackers.h"
#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
namespace kudu {
// Forward declarations of types that the declarations below only name, so
// that their full definitions need not be included here.
class ColumnwiseIterator;
class MonoTime;
class RowChangeList;
class Schema;
class Timestamp;
struct ColumnId;

// Each nested namespace is closed right after its forward declarations so
// that the following namespaces are siblings rather than nested inside it.
namespace consensus {
class OpId;
}  // namespace consensus

namespace fs {
class WritableBlock;
struct IOContext;
}  // namespace fs

namespace log {
class LogAnchorRegistry;
}  // namespace log

namespace tablet {
// Forward declarations of tablet types that this header names without
// requiring their full definitions.
class DeltaFileReader;
class DeltaMemStore;
class OperationResultPB;
class RowSetMetadata;
class RowSetMetadataUpdate;
struct ProbeStats;
struct RowIteratorOptions;

// A BlockId paired with an owned DeltaStats.
// NOTE(review): presumably the stats describe the delta block identified by
// the BlockId -- confirm against the code which builds these pairs.
using DeltaBlockIdAndStats = std::pair<BlockId, std::unique_ptr<DeltaStats>>;

// The DeltaTracker is the part of a DiskRowSet which is responsible for
// tracking modifications against the base data. It consists of a set of
// DeltaStores which each contain a set of mutations against the base data.
// These DeltaStores may be on disk (DeltaFileReader) or in-memory (DeltaMemStore).
// This class is also responsible for flushing the in-memory deltas to disk.
class DeltaTracker {
enum MetadataFlushType {
static Status Open(const std::shared_ptr<RowSetMetadata>& rowset_metadata,
log::LogAnchorRegistry* log_anchor_registry,
const TabletMemTrackers& mem_trackers,
const fs::IOContext* io_context,
std::unique_ptr<DeltaTracker>* delta_tracker);
Status WrapIterator(const std::shared_ptr<CFileSet::Iterator> &base,
const RowIteratorOptions& opts,
std::unique_ptr<ColumnwiseIterator>* out) const;
// Enum used for NewDeltaIterator() and CollectStores() below.
// Determines whether all types of stores should be considered,
// or just UNDO or REDO stores.
enum WhichStores {
// Create a new DeltaIterator which merges the delta stores tracked
// by this DeltaTracker. Depending on the value of 'which' (see above),
// this iterator may include UNDOs, REDOs, or both.
// Pointers in 'opts' must remain valid for the lifetime of the returned iterator.
Status NewDeltaIterator(const RowIteratorOptions& opts,
WhichStores which,
std::unique_ptr<DeltaIterator>* out) const;
Status NewDeltaIterator(const RowIteratorOptions& opts,
std::unique_ptr<DeltaIterator>* out) const {
return NewDeltaIterator(opts, UNDOS_AND_REDOS, out);
// Like NewDeltaIterator() but only includes file based stores, does not include
// the DMS.
// Returns the delta stores being merged in *included_stores.
Status NewDeltaFileIterator(
const RowIteratorOptions& opts,
DeltaType type,
std::vector<std::shared_ptr<DeltaStore>>* included_stores,
std::unique_ptr<DeltaIterator>* out) const;
// Flushes the current DeltaMemStore and replaces it with a new one.
// Caller selects whether to also have the RowSetMetadata (and consequently
// the TabletMetadata) flushed.
// NOTE: 'flush_type' should almost always be set to 'FLUSH_METADATA', or else
// delta stores might become unrecoverable.
Status Flush(const fs::IOContext* io_context, MetadataFlushType flush_type);
// Update the given row in the database.
// Copies the data, as well as any referenced values into a local arena.
// "result" tracks the status of the update as well as which data
// structure(s) it ended up at.
Status Update(Timestamp timestamp,
rowid_t row_idx,
const RowChangeList &update,
const consensus::OpId& op_id,
OperationResultPB* result);
// Check if the given row has been deleted -- i.e if the most recent
// delta for this row is a deletion.
// Sets *deleted to true if so; otherwise sets it to false.
Status CheckRowDeleted(rowid_t row_idx, const fs::IOContext* io_context,
bool *deleted, ProbeStats* stats) const;
// Compacts all REDO delta files.
Status Compact(const fs::IOContext* io_context);
// Updates the in-memory list of delta stores and then persists the updated
// metadata. This should only be used for compactions or ancient history
// data GC, not when adding mutations, since it makes the updated stores
// visible before attempting to flush the metadata to disk.
// The 'compact_flush_lock_' should be acquired before calling this method.
Status CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
const SharedDeltaStoreVector& to_remove,
std::vector<DeltaBlockIdAndStats> new_delta_blocks,
const fs::IOContext* io_context,
DeltaType type,
MetadataFlushType flush_type);
// Performs minor compaction on all REDO delta files between index
// "start_idx" and "end_idx" (inclusive) and writes this to a
// new REDO delta block. If "end_idx" is set to -1, then delta files at
// all indexes starting with "start_idx" will be compacted.
Status CompactStores(const fs::IOContext* io_context, int start_idx, int end_idx);
// See RowSet::EstimateBytesInPotentiallyAncientUndoDeltas().
Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
int64_t* bytes);
// Returns whether all redo (DMS and newest redo delta file) are ancient
// (i.e. that the redo with the highest timestamp is older than the AHM).
// This is an estimate, since if the newest redo file has not yet been
// initted, this will return a false negative.
bool EstimateAllRedosAreAncient(Timestamp ancient_history_mark);
// See RowSet::InitUndoDeltas().
Status InitUndoDeltas(Timestamp ancient_history_mark,
MonoTime deadline,
const fs::IOContext* io_context,
int64_t* delta_blocks_initialized,
int64_t* bytes_in_ancient_undos);
// See RowSet::DeleteAncientUndoDeltas().
Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark, const fs::IOContext* io_context,
int64_t* blocks_deleted, int64_t* bytes_deleted);
// Opens the input 'blocks' of type 'type' and returns the opened delta file
// readers in 'stores'.
Status OpenDeltaReaders(std::vector<DeltaBlockIdAndStats> blocks,
const fs::IOContext* io_context,
std::vector<std::shared_ptr<DeltaStore>>* stores,
DeltaType type);
#ifndef NDEBUG
// Validates that 'first' may precede 'second' in an ordered list of deltas,
// given a delta type of 'type'. This should only be run in DEBUG mode.
// Crashes if there is an ordering violation, and returns an error if the
// validation could not be performed.
Status ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
const std::shared_ptr<DeltaStore>& second,
const fs::IOContext* io_context,
DeltaType type);
// Validates the relative ordering of the deltas in the specified list. This
// should only be run in DEBUG mode.
// Crashes if there is an ordering violation, and returns an error if the
// validation could not be performed.
Status ValidateDeltasOrdered(const SharedDeltaStoreVector& list,
const fs::IOContext* io_context,
DeltaType type);
#endif // NDEBUG
// Replaces the subsequence of stores that matches 'stores_to_replace' with
// delta file readers corresponding to 'new_delta_blocks', which may be empty.
// If 'stores_to_replace' is empty then the stores represented by
// 'new_delta_blocks' are prepended to the relevant delta stores list.
// In DEBUG mode, this may do IO to validate the delta ordering.
void AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
const SharedDeltaStoreVector& new_stores,
const fs::IOContext* io_context,
DeltaType type);
// Returns true if a DMS exists, returning its size in bytes and the time at
// which it was created. Otherwise, returns false and doesn't update the
// input pointers.
// NOTE: we lazily create the DMS, so the creation time corresponds to the
// age of the oldest update to the rowset.
bool GetDeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const;
// Get the delta MemStore's size in bytes, including pre-allocation.
size_t DeltaMemStoreSize() const;
// Returns true if the DMS doesn't exist. This doesn't rely on the size.
bool DeltaMemStoreEmpty() const {
return !dms_exists_.Load();
// Get the minimum log index for this tracker's DMS, -1 if it wasn't set.
int64_t MinUnflushedLogIndex() const;
// Return the number of undo delta stores.
size_t CountUndoDeltaStores() const;
// Return the number of redo delta stores, not including the DeltaMemStore.
size_t CountRedoDeltaStores() const;
// Return the size on-disk of UNDO deltas, in bytes.
uint64_t UndoDeltaOnDiskSize() const;
// Return the size on-disk of REDO deltas, in bytes.
uint64_t RedoDeltaOnDiskSize() const;
// Retrieves the list of column indexes that currently have updates.
void GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids) const;
Mutex* compact_flush_lock() {
return &compact_flush_lock_;
// Returns an error if the delta tracker has been marked read-only.
// Else, returns OK.
// 'compact_flush_lock_' must be held when this is called.
Status CheckWritableUnlocked() const;
// Init() all of the specified delta stores. For tests only.
Status InitAllDeltaStoresForTests(WhichStores stores);
// Count the number of deleted rows in the current DMS as well as
// in a flushing DMS (if one exists)
int64_t CountDeletedRows() const;
FRIEND_TEST(TestRowSet, TestRowSetUpdate);
FRIEND_TEST(TestRowSet, TestDMSFlush);
FRIEND_TEST(TestRowSet, TestMakeDeltaIteratorMergerUnlocked);
FRIEND_TEST(TestRowSet, TestCompactStores);
FRIEND_TEST(TestMajorDeltaCompaction, TestCompact);
DeltaTracker(std::shared_ptr<RowSetMetadata> rowset_metadata,
log::LogAnchorRegistry* log_anchor_registry,
TabletMemTrackers mem_trackers);
Status DoOpen(const fs::IOContext* io_context);
Status FlushDMS(DeltaMemStore* dms,
const fs::IOContext* io_context,
std::shared_ptr<DeltaFileReader>* dfr,
MetadataFlushType flush_type);
// This collects undo and/or redo stores into '*stores'.
void CollectStores(std::vector<std::shared_ptr<DeltaStore>>* stores,
WhichStores which) const;
// Performs the actual compaction. Results of compaction are written with
// 'block' and stats for are populated in 'output_stats'. Delta stores that
// underwent compaction are appended to 'compacted_stores', and their
// corresponding block ids are appended to 'compacted_blocks'.
// NOTE: the caller of this method should acquire or already hold an
// exclusive lock on 'compact_flush_lock_' before calling this method in
// order to protect 'redo_delta_stores_'.
Status DoCompactStores(const fs::IOContext* io_context,
size_t start_idx, size_t end_idx,
std::unique_ptr<fs::WritableBlock> block,
std::unique_ptr<DeltaStats>* output_stats,
std::vector<std::shared_ptr<DeltaStore>>* compacted_stores,
std::vector<BlockId>* compacted_blocks);
// Creates a merge delta iterator and captures the delta stores and
// delta blocks under compaction into 'target_stores' and
// 'target_blocks', respectively. The merge iterator is stored in
// 'out'; 'out' is valid until this instance of DeltaTracker
// is destroyed.
// NOTE: the caller of this method must first acquire or already
// hold a lock on 'compact_flush_lock_'in order to guard against a
// race on 'redo_delta_stores_'.
Status MakeDeltaIteratorMergerUnlocked(const fs::IOContext* io_context,
size_t start_idx, size_t end_idx, const Schema* projection,
std::vector<std::shared_ptr<DeltaStore>>* target_stores,
std::vector<BlockId>* target_blocks,
std::unique_ptr<DeltaIterator>* out);
std::string LogPrefix() const;
Status CreateAndInitDMSUnlocked(const fs::IOContext* io_context);
std::shared_ptr<RowSetMetadata> rowset_metadata_;
bool open_;
// Certain errors (e.g. failed delta tracker flushes) may leave the delta
// store lists in a state such that it would be unsafe to run further
// maintenance ops.
// Must be checked immediately after locking compact_flush_lock_.
bool read_only_;
log::LogAnchorRegistry* log_anchor_registry_;
TabletMemTrackers mem_trackers_;
int64_t next_dms_id_;
// The current DeltaMemStore into which updates should be written.
std::shared_ptr<DeltaMemStore> dms_;
// The set of tracked REDO delta stores, in increasing timestamp order.
SharedDeltaStoreVector redo_delta_stores_;
// The set of tracked UNDO delta stores, in decreasing timestamp order.
SharedDeltaStoreVector undo_delta_stores_;
// The maintenance scheduler calls DeltaMemStoreEmpty() a lot.
// We use an atomic variable to indicate whether DMS exists or not and
// to avoid having to take component_lock_ in order to satisfy this call.
AtomicBool dms_exists_;
// read-write lock protecting dms_ and {redo,undo}_delta_stores_.
// - Readers take this lock in shared mode.
// - Mutators take this lock in exclusive mode if they need to create
// a new DMS, and shared mode otherwise.
// - Flushers take this lock in exclusive mode before they modify the
// structure of the rowset.
// TODO(perf): convert this to a reader-biased lock to avoid any cacheline
// contention between threads.
mutable rw_spinlock component_lock_;
// Exclusive lock that ensures that only one flush or compaction can run
// at a time. Protects redo_delta_stores_ and undo_delta_stores_.
// NOTE: this lock cannot be acquired while component_lock is held:
// otherwise, Flush and Compaction threads (that both first acquire this lock
// and then component_lock) will deadlock.
// TODO(perf): this needs to be more fine grained
mutable Mutex compact_flush_lock_;
// Number of deleted rows for a DMS that is currently being flushed.
// When the flush completes, this is merged into the RowSetMetadata
// and reset.
int64_t deleted_row_count_;
} // namespace tablet
} // namespace kudu