blob: 903bb0ef48a113eccf3d2d1b596267d35118053e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-harness.h"
#include "kudu/tablet/tablet-test-base.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace tablet {
// Parameterized test fixture for diff scans over a tablet keyed by an INT64
// column. The test parameter is a tuple of (scan order mode, whether deleted
// rows are requested). The base fixture is constructed with the HYBRID_CLOCK
// clock type.
class DiffScanTest : public TabletTestBase<IntKeyTestSetup<INT64>>,
                     public ::testing::WithParamInterface<std::tuple<OrderMode, bool>> {
 private:
  using Superclass = TabletTestBase<IntKeyTestSetup<INT64>>;

 public:
  DiffScanTest()
      : Superclass(TabletHarness::Options::ClockType::HYBRID_CLOCK) {
  }
};
// Run TestDiffScan over the cross product of:
//   - order mode: UNORDERED, then ORDERED
//   - include_deleted_rows: false, then true
INSTANTIATE_TEST_SUITE_P(
    DiffScanModes, DiffScanTest,
    ::testing::Combine(/*order_mode*/ ::testing::Values(UNORDERED, ORDERED),
                       /*include_deleted_rows*/ ::testing::Values(false, true)));
// Scans a tablet in which a single row key was inserted, deleted, and then
// reinserted with a different value, with flushes between the steps. Checks
// how many versions of that key the scan returns for each combination of
// order mode and include_deleted_rows.
TEST_P(DiffScanTest, TestDiffScan) {
  const OrderMode order_mode = std::get<0>(GetParam());
  const bool include_deleted_rows = std::get<1>(GetParam());

  auto tablet = this->tablet();

  // Snapshot taken before any writes. In ORDERED mode it is used below as the
  // exclusive lower bound of the diff scan.
  MvccSnapshot snap1(*tablet->mvcc_manager());

  // 1. Insert a row and flush the MRS.
  LocalTabletWriter writer(tablet.get(), &client_schema_);
  constexpr int64_t kRowKey = 1;
  ASSERT_OK(InsertTestRow(&writer, kRowKey, 1));
  ASSERT_OK(tablet->Flush());

  // 2. Delete the row and flush the DMS.
  ASSERT_OK(DeleteTestRow(&writer, kRowKey));
  ASSERT_OK(tablet->FlushAllDMSForTests());

  // 3. Insert the same row key (with another value) and flush the MRS.
  ASSERT_OK(InsertTestRow(&writer, kRowKey, 2));
  ASSERT_OK(tablet->Flush());

  // Ensure there is only 1 live row in the tablet (our reinsert).
  vector<string> rows;
  ASSERT_OK(DumpTablet(*tablet, tablet->schema()->CopyWithoutColumnIds(), &rows));
  ASSERT_EQ(1, rows.size()) << "expected only one live row";
  ASSERT_EQ("(int64 key=1, int32 key_idx=1, int32 val=2)", rows[0]);

  // 4. Scan up to snap2. In ORDERED mode this is a diff scan that also excludes
  //    snap1. In UNORDERED mode the lower bound is left at its default.
  ASSERT_OK(tablet->mvcc_manager()->WaitForApplyingOpsToApply());
  MvccSnapshot snap2(*tablet->mvcc_manager());
  RowIteratorOptions opts;
  opts.snap_to_include = snap2;
  opts.order = order_mode;
  opts.include_deleted_rows = include_deleted_rows;

  static const bool kIsDeletedDefault = false;
  SchemaBuilder builder(*tablet->metadata()->schema());
  if (order_mode == ORDERED) {
    // Define our diff scan to start from snap1.
    // NOTE: it isn't critical to set this given the default is -Inf, but it
    // can't hurt to specify one, given we expect it to be the common case with
    // the backup jobs.
    opts.snap_to_exclude = snap1;

    // The merge iterator requires an IS_DELETED column when including deleted
    // rows in order to support deduplication of the rows.
    ASSERT_OK(builder.AddColumn("deleted", IS_DELETED,
                                /*is_nullable=*/ false,
                                /*read_default=*/ &kIsDeletedDefault,
                                /*write_default=*/ nullptr));
  }
  Schema projection = builder.BuildWithoutIds();
  opts.projection = &projection;

  unique_ptr<RowwiseIterator> row_iterator;
  ASSERT_OK(tablet->NewRowIterator(std::move(opts), &row_iterator));
  ASSERT_TRUE(row_iterator);
  ScanSpec spec;
  ASSERT_OK(row_iterator->Init(&spec));

  // Collect the scan results into a fresh vector instead of reusing 'rows'
  // from the DumpTablet() call above. This way the assertions below don't
  // depend on whether IterateToStringList() clears its output first.
  vector<string> scan_rows;
  ASSERT_OK(tablet::IterateToStringList(row_iterator.get(), &scan_rows));

  // In unordered mode, the union iterator will not deduplicate row keys.
  // In ordered mode, the merge iterator will perform deduplication.
  if (order_mode == UNORDERED) {
    if (include_deleted_rows) {
      // No de-dup.
      ASSERT_EQ(2, scan_rows.size());
      // There is no guaranteed order of these results so get them in alpha order.
      std::sort(scan_rows.begin(), scan_rows.end());
      EXPECT_EQ("(int64 key=1, int32 key_idx=1, int32 val=1)", scan_rows[0]);
      EXPECT_EQ("(int64 key=1, int32 key_idx=1, int32 val=2)", scan_rows[1]);
    } else {
      // There will only ever be a single live version of any one row.
      ASSERT_EQ(1, scan_rows.size());
      EXPECT_EQ("(int64 key=1, int32 key_idx=1, int32 val=2)", scan_rows[0]);
    }
  } else {
    // De-dup, regardless of whether deleted rows are included or not.
    ASSERT_EQ(1, scan_rows.size());
    EXPECT_EQ("(int64 key=1, int32 key_idx=1, int32 val=2, is_deleted deleted=false)",
              scan_rows[0]);
  }
}
// Non-parameterized fixture for ordered diff scan regression tests over a
// tablet keyed by an INT64 column. The base fixture is constructed with the
// HYBRID_CLOCK clock type.
class OrderedDiffScanWithDeletesTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
 private:
  using Superclass = TabletTestBase<IntKeyTestSetup<INT64>>;

 public:
  OrderedDiffScanWithDeletesTest()
      : Superclass(TabletHarness::Options::ClockType::HYBRID_CLOCK) {
  }
};
// Regression test for KUDU-3108, wherein running the merge iterator on
// overlapping rowsets could potentially lead to invalid memory access.
//
// Write sequence:
//   - insert key 1, flush;
//   - (snap1) delete key 1, insert key 3, flush;
//   - reinsert key 1, insert key 0 (left unflushed);
//   - (snap2) ordered diff scan over (snap1, snap2] including deleted rows.
TEST_F(OrderedDiffScanWithDeletesTest, TestKudu3108) {
  auto tablet = this->tablet();
  LocalTabletWriter writer(tablet.get(), &client_schema_);
  ASSERT_OK(InsertTestRow(&writer, 1, 1));
  ASSERT_OK(tablet->Flush());

  MvccSnapshot snap1(*tablet->mvcc_manager());
  ASSERT_OK(DeleteTestRow(&writer, 1));
  ASSERT_OK(InsertTestRow(&writer, 3, 1));
  ASSERT_OK(tablet->Flush());

  ASSERT_OK(InsertTestRow(&writer, 1, 1));
  ASSERT_OK(InsertTestRow(&writer, 0, 1));
  ASSERT_OK(tablet->mvcc_manager()->WaitForApplyingOpsToApply());
  MvccSnapshot snap2(*tablet->mvcc_manager());

  RowIteratorOptions opts;
  opts.snap_to_exclude = snap1;
  opts.snap_to_include = snap2;
  opts.order = ORDERED;
  opts.include_deleted_rows = true;

  // The merge iterator requires an IS_DELETED column when including deleted
  // rows in order to deduplicate them.
  static const bool kIsDeletedDefault = false;
  SchemaBuilder builder(*tablet->metadata()->schema());
  ASSERT_OK(builder.AddColumn("deleted", IS_DELETED,
                              /*is_nullable=*/ false,
                              /*read_default=*/ &kIsDeletedDefault,
                              /*write_default=*/ nullptr));
  Schema projection = builder.BuildWithoutIds();
  opts.projection = &projection;

  // We should be able to iterate through the rows without issue.
  unique_ptr<RowwiseIterator> row_iterator;
  ASSERT_OK(tablet->NewRowIterator(std::move(opts), &row_iterator));
  ASSERT_TRUE(row_iterator);
  ScanSpec spec;
  ASSERT_OK(row_iterator->Init(&spec));
  vector<string> rows;
  ASSERT_OK(tablet::IterateToStringList(row_iterator.get(), &rows));

  // The deleted and reinserted versions of key 1 are expected to be
  // deduplicated by the merge iterator, leaving one row each for keys 0, 1, 3.
  ASSERT_EQ(3, rows.size()) << "expected one row each for keys 0, 1 and 3";
}
// Regression test for KUDU-3291, where doing a diff scan after a delta flush
// raced with a batch update to a single row could result in a crash.
TEST_F(OrderedDiffScanWithDeletesTest, TestDiffScanAfterDeltaFlushRacesWithBatchUpdate) {
  auto tablet = this->tablet();
  LocalTabletWriter writer(tablet.get(), &client_schema_);
  constexpr int64_t kRowKey = 1;
  ASSERT_OK(InsertTestRow(&writer, kRowKey, 1));
  MvccSnapshot snap1(*tablet->mvcc_manager());

  // Start off with a DRS that we can add deltas to.
  ASSERT_OK(tablet->Flush());

  // Update the same row several times, and concurrently delta flush. Inject a
  // short, random sleep (0-2999 ms) to encourage different sizes of delta
  // stores. If implemented incorrectly, the DeltaIteratorMerger could crash,
  // unable to disambiguate between rows of the delta stores being merged.
  // NOTE(review): rand() is not seeded in this test. Unless the harness seeds
  // it elsewhere, the delay is the same on every run.
  const int sleep_ms = rand() % 3000;
  Status flush_status;
  std::thread flusher([&] {
    SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
    flush_status = tablet->FlushAllDMSForTests();
  });
  // Make sure the flusher thread is joined even if an ASSERT below returns
  // early; otherwise std::thread's destructor would terminate the process.
  auto thread_joiner = MakeScopedCleanup([&] {
    flusher.join();
  });
  ASSERT_OK(UpdateTestRow(&writer, kRowKey, 0, 10000));
  flusher.join();
  thread_joiner.cancel();
  ASSERT_OK(flush_status);
  ASSERT_OK(tablet->mvcc_manager()->WaitForApplyingOpsToApply());

  // Now perform a diff scan, which is an ordered scan with a start and end
  // timestamp.
  MvccSnapshot snap2(*tablet->mvcc_manager());
  RowIteratorOptions opts;
  opts.snap_to_exclude = snap1;
  opts.snap_to_include = snap2;
  opts.order = ORDERED;
  SchemaBuilder builder(*tablet->metadata()->schema());
  Schema projection = builder.BuildWithoutIds();
  opts.projection = &projection;
  unique_ptr<RowwiseIterator> row_iterator;
  ASSERT_OK(tablet->NewRowIterator(std::move(opts), &row_iterator));
  ASSERT_TRUE(row_iterator);

  // Regression test for KUDU-3291, iterating through the rows shouldn't result
  // in a crash.
  ScanSpec spec;
  ASSERT_OK(row_iterator->Init(&spec));
  vector<string> rows;
  ASSERT_OK(tablet::IterateToStringList(row_iterator.get(), &rows));
  ASSERT_EQ(1, rows.size());
  ASSERT_STR_CONTAINS(rows[0], "val=9999");
}
} // namespace tablet
} // namespace kudu