| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tablet/rowset.h" |
| |
| #include <limits> |
| #include <memory> |
| #include <string> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include "kudu/common/generic_iterators.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tablet/rowset_metadata.h" |
| |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| using fs::IOContext; |
| |
| namespace tablet { |
| |
| RowIteratorOptions::RowIteratorOptions() |
| : projection(nullptr), |
| snap_to_include(MvccSnapshot::CreateSnapshotIncludingAllOps()), |
| order(OrderMode::UNORDERED), |
| include_deleted_rows(false) {} |
| |
| Status RowSet::NewRowIteratorWithBounds(const RowIteratorOptions& opts, |
| IterWithBounds* out) const { |
| // Get the iterator. |
| unique_ptr<RowwiseIterator> iter; |
| RETURN_NOT_OK(NewRowIterator(opts, &iter)); |
| |
| // Get the bounds. Some rowsets (e.g. MRS) have no bounds; that's OK as the |
| // bounds aren't required. |
| string lower; |
| string upper; |
| Status s = GetBounds(&lower, &upper); |
| if (s.ok()) { |
| out->encoded_bounds = std::make_pair(std::move(lower), std::move(upper)); |
| } else if (!s.IsNotSupported()) { |
| RETURN_NOT_OK(s); |
| } |
| out->iter = std::move(iter); |
| return Status::OK(); |
| } |
| |
| DuplicatingRowSet::DuplicatingRowSet(RowSetVector old_rowsets, |
| RowSetVector new_rowsets) |
| : old_rowsets_(std::move(old_rowsets)), |
| new_rowsets_(std::move(new_rowsets)) { |
| CHECK_GT(old_rowsets_.size(), 0); |
| CHECK_GT(new_rowsets_.size(), 0); |
| } |
| |
| DuplicatingRowSet::~DuplicatingRowSet() { |
| } |
| |
| // Stringify the given list of rowsets into 'dst'. |
| static void AppendRowSetStrings(const RowSetVector &rowsets, string *dst) { |
| bool first = true; |
| dst->append("["); |
| for (const shared_ptr<RowSet> &rs : rowsets) { |
| if (!first) { |
| dst->append(", "); |
| } |
| first = false; |
| dst->append(rs->ToString()); |
| } |
| dst->append("]"); |
| } |
| |
| string DuplicatingRowSet::ToString() const { |
| string ret; |
| ret.append("DuplicatingRowSet("); |
| |
| AppendRowSetStrings(old_rowsets_, &ret); |
| ret.append(" -> "); |
| AppendRowSetStrings(new_rowsets_, &ret); |
| ret.append(")"); |
| return ret; |
| } |
| |
| Status DuplicatingRowSet::NewRowIterator(const RowIteratorOptions& opts, |
| unique_ptr<RowwiseIterator>* out) const { |
| // Use the original rowset. |
| if (old_rowsets_.size() == 1) { |
| return old_rowsets_[0]->NewRowIterator(opts, out); |
| } |
| // Union or merge between them |
| |
| vector<IterWithBounds> iters; |
| for (const auto& rs : old_rowsets_) { |
| IterWithBounds iwb; |
| RETURN_NOT_OK_PREPEND(rs->NewRowIteratorWithBounds(opts, &iwb), |
| Substitute("Could not create iterator for rowset $0", |
| rs->ToString())); |
| iters.emplace_back(std::move(iwb)); |
| } |
| |
| switch (opts.order) { |
| case ORDERED: |
| *out = NewMergeIterator(MergeIteratorOptions(opts.include_deleted_rows), std::move(iters)); |
| break; |
| case UNORDERED: |
| *out = NewUnionIterator(std::move(iters)); |
| break; |
| default: |
| LOG(FATAL) << "unknown order: " << opts.order; |
| } |
| return Status::OK(); |
| } |
| |
| Status DuplicatingRowSet::NewCompactionInput(const Schema* /*projection*/, |
| const MvccSnapshot& /*snap*/, |
| const IOContext* /*io_context*/, |
| unique_ptr<CompactionInput>* /*out*/) const { |
| LOG(FATAL) << "duplicating rowsets do not act as compaction input"; |
| return Status::OK(); |
| } |
| |
| |
| Status DuplicatingRowSet::MutateRow(Timestamp timestamp, |
| const RowSetKeyProbe &probe, |
| const RowChangeList &update, |
| const consensus::OpId& op_id, |
| const IOContext* io_context, |
| ProbeStats* stats, |
| OperationResultPB* result) { |
| // Duplicate the update to both the relevant input rowset and the output rowset. |
| // |
| // It's crucial to do the mutation against the input side first, due to the potential |
| // for a race during flush: the output rowset may not yet hold a DELETE which |
| // is present in the input rowset. In that case, the UPDATE against the output rowset would |
| // succeed whereas it can't be applied to the input rowset. So, we update the input rowset first, |
| // and if it succeeds, propagate to the output. |
| |
| // First mutate the relevant input rowset. |
| bool updated = false; |
| for (const shared_ptr<RowSet> &rowset : old_rowsets_) { |
| Status s = rowset->MutateRow(timestamp, probe, update, op_id, io_context, stats, result); |
| if (s.ok()) { |
| updated = true; |
| break; |
| } else if (!s.IsNotFound()) { |
| LOG(ERROR) << "Unable to update key " |
| << probe.schema()->CreateKeyProjection().DebugRow(probe.row_key()) |
| << " (failed on rowset " << rowset->ToString() << "): " |
| << s.ToString(); |
| return s; |
| } |
| } |
| |
| if (!updated) { |
| return Status::NotFound("not found in any compaction input"); |
| } |
| |
| // If it succeeded there, we also need to mirror into the new rowset. |
| int mirrored_count = 0; |
| for (const shared_ptr<RowSet> &new_rowset : new_rowsets_) { |
| Status s = new_rowset->MutateRow(timestamp, probe, update, op_id, io_context, stats, result); |
| if (s.ok()) { |
| mirrored_count++; |
| #ifdef NDEBUG |
| // In non-DEBUG builds, we can break as soon as we find the correct |
| // rowset to mirror to. In a DEBUG build, though, we keep looking |
| // through all, and make sure that we only update in one of them. |
| break; |
| #endif |
| } else if (!s.IsNotFound()) { |
| RETURN_NOT_OK_PREPEND(s, Substitute("Unable to mirror update to rowset $0 for key: $1", |
| new_rowset->ToString(), probe.schema()->CreateKeyProjection().DebugRow(probe.row_key()))); |
| } |
| // IsNotFound is OK - it might be in a different one. |
| } |
| CHECK_EQ(mirrored_count, 1) |
| << "Updated row in compaction input, but didn't mirror in exactly 1 new rowset: " |
| << probe.schema()->CreateKeyProjection().DebugRow(probe.row_key()); |
| return Status::OK(); |
| } |
| |
| Status DuplicatingRowSet::CheckRowPresent(const RowSetKeyProbe &probe, const IOContext* io_context, |
| bool *present, ProbeStats* stats) const { |
| *present = false; |
| for (const shared_ptr<RowSet> &rowset : old_rowsets_) { |
| RETURN_NOT_OK(rowset->CheckRowPresent(probe, io_context, present, stats)); |
| if (*present) { |
| return Status::OK(); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status DuplicatingRowSet::CountRows(const IOContext* io_context, rowid_t *count) const { |
| int64_t accumulated_count = 0; |
| for (const shared_ptr<RowSet> &rs : new_rowsets_) { |
| rowid_t this_count; |
| RETURN_NOT_OK(rs->CountRows(io_context, &this_count)); |
| accumulated_count += this_count; |
| } |
| |
| CHECK_LT(accumulated_count, std::numeric_limits<rowid_t>::max()) |
| << "TODO: should make sure this is 64-bit safe - probably not right now" |
| << " because rowid_t is only 32-bit."; |
| *count = accumulated_count; |
| return Status::OK(); |
| } |
| |
| Status DuplicatingRowSet::CountLiveRows(uint64_t* count) const { |
| for (const shared_ptr<RowSet>& rs : old_rowsets_) { |
| uint64_t tmp = 0; |
| RETURN_NOT_OK(rs->CountLiveRows(&tmp)); |
| *count += tmp; |
| } |
| return Status::OK(); |
| } |
| |
| Status DuplicatingRowSet::GetBounds(string* min_encoded_key, |
| string* max_encoded_key) const { |
| // The range out of the output rowset always spans the full range |
| // of the input rowsets, since no new rows can be inserted. |
| // The output rowsets are in ascending order, so their total range |
| // spans the range [front().min, back().max]. |
| string junk; |
| RETURN_NOT_OK(new_rowsets_.front()->GetBounds(min_encoded_key, &junk)); |
| RETURN_NOT_OK(new_rowsets_.back()->GetBounds(&junk, max_encoded_key)); |
| return Status::OK(); |
| } |
| |
| uint64_t DuplicatingRowSet::OnDiskSize() const { |
| uint64_t size = 0; |
| for (const shared_ptr<RowSet> &rs : new_rowsets_) { |
| size += rs->OnDiskSize(); |
| } |
| return size; |
| } |
| |
| uint64_t DuplicatingRowSet::OnDiskBaseDataSize() const { |
| uint64_t size = 0; |
| for (const shared_ptr<RowSet> &rs : new_rowsets_) { |
| size += rs->OnDiskBaseDataSize(); |
| } |
| return size; |
| } |
| |
| uint64_t DuplicatingRowSet::OnDiskBaseDataColumnSize(const ColumnId& col_id) const { |
| uint64_t size = 0; |
| for (const shared_ptr<RowSet> &rs : new_rowsets_) { |
| size += rs->OnDiskBaseDataColumnSize(col_id); |
| } |
| return size; |
| } |
| |
| uint64_t DuplicatingRowSet::OnDiskBaseDataSizeWithRedos() const { |
| // The actual value of this doesn't matter, since it won't be selected |
| // for compaction. |
| uint64_t size = 0; |
| for (const shared_ptr<RowSet> &rs : new_rowsets_) { |
| size += rs->OnDiskBaseDataSizeWithRedos(); |
| } |
| return size; |
| } |
| |
| shared_ptr<RowSetMetadata> DuplicatingRowSet::metadata() { |
| return shared_ptr<RowSetMetadata>(reinterpret_cast<RowSetMetadata *>(NULL)); |
| } |
| |
| Status DuplicatingRowSet::DebugDump(vector<string> *lines) { // NOLINT(*) |
| int i = 1; |
| for (const shared_ptr<RowSet> &rs : old_rowsets_) { |
| LOG_STRING(INFO, lines) << "Duplicating rowset input " << ToString() << " " |
| << i << "/" << old_rowsets_.size() << ":"; |
| RETURN_NOT_OK(rs->DebugDump(lines)); |
| i++; |
| } |
| i = 1; |
| for (const shared_ptr<RowSet> &rs : new_rowsets_) { |
| LOG_STRING(INFO, lines) << "Duplicating rowset output " << ToString() << " " |
| << i << "/" << new_rowsets_.size() << ":"; |
| RETURN_NOT_OK(rs->DebugDump(lines)); |
| i++; |
| } |
| |
| return Status::OK(); |
| } |
| |
| } // namespace tablet |
| } // namespace kudu |