blob: 11b70989c66e65d7478641709c831942a46849bb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/rowset.h"
#include <limits>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "kudu/common/generic_iterators.h"
#include "kudu/common/iterator.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/rowset_metadata.h"
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
using fs::IOContext;
namespace tablet {
RowIteratorOptions::RowIteratorOptions()
: projection(nullptr),
snap_to_include(MvccSnapshot::CreateSnapshotIncludingAllOps()),
order(OrderMode::UNORDERED),
include_deleted_rows(false) {}
// Opens an iterator over this rowset and, when the rowset can report them,
// attaches its encoded key bounds.
//
// A NotSupported status from GetBounds() (some rowsets, e.g. MRS, have no
// bounds) is tolerated and leaves 'out->encoded_bounds' untouched. Any other
// failure, from either NewRowIterator() or GetBounds(), is returned and 'out'
// is left unmodified.
Status RowSet::NewRowIteratorWithBounds(const RowIteratorOptions& opts,
                                        IterWithBounds* out) const {
  unique_ptr<RowwiseIterator> row_iter;
  RETURN_NOT_OK(NewRowIterator(opts, &row_iter));

  string min_key;
  string max_key;
  const Status bounds_status = GetBounds(&min_key, &max_key);
  if (!bounds_status.IsNotSupported()) {
    // OK passes straight through here; any real error is propagated.
    RETURN_NOT_OK(bounds_status);
  }
  if (bounds_status.ok()) {
    out->encoded_bounds = std::make_pair(std::move(min_key), std::move(max_key));
  }
  out->iter = std::move(row_iter);
  return Status::OK();
}
// Takes ownership of the input ('old') and output ('new') rowset lists.
// Both must be non-empty: other methods index into them unconditionally
// (NewRowIterator() reads old_rowsets_[0]; GetBounds() reads
// new_rowsets_.front()/back()).
DuplicatingRowSet::DuplicatingRowSet(RowSetVector old_rowsets,
                                     RowSetVector new_rowsets)
    : old_rowsets_(std::move(old_rowsets)),
      new_rowsets_(std::move(new_rowsets)) {
  // Enforce the non-empty invariant once, up front.
  CHECK_GT(old_rowsets_.size(), 0);
  CHECK_GT(new_rowsets_.size(), 0);
}
// Nothing to release explicitly: the rowset vectors clean up their own
// shared_ptr references.
DuplicatingRowSet::~DuplicatingRowSet() = default;
// Stringify the given list of rowsets into 'dst'.
static void AppendRowSetStrings(const RowSetVector &rowsets, string *dst) {
bool first = true;
dst->append("[");
for (const shared_ptr<RowSet> &rs : rowsets) {
if (!first) {
dst->append(", ");
}
first = false;
dst->append(rs->ToString());
}
dst->append("]");
}
// Renders as "DuplicatingRowSet([<inputs>] -> [<outputs>])".
string DuplicatingRowSet::ToString() const {
  string desc = "DuplicatingRowSet(";
  AppendRowSetStrings(old_rowsets_, &desc);
  desc += " -> ";
  AppendRowSetStrings(new_rowsets_, &desc);
  desc += ')';
  return desc;
}
// Reads go to the input ('old') rowsets only.
//
// With a single input rowset, its iterator is returned directly. Otherwise,
// one bounded iterator is opened per input and the iterators are combined:
// a merge iterator when opts.order == ORDERED, a union iterator when
// opts.order == UNORDERED. Any other order value is fatal.
Status DuplicatingRowSet::NewRowIterator(const RowIteratorOptions& opts,
                                         unique_ptr<RowwiseIterator>* out) const {
  if (old_rowsets_.size() == 1) {
    return old_rowsets_[0]->NewRowIterator(opts, out);
  }

  vector<IterWithBounds> sub_iters;
  for (const auto& input_rs : old_rowsets_) {
    IterWithBounds bounded;
    RETURN_NOT_OK_PREPEND(input_rs->NewRowIteratorWithBounds(opts, &bounded),
                          Substitute("Could not create iterator for rowset $0",
                                     input_rs->ToString()));
    sub_iters.emplace_back(std::move(bounded));
  }

  if (opts.order == ORDERED) {
    *out = NewMergeIterator(MergeIteratorOptions(opts.include_deleted_rows),
                            std::move(sub_iters));
  } else if (opts.order == UNORDERED) {
    *out = NewUnionIterator(std::move(sub_iters));
  } else {
    LOG(FATAL) << "unknown order: " << opts.order;
  }
  return Status::OK();
}
// A DuplicatingRowSet must never be used as compaction input. Reaching this
// is a programming error and crashes the process via LOG(FATAL). All
// parameters are intentionally unused.
Status DuplicatingRowSet::NewCompactionInput(
    const Schema* /* projection */,
    const MvccSnapshot& /* snap */,
    const IOContext* /* io_context */,
    unique_ptr<CompactionInput>* /* out */) const {
  LOG(FATAL) << "duplicating rowsets do not act as compaction input";
  // Not reached: LOG(FATAL) aborts. Present only to satisfy the return type.
  return Status::OK();
}
// Applies 'update' to the row identified by 'probe', first in the input
// ('old') rowsets and then, mirrored, in the output ('new') rowsets.
//
// Returns:
//  - NotFound if no input rowset contains the key; nothing is mutated.
//  - The input rowset's error (after logging it) if that rowset fails with
//    anything other than NotFound.
//  - The output rowset's error, prefixed with context, if mirroring fails
//    with anything other than NotFound. At that point the input rowset has
//    ALREADY been mutated. NOTE(review): confirm callers treat this as
//    unrecoverable, since input and output no longer agree.
//  - OK once exactly one output rowset accepted the mirrored mutation.
// Crashes (CHECK_EQ) if the mutation could not be mirrored into exactly one
// output rowset.
//
// NOTE(review): 'stats' and 'result' are passed to both the input-side and
// the output-side MutateRow calls. Confirm the callees' overwrite vs. append
// semantics are what the caller expects.
Status DuplicatingRowSet::MutateRow(Timestamp timestamp,
                                    const RowSetKeyProbe &probe,
                                    const RowChangeList &update,
                                    const consensus::OpId& op_id,
                                    const IOContext* io_context,
                                    ProbeStats* stats,
                                    OperationResultPB* result) {
  // Duplicate the update to both the relevant input rowset and the output rowset.
  //
  // It's crucial to do the mutation against the input side first, due to the potential
  // for a race during flush: the output rowset may not yet hold a DELETE which
  // is present in the input rowset. In that case, the UPDATE against the output rowset would
  // succeed whereas it can't be applied to the input rowset. So, we update the input rowset first,
  // and if it succeeds, propagate to the output.
  // First mutate the relevant input rowset.
  bool updated = false;
  for (const shared_ptr<RowSet> &rowset : old_rowsets_) {
    Status s = rowset->MutateRow(timestamp, probe, update, op_id, io_context, stats, result);
    if (s.ok()) {
      // The first input rowset that accepts the mutation is the owner; stop.
      updated = true;
      break;
    } else if (!s.IsNotFound()) {
      // NotFound just means "try the next input rowset"; anything else is a
      // real failure and aborts before the output side is touched.
      LOG(ERROR) << "Unable to update key "
                 << probe.schema()->CreateKeyProjection().DebugRow(probe.row_key())
                 << " (failed on rowset " << rowset->ToString() << "): "
                 << s.ToString();
      return s;
    }
  }
  if (!updated) {
    return Status::NotFound("not found in any compaction input");
  }
  // If it succeeded there, we also need to mirror into the new rowset.
  int mirrored_count = 0;
  for (const shared_ptr<RowSet> &new_rowset : new_rowsets_) {
    Status s = new_rowset->MutateRow(timestamp, probe, update, op_id, io_context, stats, result);
    if (s.ok()) {
      mirrored_count++;
#ifdef NDEBUG
      // In non-DEBUG builds, we can break as soon as we find the correct
      // rowset to mirror to. In a DEBUG build, though, we keep looking
      // through all, and make sure that we only update in one of them.
      break;
#endif
    } else if (!s.IsNotFound()) {
      RETURN_NOT_OK_PREPEND(s, Substitute("Unable to mirror update to rowset $0 for key: $1",
          new_rowset->ToString(), probe.schema()->CreateKeyProjection().DebugRow(probe.row_key())));
    }
    // IsNotFound is OK - it might be in a different one.
  }
  // Release builds stop at the first success, so this only catches a missing
  // mirror (count 0). Debug builds scan every output rowset and also catch a
  // duplicate mirror (count > 1).
  CHECK_EQ(mirrored_count, 1)
      << "Updated row in compaction input, but didn't mirror in exactly 1 new rowset: "
      << probe.schema()->CreateKeyProjection().DebugRow(probe.row_key());
  return Status::OK();
}
// Sets '*present' to whether the key in 'probe' exists in any input ('old')
// rowset. Only the input side is consulted. Probing stops at the first rowset
// that reports the row present, and any probe error is returned immediately.
Status DuplicatingRowSet::CheckRowPresent(const RowSetKeyProbe &probe, const IOContext* io_context,
                                          bool *present, ProbeStats* stats) const {
  *present = false;
  for (auto it = old_rowsets_.begin(); it != old_rowsets_.end() && !*present; ++it) {
    RETURN_NOT_OK((*it)->CheckRowPresent(probe, io_context, present, stats));
  }
  return Status::OK();
}
// Stores in '*count' the total row count across the output ('new') rowsets.
// The sum is built in a 64-bit accumulator so that a total which would not
// fit in rowid_t trips the CHECK below instead of silently wrapping.
Status DuplicatingRowSet::CountRows(const IOContext* io_context, rowid_t *count) const {
  int64_t accumulated_count = 0;
  for (size_t idx = 0; idx < new_rowsets_.size(); ++idx) {
    rowid_t rows_in_rowset;
    RETURN_NOT_OK(new_rowsets_[idx]->CountRows(io_context, &rows_in_rowset));
    accumulated_count += rows_in_rowset;
  }
  CHECK_LT(accumulated_count, std::numeric_limits<rowid_t>::max())
      << "TODO: should make sure this is 64-bit safe - probably not right now"
      << " because rowid_t is only 32-bit.";
  *count = accumulated_count;
  return Status::OK();
}
// Stores in '*count' the sum of the live-row counts of the input ('old')
// rowsets. '*count' is written only on success, and is assigned rather than
// accumulated into.
//
// The previous version did '*count += tmp' without initializing '*count'.
// The result therefore depended on the caller pre-zeroing it: a caller that
// reuses one output variable across several rowsets would double-count, and
// an error midway left a partial sum behind.
//
// NOTE(review): this counts the input rowsets, while CountRows() counts the
// output rowsets. Confirm the asymmetry is intended.
Status DuplicatingRowSet::CountLiveRows(uint64_t* count) const {
  uint64_t total = 0;
  for (const shared_ptr<RowSet>& rs : old_rowsets_) {
    uint64_t rs_live_rows = 0;
    RETURN_NOT_OK(rs->CountLiveRows(&rs_live_rows));
    total += rs_live_rows;
  }
  *count = total;
  return Status::OK();
}
// Reports the key range covered by the output ('new') rowsets.
//
// No new rows can be inserted into a DuplicatingRowSet, so the output side
// always spans the full range of the input side. The output rowsets are in
// ascending key order, so the overall range is
// [front().min, back().max]. The unused half of each lookup is discarded.
Status DuplicatingRowSet::GetBounds(string* min_encoded_key,
                                    string* max_encoded_key) const {
  string discarded_max;
  RETURN_NOT_OK(new_rowsets_.front()->GetBounds(min_encoded_key, &discarded_max));
  string discarded_min;
  RETURN_NOT_OK(new_rowsets_.back()->GetBounds(&discarded_min, max_encoded_key));
  return Status::OK();
}
uint64_t DuplicatingRowSet::OnDiskSize() const {
uint64_t size = 0;
for (const shared_ptr<RowSet> &rs : new_rowsets_) {
size += rs->OnDiskSize();
}
return size;
}
uint64_t DuplicatingRowSet::OnDiskBaseDataSize() const {
uint64_t size = 0;
for (const shared_ptr<RowSet> &rs : new_rowsets_) {
size += rs->OnDiskBaseDataSize();
}
return size;
}
uint64_t DuplicatingRowSet::OnDiskBaseDataColumnSize(const ColumnId& col_id) const {
uint64_t size = 0;
for (const shared_ptr<RowSet> &rs : new_rowsets_) {
size += rs->OnDiskBaseDataColumnSize(col_id);
}
return size;
}
uint64_t DuplicatingRowSet::OnDiskBaseDataSizeWithRedos() const {
// The actual value of this doesn't matter, since it won't be selected
// for compaction.
uint64_t size = 0;
for (const shared_ptr<RowSet> &rs : new_rowsets_) {
size += rs->OnDiskBaseDataSizeWithRedos();
}
return size;
}
// A DuplicatingRowSet has no RowSetMetadata of its own, so this returns an
// empty (null) pointer.
//
// Returning nullptr produces an empty shared_ptr. The previous
// shared_ptr<T>(reinterpret_cast<T*>(NULL)) allocated a control block that
// "owned" a null pointer (use_count() == 1) and needed a reinterpret_cast.
// Callers that test the result against null, or call get(), see the same
// answer either way.
shared_ptr<RowSetMetadata> DuplicatingRowSet::metadata() {
  return nullptr;
}
// Dumps every input rowset and then every output rowset into 'lines'. Each
// dump is preceded by a header line naming the rowset's side
// ("input"/"output"), this DuplicatingRowSet's description, and the rowset's
// 1-based position on that side. Stops at the first dump error and returns it.
Status DuplicatingRowSet::DebugDump(vector<string> *lines) { // NOLINT(*)
  // ToString() walks every input and output rowset. It is loop-invariant here,
  // so compute it once instead of once per dumped rowset.
  const string desc = ToString();

  // Dumps one side. 'role' is spliced into the header so the emitted text
  // matches the previous per-side loops exactly.
  auto dump_side = [&](const RowSetVector& rowsets, const char* role) -> Status {
    int i = 1;
    for (const shared_ptr<RowSet>& rs : rowsets) {
      LOG_STRING(INFO, lines) << "Duplicating rowset " << role << " " << desc << " "
                              << i << "/" << rowsets.size() << ":";
      RETURN_NOT_OK(rs->DebugDump(lines));
      i++;
    }
    return Status::OK();
  };

  RETURN_NOT_OK(dump_side(old_rowsets_, "input"));
  RETURN_NOT_OK(dump_side(new_rowsets_, "output"));
  return Status::OK();
}
} // namespace tablet
} // namespace kudu