blob: 1ca5a1a3fe422e348d30b26979ca7e966c3c5a60 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <limits.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <set>
#include <string>
#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/deltamemstore.h"
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/ops/alter_schema_op.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-harness.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/faststring.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using strings::Substitute;
namespace kudu {
namespace clock {
class Clock;
} // namespace clock
namespace tablet {
class RowSetMetadata;
// Test fixture, derived from KuduTest, that owns a TabletHarness and exposes
// accessors for the harness's tablet, clock and filesystem manager.
//
// NOTE(review): this listing appears to have lost lines (access specifiers,
// closing braces, the constructor's default clock type, and the bodies of
// SetUp/SetUpTestTablet/TabletReOpen). Confirm against the upstream file
// before relying on the comments below.
class KuduTabletTest : public KuduTest {
// Stores a copy of 'schema' with column IDs assigned (CopyWithColumnIds())
// and remembers the clock type to use when building the harness.
explicit KuduTabletTest(const Schema& schema,
TabletHarness::Options::ClockType clock_type =
: schema_(schema.CopyWithColumnIds()),
clock_type_(clock_type) {
virtual void SetUp() OVERRIDE {
// Builds a new TabletHarness rooted at 'root_dir', or at
// GetTestPath("fs_root") when 'root_dir' is empty, replacing any existing
// harness.
void CreateTestTablet(const std::string& root_dir = "") {
std::string dir = root_dir.empty() ? GetTestPath("fs_root") : root_dir;
TabletHarness::Options opts(dir);
opts.clock_type = clock_type_;
// Remember whether a harness already existed before it is replaced below.
// NOTE(review): the use of 'first_time' is not visible in this listing.
bool first_time = harness_ == nullptr;
harness_.reset(new TabletHarness(schema_, opts));
void SetUpTestTablet(const std::string& root_dir = "") {
void TabletReOpen(const std::string& root_dir = "") {
// Returns the fixture's schema (the ID-assigned copy made in the constructor).
const Schema &schema() const {
return schema_;
// Returns 'client_schema_'.
// NOTE(review): its initialization is not visible in this listing.
const Schema &client_schema() const {
return client_schema_;
// Returns the harness's clock.
clock::Clock* clock() {
return harness_->clock();
// Returns the harness's filesystem manager.
FsManager* fs_manager() {
return harness_->fs_manager();
// Prepares an alter-schema operation on the tablet for 'schema', optionally
// carrying 'extra_config' as the new extra config.
// NOTE(review): only the prepare step is visible here; confirm that the
// operation is also applied in the full source.
void AlterSchema(const Schema& schema,
boost::optional<TableExtraConfigPB> extra_config = boost::none) {
tserver::AlterSchemaRequestPB req;
// The request targets the schema version immediately after the current one.
req.set_schema_version(tablet()->metadata()->schema_version() + 1);
if (extra_config) {
*(req.mutable_new_extra_config()) = *extra_config;
AlterSchemaOpState op_state(nullptr, &req, nullptr);
ASSERT_OK(tablet()->CreatePreparedAlterSchema(&op_state, &schema));
// Returns the harness's tablet as a shared pointer.
const std::shared_ptr<Tablet>& tablet() const {
return harness_->tablet();
// Returns a mutable raw pointer to the harness's tablet.
Tablet* mutable_tablet() {
return harness_->mutable_tablet();
// Returns the harness itself (non-owning).
TabletHarness* harness() {
return harness_.get();
// Schema with column IDs assigned, copied from the constructor argument.
const Schema schema_;
const Schema client_schema_;
// Clock type passed to every harness created by CreateTestTablet().
const TabletHarness::Options::ClockType clock_type_;
// The harness; null until CreateTestTablet() first runs.
std::unique_ptr<TabletHarness> harness_;
// Test fixture for rowset tests. Extends KuduTabletTest and additionally holds
// a RowSetMetadata.
// NOTE(review): the body of SetUp() and the closing braces are not visible in
// this listing, so how 'rowset_meta_' is populated cannot be confirmed here.
class KuduRowSetTest : public KuduTabletTest {
// Forwards 'schema' to KuduTabletTest, using its default clock type.
explicit KuduRowSetTest(const Schema& schema)
: KuduTabletTest(schema) {
virtual void SetUp() OVERRIDE {
// Flushes the tablet's metadata and returns the resulting status.
Status FlushMetadata() {
return tablet()->metadata()->Flush();
std::shared_ptr<RowSetMetadata> rowset_meta_;
// Iterates through all rows of 'iter' without stringifying or storing them.
// This is strictly a measure of decoding and evaluating predicates.
//
// '*fetched' is reset to 0 on entry.
// NOTE(review): the statements that fetch each block and update '*fetched'
// are missing from this listing; presumably it counts the selected rows.
static inline Status SilentIterateToStringList(RowwiseIterator* iter,
int* fetched) {
const Schema& schema = iter->schema();
// Rows are read in batches of up to 100 into a block backed by 'memory'.
RowBlockMemory memory(1024);
RowBlock block(&schema, 100, &memory);
*fetched = 0;
while (iter->HasNext()) {
for (size_t i = 0; i < block.nrows(); i++) {
// Only rows still selected in the block's selection vector are considered.
if (block.selection_vector()->IsRowSelected(i)) {
return Status::OK();
// Iterates through 'iter', considering at most 'limit' selected rows, with
// 'out' as the destination for the results.
//
// NOTE(review): the statements that fetch each block, stringify rows into
// 'out' and advance 'fetched' are missing from this listing; confirm the exact
// output format and whether 'out' is cleared first against the full source.
static inline Status IterateToStringList(RowwiseIterator* iter,
std::vector<std::string>* out,
int limit = INT_MAX) {
// A local copy of the iterator's schema is used for the row block.
Schema schema = iter->schema();
// Rows are read in batches of up to 100 into a block backed by 'memory'.
RowBlockMemory memory(1024);
RowBlock block(&schema, 100, &memory);
int fetched = 0;
// Both loops stop as soon as 'limit' rows have been fetched.
while (iter->HasNext() && fetched < limit) {
for (size_t i = 0; i < block.nrows() && fetched < limit; i++) {
if (block.selection_vector()->IsRowSelected(i)) {
return Status::OK();
// Performs snapshot reads, under each of the snapshots in 'snaps', and stores
// the results in 'collected_rows'.
//
// For each snapshot a new row iterator is created over 'tablet' with 'schema'
// as the projection, and its rows are dumped via IterateToStringList().
// Uses gtest ASSERT_* macros, so a failure returns early from this function.
static inline void CollectRowsForSnapshots(
Tablet* tablet,
const Schema& schema,
const std::vector<MvccSnapshot>& snaps,
std::vector<std::vector<std::string>* >* collected_rows) {
for (const MvccSnapshot& snapshot : snaps) {
DVLOG(1) << "Snapshot: " << snapshot.ToString();
RowIteratorOptions opts;
opts.projection = &schema;
opts.snap_to_include = snapshot;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_OK(tablet->NewRowIterator(std::move(opts), &iter));
// Heap-allocated with raw 'new'.
// NOTE(review): presumably the pointer is appended to 'collected_rows' and
// the caller takes ownership; that statement is not visible in this listing.
// Also check whether it leaks if the ASSERT_OK below fails.
auto collector = new std::vector<std::string>();
ASSERT_OK(IterateToStringList(iter.get(), collector));
for (const auto& mrs : *collector) {
DVLOG(1) << "Got from MRS: " << mrs;
// Performs snapshot reads, under each of the snapshots in 'snaps', and verifies that
// the results match the ones in 'expected_rows'.
//
// 'expected_rows' is indexed by 'idx'; the row count is compared first and then
// each row string, in order.
// NOTE(review): the statement that advances 'idx' per snapshot is not visible
// in this listing; confirm against the full source.
static inline void VerifySnapshotsHaveSameResult(
Tablet* tablet,
const Schema& schema,
const std::vector<MvccSnapshot>& snaps,
const std::vector<std::vector<std::string>* >& expected_rows) {
int idx = 0;
// Now iterate again and make sure we get the same thing.
for (const MvccSnapshot& snapshot : snaps) {
DVLOG(1) << "Snapshot: " << snapshot.ToString();
RowIteratorOptions opts;
opts.projection = &schema;
opts.snap_to_include = snapshot;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_OK(tablet->NewRowIterator(std::move(opts), &iter));
std::vector<std::string> collector;
ASSERT_OK(IterateToStringList(iter.get(), &collector));
// Sizes must match before the element-wise comparison indexes 'collector'.
ASSERT_EQ(collector.size(), expected_rows[idx]->size());
// NOTE(review): 'i' is a signed int compared against an unsigned size().
for (int i = 0; i < expected_rows[idx]->size(); i++) {
DVLOG(1) << "Got from DRS: " << collector[i];
DVLOG(1) << "Expected: " << (*expected_rows[idx])[i];
ASSERT_EQ((*expected_rows[idx])[i], collector[i]);
// Constructs a new iterator for 'rs' with 'opts' and dumps all of its rows into 'out'.
// The previous contents of 'out' are cleared.
// (NOTE(review): the clearing is not done in this function; presumably it
// happens inside IterateToStringList() -- confirm.)
//
// Returns the first non-OK status from creating the iterator or from
// IterateToStringList(); otherwise Status::OK().
static inline Status DumpRowSet(const RowSet& rs,
const RowIteratorOptions& opts,
std::vector<std::string>* out) {
std::unique_ptr<RowwiseIterator> iter;
RETURN_NOT_OK(rs.NewRowIterator(opts, &iter));
RETURN_NOT_OK(IterateToStringList(iter.get(), out));
return Status::OK();
// Take an un-initialized iterator, Init() it, and iterate through all of its rows.
// The resulting string contains a line per entry (rows joined with "\n").
//
// Failures are fatal: the iteration status is checked with CHECK_OK.
// NOTE(review): the Init() call described above is not visible in this
// listing; confirm it is present in the full source.
static inline std::string InitAndDumpIterator(RowwiseIterator* iter) {
std::vector<std::string> out;
CHECK_OK(IterateToStringList(iter, &out));
return JoinStrings(out, "\n");
// Dump all of the rows of the tablet into the given vector.
//
// Rows are read through a new iterator over 'projection', stringified via
// IterateToStringList() into a local vector, and then sorted.
// NOTE(review): the statement that transfers the sorted 'rows' into 'out' is
// not visible in this listing; confirm against the full source.
static inline Status DumpTablet(const Tablet& tablet,
const Schema& projection,
std::vector<std::string>* out) {
std::unique_ptr<RowwiseIterator> iter;
RETURN_NOT_OK(tablet.NewRowIterator(projection, &iter));
std::vector<std::string> rows;
RETURN_NOT_OK(IterateToStringList(iter.get(), &rows));
// Sorting makes the dump independent of the iterator's row order.
std::sort(rows.begin(), rows.end());
return Status::OK();
// Write a single row to the given RowSetWriter (which may be of the rolling
// or non-rolling variety).
//
// 'row_slice' holds the contiguous encoding of the row. It is copied into a
// one-row RowBlock, which is then appended to 'writer'.
// NOTE(review): the DCHECK_EQ expression and the ConstContiguousRow
// construction below are truncated in this listing; restore them from the
// full source.
template<class RowSetWriterClass>
static Status WriteRow(const Slice &row_slice, RowSetWriterClass *writer) {
const Schema &schema = writer->schema();
DCHECK_EQ(row_slice.size(), schema.byte_size() +
// The block is built without a RowBlockMemory (nullptr), and CopyRow() below
// is likewise given a null Arena.
RowBlock block(&schema, 1, nullptr);
ConstContiguousRow row(&schema,;
RowBlockRow dst_row = block.row(0);
RETURN_NOT_OK(CopyRow(row, &dst_row, static_cast<Arena*>(nullptr)));
return writer->AppendBlock(block, 1);
// Tracks encoded deltas and provides a DeltaIterator-like interface for
// querying them.
// This is effectively a poor man's DeltaMemStore, except it allows REINSERTs.
//
// 'T' is a delta type selector; T::kTag (REDO or UNDO) picks the timestamp
// ordering of the per-row delta map.
//
// NOTE(review): this listing appears to have lost lines throughout the class
// (closing braces, 'continue'/'break' statements, access specifiers and parts
// of some expressions). Comments below describe only what is visible.
template<typename T>
class MirroredDeltas {
// Map of row index to map of timestamp to encoded delta.
// The inner map is sorted by an order that respects DeltaType. That is, REDO
// timestamps are sorted in ascending order, while UNDO timestamps are sorted
// in descending order.
// NOTE(review): the remainder of the std::conditional expression is truncated.
using ComparatorType = typename std::conditional<T::kTag == REDO,
using MirroredDeltaTimestampMap = std::map<Timestamp, faststring, ComparatorType>;
using MirroredDeltaMap = std::map<rowid_t, MirroredDeltaTimestampMap>;
// 'schema' is stored by pointer and not copied; the internal arena starts at
// 1024 bytes.
explicit MirroredDeltas(const Schema* schema)
: schema_(schema),
arena_(1024) {
// Tracks a new key/delta pair. The key must not already have an associated
// encoded delta.
void AddEncodedDelta(const DeltaKey& k, const faststring& changes) {
// operator[] default-constructs the entry if it is absent, so a new key
// yields an empty faststring; the DCHECK verifies the key was not in use.
auto& existing = all_deltas_[k.row_idx()][k.timestamp()];
DCHECK_EQ(0, existing.length());
// NOTE(review): the first argument of assign_copy() is truncated here.
existing.assign_copy(, changes.length());
// Returns true if no tracked delta is relevant to 'upper_ts' (for apply) or,
// when 'lower_ts' is set, to the ['lower_ts', 'upper_ts') range (for select);
// false otherwise.
bool CheckAllDeltasCulled(const boost::optional<Timestamp>& lower_ts,
Timestamp upper_ts) const {
for (const auto& e1 : all_deltas_) {
for (const auto& e2 : e1.second) {
bool relevant = IsDeltaRelevantForApply(upper_ts, e2.first);
if (lower_ts) {
relevant |= IsDeltaRelevantForSelect(*lower_ts, upper_ts, e2.first);
if (relevant) {
return false;
return true;
// Returns a string representation of all tracked deltas.
// Each delta is rendered as "<row index>,<timestamp>: <changelist>", with
// entries separated by newlines.
std::string ToString() const {
std::string s;
// False only for the first entry, so that no leading newline is emitted.
bool append_delim = false;
for (const auto& e1 : all_deltas_) {
for (const auto& e2 : e1.second) {
if (append_delim) {
StrAppend(&s, "\n");
} else {
append_delim = true;
StrAppend(&s, e1.first);
StrAppend(&s, ",");
StrAppend(&s, e2.first.ToString());
StrAppend(&s, ": ");
StrAppend(&s, RowChangeList(e2.second).ToString(*schema_));
return s;
// Applies tracked UPDATE and REINSERT values to 'cb'.
// Rows not set in 'filter' are skipped.
// Deltas not relevant to 'lower_ts' or 'upper_ts' are skipped. The set of
// rows considered is determined by 'start_row_idx' and the number of rows in 'cb'.
//
// 'col_idx' is the index in 'projection' of the column held by 'cb'.
// Returns the first error from decoding/applying a delta, else Status::OK().
Status ApplyUpdates(const Schema& projection,
const boost::optional<Timestamp>& lower_ts, Timestamp upper_ts,
rowid_t start_row_idx, int col_idx, ColumnBlock* cb,
const SelectionVector& filter) {
if (VLOG_IS_ON(3)) {
std::string lower_ts_str = lower_ts ? lower_ts->ToString() : "INF";
VLOG(3) << Substitute("Begin applying for timestamps [$0, $1)",
lower_ts_str, upper_ts.ToString());
for (int i = 0; i < cb->nrows(); i++) {
rowid_t row_idx = start_row_idx + i;
if (lower_ts) {
// First pass: establish whether this row should be applied at all.
bool at_least_one_delta_relevant = false;
// Note: operator[] on the non-const map inserts an empty inner map for rows
// that have no tracked deltas.
for (const auto& e : all_deltas_[row_idx]) {
bool is_relevant = IsDeltaRelevantForSelect(*lower_ts, upper_ts, e.first);
if (VLOG_IS_ON(3)) {
RowChangeList changes(e.second);
VLOG(3) << "Row " << i << " ts " << e.first << " (relevant: "
<< is_relevant << "): " << changes.ToString(*schema_);
if (is_relevant) {
at_least_one_delta_relevant = true;
if (!at_least_one_delta_relevant) {
// Not one delta was relevant; skip the row.
// Second pass: apply all relevant deltas.
for (const auto& e : all_deltas_[row_idx]) {
if (!IsDeltaRelevantForApply(upper_ts, e.first)) {
// No need to keep iterating; all future deltas for this row will also
// be irrelevant.
if (!filter.IsRowSelected(i)) {
RowChangeList changes(e.second);
if (changes.is_delete()) {
RowChangeListDecoder decoder(changes);
RETURN_NOT_OK(decoder.ApplyToOneColumn(i, cb, projection, col_idx, &arena_));
return Status::OK();
// Applies deletions to 'sel_vec' by unselecting all rows whose last tracked
// delta is a DELETE.
// Deltas not relevant to 'ts' are skipped. The set of rows considered is
// determined by 'start_row_idx' and the number of rows in 'sel_vec'.
// NOTE(review): the statements that decode each delta, update 'deleted' and
// clear the row's bit are not visible in this listing.
Status ApplyDeletes(Timestamp ts, rowid_t start_row_idx, SelectionVector* sel_vec) {
for (int i = 0; i < sel_vec->nrows(); i++) {
bool deleted = false;
for (const auto& e : all_deltas_[start_row_idx + i]) {
if (!IsDeltaRelevantForApply(ts, e.first)) {
// No need to keep iterating; all future deltas for this row will also
// be irrelevant.
RowChangeList changes(e.second);
RowChangeListDecoder decoder(changes);
if (deleted) {
return Status::OK();
// Selects all rows in 'sel_vec' for which there exists a tracked delta.
// Deltas not relevant to 'lower_ts' or 'upper_ts' are skipped. The set of
// rows considered is determined by 'start_row_idx' and the number of rows in 'sel_vec'.
//
// For each row, the first and last relevant deltas are recorded; when there
// are at least two, the row's liveness before the first and after the last is
// derived from whether they are a REINSERT and a DELETE respectively.
// NOTE(review): the statements that act on the liveness comparison are not
// visible in this listing.
void SelectDeltas(Timestamp lower_ts, Timestamp upper_ts,
rowid_t start_row_idx, SelectionVector* sel_vec) {
for (int i = 0; i < sel_vec->nrows(); i++) {
boost::optional<const typename MirroredDeltaTimestampMap::mapped_type&> first;
boost::optional<const typename MirroredDeltaTimestampMap::mapped_type&> last;
for (const auto& e : all_deltas_[start_row_idx + i]) {
if (!IsDeltaRelevantForSelect(lower_ts, upper_ts, e.first)) {
// Must keep iterating; short-circuit out of the select criteria is
// complex and not worth using in test code.
if (!first.is_initialized()) {
first = e.second;
last = e.second;
// No relevant deltas.
if (!first) {
// One relevant delta.
if (first == last) {
// At least two relevant deltas.
bool first_liveness;
RowChangeList changes(*first);
RowChangeListDecoder decoder(changes);
// A leading REINSERT means the row was not live before the first delta.
first_liveness = !decoder.is_reinsert();
bool last_liveness;
RowChangeList changes(*last);
RowChangeListDecoder decoder(changes);
// A trailing DELETE means the row is not live after the last delta.
last_liveness = !decoder.is_delete();
if (!first_liveness && !last_liveness) {
} else {
// Transforms and writes deltas into 'deltas', a vector of "delta lists", each
// of which represents a particular row, and each entry of which is a
// Timestamp and encoded delta pair. The encoded delta is a Slice into
// all_deltas_ and should not outlive this class instance.
// Deltas not relevant to 'ts' are skipped. The set of rows considered is
// determined by 'start_row_idx' and the number of rows in 'deltas'.
using DeltaList = std::vector<std::pair<Timestamp, Slice>>;
void CollectMutations(Timestamp ts, rowid_t start_row_idx,
std::vector<DeltaList>* deltas) {
for (int i = 0; i < deltas->size(); i++) {
for (const auto& e : all_deltas_[start_row_idx + i]) {
if (!IsDeltaRelevantForApply(ts, e.first)) {
// No need to keep iterating; all future deltas for this row will also
// be irrelevant.
(*deltas)[i].emplace_back(e.first, Slice(e.second));
// Transforms and writes deltas into 'deltas', a vector of delta key to encoded
// delta pairs. The encoded delta is a Slice into this class instance's arena
// and should not outlive it.
// Notably, this function does not compare timestamps; all deltas for the
// requested rows are returned. The set of rows considered is determined by
// 'start_row_idx' and 'num_rows'.
// NOTE(review): the start of the statement that filters each changelist by
// 'col_ids' into 'encoder' is truncated in this listing.
Status FilterColumnIdsAndCollectDeltas(rowid_t start_row_idx, size_t num_rows,
const std::vector<ColumnId>& col_ids,
std::vector<std::pair<DeltaKey, Slice>>* deltas) {
// 'buf' backs 'encoder' and is shared across all rows and deltas.
faststring buf;
RowChangeListEncoder encoder(&buf);
for (int i = 0; i < num_rows; i++) {
rowid_t row_idx = start_row_idx + i;
for (const auto& e : all_deltas_[row_idx]) {
RowChangeList(e.second), col_ids, &encoder));
// A delta is emitted only if the encoder ended up initialized.
if (encoder.is_initialized()) {
RowChangeList changes = encoder.as_changelist();
// Copy the encoded bytes into the arena, since 'buf' is reused.
Slice relocated;
CHECK(arena_.RelocateSlice(changes.slice(), &relocated));
deltas->emplace_back(DeltaKey(row_idx, e.first), relocated);
return Status::OK();
// Read-only access to every tracked delta.
const MirroredDeltaMap& all_deltas() const { return all_deltas_; }
// The schema pointer passed to the constructor.
const Schema* schema() const { return schema_; }
// Returns true if 'ts' is relevant to 'to_include', false otherwise.
bool IsDeltaRelevantForApply(Timestamp to_include, Timestamp ts) const;
// Returns true if 'ts' is relevant with respect to both 'to_exclude' and
// 'to_include', false otherwise.
bool IsDeltaRelevantForSelect(Timestamp to_exclude,
Timestamp to_include,
Timestamp ts) const;
// All encoded deltas, arranged in DeltaKey order.
MirroredDeltaMap all_deltas_;
// Schema of all encoded deltas.
const Schema* schema_;
// Arena used for allocations in ApplyUpdates and FilterColumnIdsAndCollectDeltas.
Arena arena_;
// Returns a sequence of randomly chosen non-negative integers with the following
// properties:
// 1. All integers are in the range [0, 'max_integer').
// 2. The number of entries is equal to 'num_integers', or 'max_integer' if
// 'num_integers' is greater than the size of the range in #1.
// 3. An integer cannot repeat.
// 4. The integers are in ascending order.
//
// NOTE(review): the body of the do-while loop (the statement that draws from
// 'prng' and inserts into 'integers') is missing from this listing. Also note
// that a do-while runs its body at least once even when the clamped count is
// 0; confirm the behavior for 'max_integer' == 0 against the full source.
static inline std::vector<size_t> GetRandomIntegerSequence(
Random* prng,
size_t max_integer,
size_t num_integers) {
// Clamp the length of the sequence in case the max is too low to supply the
// desired number of integers.
size_t num_integers_clamped = std::min(max_integer, num_integers);
// Pick some integers.
// We use an ordered set so that the sequence is in ascending order.
std::set<size_t> integers;
do {
} while (integers.size() < num_integers_clamped);
return std::vector<size_t>(integers.begin(), integers.end());
// Generates random deltas conforming to 'schema' and stores them in 'mirror'.
// 'row_range' and 'ts_range' constrain the DeltaKeys used in the created deltas.
// If 'allow_reinserts' is true, REINSERT deltas may also be generated.
// Otherwise, a row won't receive any more deltas after a DELETE has been
// generated for it.
//
// 'num_deltas' is the number of distinct delta keys to generate. Both ranges
// must be non-empty (second > first), as enforced by the DCHECKs.
//
// NOTE(review): this listing appears to have lost lines (closing braces,
// continuations of several expressions, encoder reset/set-to-delete calls and
// the loop counter increment). Comments describe only what is visible.
template <typename T>
void CreateRandomDeltas(const Schema& schema,
Random* prng,
int num_deltas,
std::pair<rowid_t, rowid_t> row_range,
std::pair<uint64_t, uint64_t> ts_range,
bool allow_reinserts,
MirroredDeltas<T>* mirror) {
DCHECK_GT(row_range.second, row_range.first);
DCHECK_GT(ts_range.second, ts_range.first);
// Randomly generate a set of delta keys, then sort them.
// NOTE(review): the start of the 'keys' container declaration is truncated.
DeltaKeyEqualToFunctor<T::kTag>> keys;
int i = 0;
// Keep drawing (row, timestamp) pairs until 'num_deltas' keys exist.
// EmplaceIfNotPresent() reports whether the key was newly inserted.
while (i < num_deltas) {
rowid_t row_idx = prng->Uniform(row_range.second - row_range.first) +
uint64_t ts_val = prng->Uniform(ts_range.second - ts_range.first) +
if (EmplaceIfNotPresent(&keys, row_idx, Timestamp(ts_val))) {
std::vector<DeltaKey> sorted_keys(keys.begin(), keys.end());
std::sort(sorted_keys.begin(), sorted_keys.end(),
// Randomly generate deltas using the keys.
// Because the timestamps are sorted in DeltaType order, we can track
// the deletion status of each row directly.
faststring buf;
RowChangeListEncoder encoder(&buf);
bool is_deleted = false;
// Starts out empty: make_optional() with a false condition yields an
// uninitialized optional.
auto prev_row_idx = boost::make_optional<rowid_t>(false, 0);
for (i = 0; i < sorted_keys.size(); i++) {
const auto& k = sorted_keys[i];
if (!prev_row_idx || prev_row_idx != k.row_idx()) {
// New row; reset the deletion status.
is_deleted = false;
prev_row_idx = k.row_idx();
if (is_deleted) {
// The row is deleted; we must REINSERT it.
RowBuilder rb(&schema);
// NOTE(review): this 'i' shadows the outer loop index declared above.
for (int i = 0; i < schema.num_columns(); i++) {
is_deleted = false;
VLOG(3) << "REINSERT: " << k.row_idx() << "," << k.timestamp().ToString()
<< ": " << encoder.as_changelist().ToString(schema);
} else if (prng->Uniform(100) < 90 ||
(!allow_reinserts &&
i + 1 < sorted_keys.size() &&
k.row_idx() == sorted_keys[i + 1].row_idx())) {
// The row is live and we randomly chose to UPDATE it. Do so to a random
// assortment of columns.
// There's a special case here for when we chose to DELETE (see below) but
// we're not allowed to REINSERT: if this won't be the last delta for this
// row, we'll generate another UPDATE instead of the DELETE. This is
// because we've generated the keys up front; if we DELETE now and can't
// REINSERT, we'd have to discard the remaining keys for this row.
int num_cols_to_update = std::min(prng->Uniform(5) + 1UL,
auto idxs_to_update = GetRandomIntegerSequence(prng,
for (auto idx : idxs_to_update) {
// Pick a random value to assign to the UPDATE. NULL is an option if the
// schema supports it.
auto col_id = schema.column_id(idx);
const auto& col = schema.column(idx);
if (col.is_nullable() && prng->Uniform(10) == 0) {
encoder.AddColumnUpdate(col, col_id, nullptr);
} else {
// Values are written as 32-bit unsigned integers.
// NOTE(review): this presumably assumes every column is UINT32 -- confirm.
uint32_t u32_val = prng->Next();
encoder.AddColumnUpdate(col, col_id, &u32_val);
VLOG(3) << "UPDATE: " << k.row_idx() << "," << k.timestamp().ToString()
<< ": " << encoder.as_changelist().ToString(schema);
} else {
// The row is live; DELETE it.
is_deleted = true;
VLOG(3) << "DELETE: " << k.row_idx() << "," << k.timestamp().ToString();
// Record the encoded delta currently held in 'buf' under key 'k'.
mirror->AddEncodedDelta(k, buf);
// Create a random projection conforming to 'schema'.
// 'max_cols_to_project' defines the maximum number of columns that should be
// allowed into the projection; the actual number of columns is randomly
// generated.
// If 'allow' is AllowIsDeleted::YES, an IS_DELETED virtual column may be
// randomly added to the projection.
// Controls whether GetRandomProjection() is permitted to append an IS_DELETED
// virtual column; YES enables it.
// NOTE(review): the enumerator list and closing brace are not visible in this
// listing.
enum class AllowIsDeleted {
// Builds and returns a Schema made of randomly chosen columns of 'schema',
// optionally followed by an "is_deleted" virtual column. The returned schema
// is constructed with zero key columns.
//
// NOTE(review): the remaining arguments of the GetRandomIntegerSequence() call,
// the loop body that copies each chosen column, and the tail of the IS_DELETED
// emplace_back() call are truncated in this listing.
static inline Schema GetRandomProjection(const Schema& schema,
Random* prng,
size_t max_cols_to_project,
AllowIsDeleted allow) {
// Set up the projection.
auto idxs_to_project = GetRandomIntegerSequence(prng,
std::vector<ColumnSchema> projected_cols;
std::vector<ColumnId> projected_col_ids;
for (auto idx : idxs_to_project) {
// Add an IS_DELETED virtual column some of the time (when Uniform(10)
// yields 0).
if (allow == AllowIsDeleted::YES && prng->Uniform(10) == 0) {
bool read_default = false;
projected_cols.emplace_back("is_deleted", IS_DELETED, /*is_nullable=*/ false,
// The virtual column gets an ID one past the largest ID in 'schema', so it
// cannot collide with a real column.
projected_col_ids.emplace_back(schema.max_col_id() + 1);
return Schema(projected_cols, projected_col_ids, 0);
// Create a DMS and populate it with random deltas.
// 'num_deltas' dictates the number of deltas that should be created.
// 'row_range' and 'ts_range' constrain the DeltaKeys used in the created deltas.
// 'mirror' will be updated with all created deltas.
//
// On success, '*dms' receives the new DeltaMemStore. Returns the first error
// from an Update() call, else Status::OK().
// NOTE(review): the start of the DeltaMemStore creation call is truncated in
// this listing (only its trailing arguments are visible).
static inline Status CreateRandomDMS(
const Schema& schema,
Random* prng,
int num_deltas,
std::pair<rowid_t, rowid_t> row_range,
std::pair<uint64_t, uint64_t> ts_range,
MirroredDeltas<DeltaTypeSelector<REDO>>* mirror,
std::shared_ptr<DeltaMemStore>* dms) {
// Create a smattering of deltas in 'mirror'.
// REINSERTs are disabled for the DMS case.
CreateRandomDeltas(schema, prng, num_deltas,
std::move(row_range), std::move(ts_range),
/*allow_reinserts=*/ false, mirror);
// Add them to the DMS.
std::shared_ptr<DeltaMemStore> local_dms;
0, 0, new log::LogAnchorRegistry(), MemTracker::GetRootTracker(),
consensus::OpId op_id(consensus::MaximumOpId());
for (const auto& e1 : mirror->all_deltas()) {
for (const auto& e2 : e1.second) {
// NOTE(review): 'k' and 'changes' are not used by the visible Update() call
// below, which rebuilds its arguments from 'e1'/'e2' directly.
DeltaKey k(e1.first, e2.first);
RowChangeList changes(e2.second);
RETURN_NOT_OK(local_dms->Update(e2.first, e1.first,
RowChangeList(e2.second), op_id));
// Publish the DMS only after every delta has been added successfully.
*dms = std::move(local_dms);
return Status::OK();
// Create a delta file, populate it with random deltas, and return an opened
// DeltaFileReader for it.
// 'num_deltas' dictates the number of deltas that should be created.
// 'row_range' and 'ts_range' constrain the DeltaKeys used in the created deltas.
// 'mirror' will be updated with all created deltas.
//
// 'T' selects the delta type (T::kTag) used for both writing and reading.
// Returns the first error encountered, else the status of
// DeltaFileReader::Open(), which fills in '*delta_reader'.
// NOTE(review): the writer's start/finish calls and the hand-off of 'stats'
// are not visible in this listing; confirm against the full source.
template <typename T>
Status CreateRandomDeltaFile(const Schema& schema,
FsManager* fs_manager,
Random* prng,
int num_deltas,
std::pair<rowid_t, rowid_t> row_range,
std::pair<uint64_t, uint64_t> ts_range,
MirroredDeltas<T>* mirror,
std::shared_ptr<DeltaFileReader>* delta_reader) {
// Create a smattering of deltas in 'mirror'.
// REINSERTs are allowed for delta files.
CreateRandomDeltas(schema, prng, num_deltas,
std::move(row_range), std::move(ts_range),
/*allow_reinserts=*/ true, mirror);
// Write them out to a delta file in order.
std::unique_ptr<fs::WritableBlock> wb;
RETURN_NOT_OK(fs_manager->CreateNewBlock({}, &wb));
// Capture the block ID before 'wb' is moved into the writer; it is needed to
// reopen the block for reading below.
BlockId block_id = wb->id();
std::unique_ptr<DeltaFileWriter> writer(new DeltaFileWriter(std::move(wb)));
std::unique_ptr<DeltaStats> stats(new DeltaStats);
for (const auto& e1 : mirror->all_deltas()) {
for (const auto& e2 : e1.second) {
DeltaKey k(e1.first, e2.first);
RowChangeList changes(e2.second);
RETURN_NOT_OK(writer->AppendDelta<T::kTag>(k, changes));
RETURN_NOT_OK(stats->UpdateStats(k.timestamp(), changes));
// Open a reader for this newly written delta file.
std::unique_ptr<fs::ReadableBlock> rb;
RETURN_NOT_OK(fs_manager->OpenBlock(block_id, &rb));
return DeltaFileReader::Open(std::move(rb), T::kTag,
cfile::ReaderOptions(), delta_reader);
// Fuzz tests a DeltaStore by generating a fairly random DeltaIterator, using it
// to retrieve deltas from 'store' via several DeltaIterator methods, and
// comparing those deltas with the ones found in 'mirror'. Assumes that both
// 'store' and 'mirror' have been initialized with the same logical deltas.
// 'ts_range' controls the timestamp range to be used by the iterator.
// If 'test_filter_column_ids_and_collect_deltas' is true, will test that
// DeltaIterator method too.
//
// Uses gtest ASSERT_* macros, so a failure returns early from this function.
//
// NOTE(review): this listing appears to have lost lines (closing braces,
// 'continue' statements, iterator Init/SeekToStart calls, the actual
// SelectDeltas/ApplyDeletes calls on 'iter', and parts of several
// expressions). Comments describe only what is visible.
template <typename T>
void RunDeltaFuzzTest(const DeltaStore& store,
Random* prng,
MirroredDeltas<T>* mirror,
std::pair<uint64_t, uint64_t> ts_range,
bool test_filter_column_ids_and_collect_deltas) {
// Arbitrary constants to control the running time and coverage of the test.
const int kMaxBatchSize = 1000;
const int kNumScans = 100;
const int kMaxColsToProject = 10;
const int kMaxColsToFilter = 4;
// Run a series of tests on random timestamps as well as one scan on a
// snapshot for whom all deltas are relevant.
for (int i = 0; i < kNumScans + 1; i++) {
// Pick a timestamp for the iterator. The last iteration will use a snapshot
// that includes all deltas.
Timestamp upper_ts;
boost::optional<Timestamp> lower_ts;
if (i < kNumScans) {
uint64_t upper_ts_val = prng->Uniform(ts_range.second - ts_range.first) +
upper_ts = Timestamp(upper_ts_val);
// Use a lower bound in half the scans.
if (prng->Uniform(2)) {
// Guard against calling Uniform(0) when the upper bound is 0.
uint64_t lower_ts_val = upper_ts_val > 0 ? prng->Uniform(upper_ts_val) : 0;
lower_ts = Timestamp(lower_ts_val);
} else if (T::kTag == REDO) {
// All-inclusive snapshot for REDO deltas.
upper_ts = Timestamp::kMax;
} else {
// All-inclusive snapshot for UNDO deltas.
DCHECK(T::kTag == UNDO);
upper_ts = Timestamp::kMin;
// Create and initialize the iterator. If no iterator is returned, it's
// because all deltas in 'store' were irrelevant; verify this.
// The IS_DELETED virtual column is allowed only when a lower bound is set.
Schema projection = GetRandomProjection(*mirror->schema(), prng, kMaxColsToProject,
lower_ts ? AllowIsDeleted::YES :
SCOPED_TRACE(Substitute("Projection $0", projection.ToString()));
RowIteratorOptions opts;
opts.projection = &projection;
if (lower_ts) {
opts.snap_to_exclude = MvccSnapshot(*lower_ts);
opts.snap_to_include = MvccSnapshot(upper_ts);
SCOPED_TRACE(Substitute("Timestamps: [$0,$1)",
lower_ts ? lower_ts->ToString() : "INF",
std::unique_ptr<DeltaIterator> iter;
Status s = store.NewDeltaIterator(opts, &iter);
if (s.IsNotFound()) {
ASSERT_STR_CONTAINS(s.ToString(), "MvccSnapshot outside the range of this delta");
ASSERT_TRUE(mirror->CheckAllDeltasCulled(lower_ts, upper_ts));
// Run tests in batches, in case there's some bug related to batching.
rowid_t start_row_idx = 0;
while (iter->HasNext()) {
// Batch sizes are random, in the range [1, kMaxBatchSize].
int batch_size = prng->Uniform(kMaxBatchSize) + 1;
SCOPED_TRACE(Substitute("batch starting at $0 ($1 rows)",
start_row_idx, batch_size));
int prepare_flags = DeltaIterator::PREPARE_FOR_APPLY |
if (lower_ts) {
prepare_flags |= DeltaIterator::PREPARE_FOR_SELECT;
ASSERT_OK(iter->PrepareBatch(batch_size, prepare_flags));
// Test SelectDeltas: the selection vector begins all false and a row is
// set if there is at least one relevant update for it.
// Note: we retain 'actual_selected' for use as a possible filter in the
// ApplyUpdates test below.
SelectionVector actual_selected(batch_size);
if (lower_ts) {
SelectionVector expected_selected(batch_size);
mirror->SelectDeltas(*lower_ts, upper_ts, start_row_idx, &expected_selected);
SelectedDeltas deltas(batch_size);
ASSERT_EQ(expected_selected, actual_selected)
<< "Expected selvec: " << expected_selected.ToString()
<< "\nActual selvec: " << actual_selected.ToString();
// Test ApplyDeletes: the selection vector is all true and a row is unset
// if the last relevant update deleted it.
// Note: we retain 'actual_deleted' for use as a possible filter in the
// ApplyUpdates test below.
SelectionVector actual_deleted(batch_size);
SelectionVector expected_deleted(batch_size);
ASSERT_OK(mirror->ApplyDeletes(upper_ts, start_row_idx, &expected_deleted));
ASSERT_EQ(expected_deleted, actual_deleted)
<< "Expected selvec: " << expected_deleted.ToString()
<< "\nActual selvec: " << actual_deleted.ToString();
// Test ApplyUpdates: all relevant updates are applied to the column block.
for (int j = 0; j < opts.projection->num_columns(); j++) {
SCOPED_TRACE(Substitute("Column $0", j));
bool col_is_nullable = opts.projection->column(j).is_nullable();
ScopedColumnBlock<UINT32> expected_scb(batch_size, col_is_nullable);
ScopedColumnBlock<UINT32> actual_scb(batch_size, col_is_nullable);
// Zero both blocks so cells untouched by any delta compare equal.
for (int k = 0; k < batch_size; k++) {
expected_scb[k] = 0;
actual_scb[k] = 0;
// With a lower bound, filter by the selected rows; otherwise filter by the
// rows that survived ApplyDeletes.
const SelectionVector& filter = lower_ts ? actual_selected : actual_deleted;
if (j == opts.projection->first_is_deleted_virtual_column_idx()) {
// Reconstruct the expected IS_DELETED state using 'actual_selected'
// and 'actual_deleted', which we've already verified above.
for (int k = 0; k < batch_size; k++) {
if (actual_selected.IsRowSelected(k)) {
expected_scb[k] = !actual_deleted.IsRowSelected(k);
} else {
ASSERT_OK(mirror->ApplyUpdates(*opts.projection, lower_ts, upper_ts,
start_row_idx, j, &expected_scb, filter));
ASSERT_OK(iter->ApplyUpdates(j, &actual_scb, filter));
ASSERT_EQ(expected_scb, actual_scb)
<< "Expected column block: " << expected_scb.ToString()
<< "\nActual column block: " << actual_scb.ToString();
// Test CollectMutations: all relevant updates are returned.
Arena arena(1024);
std::vector<typename MirroredDeltas<T>::DeltaList> expected_muts(batch_size);
std::vector<Mutation*> actual_muts(batch_size);
ASSERT_OK(iter->CollectMutations(&actual_muts, &arena));
mirror->CollectMutations(upper_ts, start_row_idx, &expected_muts);
// NOTE(review): this 'i' shadows the scan index of the outermost loop.
for (int i = 0; i < expected_muts.size(); i++) {
const auto& expected = expected_muts[i];
auto* actual = actual_muts[i];
// Mutations from CollectMutations() are in the opposite timestamp
// order than what's needed for REDOs or UNDOs.
// Walk the actual mutation list in step with the expected entries.
for (int j = 0; j < expected.size(); j++) {
ASSERT_EQ(expected[j].first, actual->timestamp());
ASSERT_EQ(expected[j].second, actual->changelist().slice());
actual = actual->next();
// Test FilterColumnIdsAndCollectDeltas with a random filter set.
// Note that this operation only works on a totally inclusive snapshot.
// ('i == kNumScans' identifies the final, all-inclusive scan.)
if (test_filter_column_ids_and_collect_deltas && i == kNumScans) {
// Create a sequence of column ids to filter.
auto idxs_to_filter = GetRandomIntegerSequence(prng,
std::vector<ColumnId> col_ids_to_filter(idxs_to_filter.size());
for (auto idx : idxs_to_filter) {
// Collect and filter, then compare the results.
Arena arena(1024);
std::vector<std::pair<DeltaKey, Slice>> expected_deltas;
std::vector<DeltaKeyAndUpdate> actual_deltas;
start_row_idx, batch_size, col_ids_to_filter, &expected_deltas));
col_ids_to_filter, &actual_deltas, &arena));
ASSERT_EQ(expected_deltas.size(), actual_deltas.size());
for (int j = 0; j < expected_deltas.size(); j++) {
actual_deltas[j].key) == 0);
ASSERT_EQ(expected_deltas[j].second, actual_deltas[j].cell);
// Advance to the first row of the next batch.
start_row_idx += batch_size;
} // namespace tablet
} // namespace kudu