blob: 3b30ea0db3dfac9da76b19eb2abfa15bd7527d76 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <vector>
#include <glog/logging.h>
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowid.h"
#include "kudu/common/timestamp.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/rowset.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
namespace kudu {
class Arena;
class ColumnBlock;
class ScanSpec;
class Schema;
class SelectionVector;
struct ColumnId;
namespace fs {
struct IOContext;
} // namespace fs
namespace tablet {
class DeltaFileWriter;
class DeltaIterator;
class DeltaStats;
class Mutation;
// Tracks deltas that have been selected by PreparedDeltas::SelectDeltas.
//
// May track deltas belonging to a single delta store, or to multiple stores
// whose SelectedDeltas have been merged together.
class SelectedDeltas {
public:
SelectedDeltas() = default;
// Equivalent to calling:
//
// SelectedDeltas sd;
// sd.Reset(nrows);
explicit SelectedDeltas(size_t nrows);
// Converts the selected deltas into a simpler SelectionVector.
void ToSelectionVector(SelectionVector* sel_vec) const;
// Returns a textual representation suitable for debugging.
std::string ToString() const;
private:
template<class Traits>
friend class DeltaPreparer;
// Mutation that has met the 'select' criteria in a delta store.
struct Delta {
// Key fields.
// The delta's timestamp.
Timestamp ts;
// Whether this delta was an UNDO or a REDO.
DeltaType dtype;
// It's possible for multiple UNDOs or REDOs in the same delta store to
// share a common timestamp. To ensure a total ordering, this additional key
// field reflects the logical ordering of such deltas.
//
// For example, consider the sequence of REDOs:
// D1: @ts10 UPDATE key=1
// D2: @ts10 DELETE key=1
//
// D1 and D2 are identical as far as 'ts' and 'dtype' are concerned, so D1's
// disambiguator must be less than that of D2.
int64_t disambiguator;
// Non-key fields.
// Whether this delta was an UPDATE, DELETE, or REINSERT.
RowChangeList::ChangeType ctype;
};
// Tracks the oldest and newest deltas for a given row.
//
// When there's only one delta, 'oldest' and 'newest' are equal and
// 'same_delta' is true. Otherwise, oldest is guaranteed to be less than
// newest as per the rules defined in DeltaLessThanFunctor.
//
// Most of the time "oldest" and "newest" is determined purely by timestamp,
// but some deltas can share timestamps, in which case additional rules are
// used to maintain a total ordering.
struct DeltaPair {
Delta oldest;
Delta newest;
bool same_delta;
};
// Comparator that establishes a total ordering amongst Deltas for the same row.
struct DeltaLessThanFunctor {
bool operator() (const Delta& a, const Delta& b) const {
// Most of the time, deltas are ordered using timestamp.
if (PREDICT_TRUE(a.ts != b.ts)) {
return a.ts < b.ts;
}
// If the timestamps match, we can order by observing that UNDO < REDO, an
// invariant that is preserved inside of a rowset.
if (a.dtype != b.dtype) {
return a.dtype == UNDO;
}
// The timestamps and delta types match. It should only be possible to get
// here if we're comparing deltas from within the same store, in which
// case the disambiguators must not match.
if (PREDICT_TRUE(a.disambiguator != b.disambiguator)) {
return a.disambiguator < b.disambiguator;
}
LOG(FATAL) << strings::Substitute("Could not differentiate between two deltas "
"ts=$0, type=$1, disambiguator=$2", a.ts.ToString(), a.dtype, a.disambiguator);
}
};
// Merges two SelectedDeltas together on a row-by-row basis.
void MergeFrom(const SelectedDeltas& other);
// Considers a new delta, possibly adding it to 'rows_'.
void ProcessDelta(rowid_t row_idx, Delta new_delta);
// Clears out 'rows_' and makes it suitable for handling 'nrows'.
void Reset(size_t nrows);
// All tracked deltas, indexed by row ordinal.
//
// If an element is std::nullopt, there are no deltas for that row.
std::vector<std::optional<DeltaPair>> rows_;
};
// Interface for the pieces of the system that track deltas/updates.
// This is implemented by DeltaMemStore and by DeltaFileReader.
class DeltaStore {
public:
// Performs any post-construction work for the DeltaStore, which may
// include additional I/O.
virtual Status Init(const fs::IOContext* io_context) = 0;
// Whether this delta store was initialized or not.
virtual bool Initted() const = 0;
// Create a DeltaIterator for the given projection.
//
// The projection in 'opts' corresponds to whatever scan is currently ongoing.
// All RowBlocks passed to this DeltaIterator must have this same schema.
//
// The snapshot in 'opts' is the MVCC state which determines which ops should
// be considered committed (and thus applied by the iterator).
//
// Returns Status::OK and sets 'iterator' to the new DeltaIterator, or
// returns Status::NotFound if the mutations within this delta store
// cannot include the snapshot.
virtual Status NewDeltaIterator(const RowIteratorOptions& opts,
std::unique_ptr<DeltaIterator>* iterator) const = 0;
// Set *deleted to true if the latest update for the given row is a deletion.
virtual Status CheckRowDeleted(rowid_t row_idx, const fs::IOContext* io_context,
bool *deleted) const = 0;
// Get the store's estimated size in bytes.
virtual uint64_t EstimateSize() const = 0;
virtual std::string ToString() const = 0;
// TODO(jdcryans): remove this once we don't need to have delta_stats for
// both DMS and DFR. Currently DeltaTracker#GetColumnsIdxWithUpdates() needs
// to filter out DMS from the redo list but it can't without RTTI.
virtual const DeltaStats& delta_stats() const = 0;
// Returns whether callers can use 'delta_stats()', either because they've
// been read from disk, or because the store has been initialized with the
// stats cached.
virtual bool has_delta_stats() const {
return Initted();
}
virtual ~DeltaStore() {}
};
typedef std::vector<std::shared_ptr<DeltaStore>> SharedDeltaStoreVector;
// Iterator over deltas.
// For each rowset, this iterator is constructed alongside the base data iterator,
// and used to apply any updates which haven't been yet compacted into the base
// (i.e. those edits in the DeltaMemStore or in delta files)
//
// Typically this is used as follows:
//
// Open iterator, seek to particular point in file
// RowBlock rowblock;
// foreach RowBlock in base data {
// clear row block
// CHECK_OK(iter->PrepareBatch(rowblock.size()));
// ... read column 0 from base data into row block ...
// CHECK_OK(iter->ApplyUpdates(0, rowblock.column(0))
// ... check predicates for column ...
// ... read another column from base data...
// CHECK_OK(iter->ApplyUpdates(1, rowblock.column(1)))
// ...
// }
struct DeltaKeyAndUpdate {
DeltaKey key;
Slice cell;
// Stringifies this DeltaKeyAndUpdate, according to 'schema'.
//
// If 'pad' is true, pads the delta row ids and txn ids in the output so that we can
// compare two stringified representations and obtain the same result as comparing the DeltaKey
// itself. That is, if 'pad' is true, then DeltaKey a < DeltaKey b => Stringify(a) < Stringify(b).
std::string Stringify(DeltaType type, const Schema& schema, bool pad_key = false) const;
};
// Representation of deltas that have been "prepared" by an iterator. That is,
// they have been consistently read (at a snapshot) from their backing store
// into an in-memory format suitable for efficient retrieval.
class PreparedDeltas {
public:
// Applies the snapshotted updates to one of the columns.
//
// 'dst' must be the same length as was previously passed to PrepareBatch().
//
// Updates belonging to unselected rows in 'filter' will be skipped. This is
// intended as an optimization; if the caller knows with certainty that some
// rows are irrelevant (e.g. they've been deleted), we can avoid some copying.
//
// Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
const SelectionVector& filter) = 0;
// Applies any deletes to the given selection vector.
//
// Rows which have been deleted in the associated MVCC snapshot are set to 0
// in the selection vector so that they don't show up in the output.
//
// Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
virtual Status ApplyDeletes(SelectionVector* sel_vec) = 0;
// Modifies the given SelectedDeltas to include rows with relevant deltas from
// the current prepared batch.
//
// Deltas must have been prepared with the flag PREPARE_FOR_SELECT.
virtual Status SelectDeltas(SelectedDeltas* deltas) = 0;
// Collects the mutations associated with each row in the current prepared batch.
//
// Each entry in the vector will be treated as a singly linked list of Mutation
// objects. If there are no mutations for that row, the entry will be unmodified.
// If there are mutations, they will be prepended at the head of the linked list
// (i.e the resulting list will be in descending timestamp order)
//
// The Mutation objects will be allocated out of the provided Arena, which must be non-NULL.
//
// Deltas must have been prepared with the flag PREPARE_FOR_COLLECT.
virtual Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) = 0;
// Iterates through all deltas, adding deltas for columns not specified in
// 'col_ids' to 'out'.
//
// Unlike CollectMutations, the iterator's MVCC snapshots are ignored; all
// deltas are considered relevant.
//
// The delta objects will be allocated out the provided Arena, which must be non-NULL.
//
// Deltas must have been prepared with the flag PREPARE_FOR_COLLECT.
virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
std::vector<DeltaKeyAndUpdate>* out,
Arena* arena) = 0;
// Returns true if there might exist deltas to be applied. It is safe to
// conservatively return true, but this would force a skip over decoder-level
// evaluation.
//
// Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
virtual bool MayHaveDeltas() const = 0;
};
class DeltaIterator : public PreparedDeltas {
public:
// Initialize the iterator. This must be called once before any other
// call.
virtual Status Init(ScanSpec *spec) = 0;
// Seek to a particular ordinal position in the delta data. This cancels any prepared
// block, and must be called at least once prior to PrepareBatch().
virtual Status SeekToOrdinal(rowid_t idx) = 0;
// Prepare to apply deltas to a block of rows. This takes a consistent snapshot
// of all updates to the next 'nrows' rows, so that subsequent calls to a
// PreparedDeltas method will not cause any "tearing"/non-atomicity.
//
// 'prepare_flags' is a bitfield describing what operation(s) the batch will
// be used for; some implementations may choose to prepare differently.
// PREPARE_NONE is an invalid value; it is only used internally.
//
// Each time this is called, the iterator is advanced by the full length
// of the previously prepared block.
enum {
// There are no prepared blocks. Attempts to call a PreparedDeltas function
// will fail.
PREPARE_NONE = 0,
// Prepare a batch of deltas for applying. All deltas in the batch will be
// decoded. Operations affecting row data (i.e. UPDATEs and REINSERTs) will
// be coalesced into a column-major data structure suitable for
// ApplyUpdates. Operations affecting row lifecycle (i.e. DELETEs and
// REINSERTs) will be coalesced into a row-major data structure suitable for ApplyDeletes.
//
// On success, ApplyUpdates and ApplyDeltas will be callable.
PREPARE_FOR_APPLY = 1 << 0,
// Prepare a batch of deltas for collecting. Deltas will remain encoded and
// in the order that they were loaded from the backing store.
//
// On success, CollectMutations and FilterColumnIdsAndCollectDeltas will be callable.
PREPARE_FOR_COLLECT = 1 << 1,
// Prepare a batch of deltas for selecting. All deltas in the batch will be
// decoded, and a data structure describing which rows had deltas will be
// populated.
//
// On success, SelectUpdates will be callable.
PREPARE_FOR_SELECT = 1 << 2
};
virtual Status PrepareBatch(size_t nrows, int prepare_flags) = 0;
// Returns true if there are any more rows left in this iterator.
virtual bool HasNext() const = 0;
// Return a string representation suitable for debug printouts.
virtual std::string ToString() const = 0;
// Returns the number of deltas that have been selected so far. For iterators
// that iterate across multiple DeltaIterators, this value is useful in
// defining a total ordering for deltas across all sub-iterators, e.g. by
// defining the starting point of each sub-iterator's DeltaPreparer's
// disambiguator.
//
// See SelectedDeltas::Delta for more details.
virtual int64_t deltas_selected() const = 0;
// Sets the number of deltas selected by the preparer so far, used to define
// a starting point of a disambiguator between deltas in iterators that
// iterate across multiple DeltaIterators. Callers should only call this with
// a monotonically increasing value.
//
// See SelectedDeltas::Delta for more details.
virtual void set_deltas_selected(int64_t deltas_selected) = 0;
virtual ~DeltaIterator() {}
};
// DeltaPreparer traits suited for a DMSIterator.
struct DMSPreparerTraits {
static constexpr DeltaType kType = REDO;
static constexpr bool kAllowReinserts = false;
static constexpr bool kAllowFilterColumnIdsAndCollectDeltas = false;
static constexpr bool kInitializeDecodersWithSafetyChecks = false;
};
// DeltaPreparer traits suited for a DeltaFileIterator.
//
// This is just a partial specialization; the DeltaFileIterator is expected to
// dictate the DeltaType.
template<DeltaType Type>
struct DeltaFilePreparerTraits {
static constexpr DeltaType kType = Type;
static constexpr bool kAllowReinserts = true;
static constexpr bool kAllowFilterColumnIdsAndCollectDeltas = true;
static constexpr bool kInitializeDecodersWithSafetyChecks = true;
};
// Encapsulates all logic and responsibility related to "delta preparation";
// that is, the transformation of encoded deltas into an in-memory
// representation more suitable for efficient service during iteration.
//
// This class is intended to be composed inside a DeltaIterator. The iterator
// is responsible for loading encoded deltas from a backing store, passing them
// to the DeltaPreparer to be transformed, and later, calling the DeltaPreparer
// to serve the deltas.
template<class Traits>
class DeltaPreparer : public PreparedDeltas {
public:
explicit DeltaPreparer(RowIteratorOptions opts);
// Updates internal state to reflect a seek performed by a DeltaIterator.
//
// Call upon completion of DeltaIterator::SeekToOrdinal.
void Seek(rowid_t row_idx);
// Updates internal state to reflect the beginning of delta batch preparation
// on the part of a DeltaIterator.
//
// Call at the beginning of DeltaIterator::PrepareBatch.
void Start(size_t nrows, int prepare_flags);
// Updates internal state to reflect the end of delta batch preparation on the
// part of a DeltaIterator.
//
// Call at the end of DeltaIterator::PrepareBatch.
void Finish(size_t nrows);
// Prepares the delta given by 'key' whose encoded changes are pointed to by 'val'.
//
// On success, the memory pointed to by 'val' can be destroyed. The
// 'finished_row' output parameter will be set if we can determine that all
// future deltas belonging to 'key.row_idx()' are irrelevant under the
// snapshot provided at preparer construction time; the caller can skip ahead
// to deltas belonging to the next row.
//
// Call when a new delta becomes available in DeltaIterator::PrepareBatch.
Status AddDelta(const DeltaKey& key, Slice val, bool* finished_row);
Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
const SelectionVector& filter) override;
Status ApplyDeletes(SelectionVector* sel_vec) override;
Status SelectDeltas(SelectedDeltas* deltas) override;
Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
std::vector<DeltaKeyAndUpdate>* out,
Arena* arena) override;
bool MayHaveDeltas() const override;
rowid_t cur_prepared_idx() const { return cur_prepared_idx_; }
std::optional<rowid_t> last_added_idx() const { return last_added_idx_; }
const RowIteratorOptions& opts() const { return opts_; }
int64_t deltas_selected() const {
return deltas_selected_;
}
void set_deltas_selected(int64_t deltas_selected) {
// NOTE: it's possible that the iterator's number of 'deltas_selected'
// hasn't changed if no deltas were selected since last being called, hence
// the DCHECK_GE check instead of DCHECK_GT.
DCHECK_GE(deltas_selected, deltas_selected_);
deltas_selected_ = deltas_selected;
}
private:
// If 'decoder' is not yet initialized, initializes it in accordance with the
// preparer's traits.
static Status InitDecoderIfNecessary(RowChangeListDecoder* decoder);
// Checks whether we are done processing a row's deltas. If so, attempts to
// convert the row's latest deletion state into a saved deletion or
// reinsertion. By deferring this work to when a row is finished, we avoid
// creating unnecessary deletions and reinsertions for rows that are
// repeatedly deleted and reinserted.
//
// 'cur_row_idx' may be unset when there is no new row index, such as when
// called upon completion of an entire batch of deltas (i.e. from Finish()).
void MaybeProcessPreviousRowChange(std::optional<rowid_t> cur_row_idx);
// Update the deletion state of the current row being processed based on 'op'.
void UpdateDeletionState(RowChangeList::ChangeType op);
// Options with which the DeltaPreparer's iterator was constructed.
const RowIteratorOptions opts_;
// The row index at which the most recent batch preparation ended.
rowid_t cur_prepared_idx_;
// The value of 'cur_prepared_idx_' from the previous batch.
rowid_t prev_prepared_idx_;
// The index of the row last added in AddDelta(), if one exists.
std::optional<rowid_t> last_added_idx_;
// Whether there are any prepared blocks.
int prepared_flags_;
// State when prepared_flags_ & PREPARED_FOR_APPLY
// ------------------------------------------------------------
struct ColumnUpdate {
rowid_t row_id;
void* new_val_ptr;
uint8_t new_val_buf[16];
};
typedef std::deque<ColumnUpdate> UpdatesForColumn;
std::vector<UpdatesForColumn> updates_by_col_;
// A row whose last relevant mutation was DELETE (or REINSERT).
//
// These lists are disjoint; a row that was both deleted and reinserted will
// not be in either.
std::vector<rowid_t> deleted_;
std::vector<rowid_t> reinserted_;
// The deletion state of the row last processed by AddDelta().
//
// As a row's DELETEs and REINSERTs are processed, the deletion state
// alternates between the values below.
enum RowDeletionState {
UNKNOWN,
DELETED,
REINSERTED
};
RowDeletionState deletion_state_;
// State when prepared_flags_ & PREPARED_FOR_COLLECT
// ------------------------------------------------------------
struct PreparedDelta {
DeltaKey key;
Slice val;
};
std::vector<PreparedDelta> prepared_deltas_;
// State when prepared_for_ & PREPARED_FOR_SELECT
// ------------------------------------------------------------
SelectedDeltas selected_;
// The number of deltas selected so far by this DeltaPreparer. Used to build
// disambiguators (see SelectedDeltas::Delta).
int64_t deltas_selected_;
// Used for PREPARED_FOR_APPLY mode.
//
// Set to true in all of the spots where deleted_, reinserted_, and updates_by_col_
// are modified.
bool may_have_deltas_;
DISALLOW_COPY_AND_ASSIGN(DeltaPreparer);
};
enum { ITERATE_OVER_ALL_ROWS = 0 };
// Dumps contents of 'iter' to 'out', line-by-line. Used to unit test
// minor delta compaction.
//
// If 'nrows' is ITERATE_OVER_ALL_ROWS, all rows will be dumped.
Status DebugDumpDeltaIterator(DeltaType type,
DeltaIterator* iter,
const Schema& schema,
size_t nrows,
std::vector<std::string>* out);
// Writes the contents of 'iter' to 'out', block by block. Used by
// minor delta compaction.
//
// If 'nrows' is ITERATE_OVER_ALL_ROWS, all rows will be dumped.
template<DeltaType Type>
Status WriteDeltaIteratorToFile(DeltaIterator* iter,
size_t nrows,
DeltaFileWriter* out);
} // namespace tablet
} // namespace kudu