// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/delta_tracker.h"
#include <mutex>
#include <set>
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/strip.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_applier.h"
#include "kudu/tablet/delta_compaction.h"
#include "kudu/tablet/delta_iterator_merger.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/util/status.h"
namespace kudu {
namespace tablet {

using fs::ReadableBlock;
using fs::WritableBlock;
using std::set;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;

DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
rowid_t num_rows,
log::LogAnchorRegistry* log_anchor_registry,
shared_ptr<MemTracker> parent_tracker)
: rowset_metadata_(std::move(rowset_metadata)),
num_rows_(num_rows),
open_(false),
log_anchor_registry_(log_anchor_registry),
parent_tracker_(std::move(parent_tracker)),
dms_empty_(true) {
}
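
// Opens a DeltaFileReader for each block in 'blocks', appending the readers
// to 'stores'. Readers are opened lazily (OpenNoInit), so no delta data is
// read here; the first error encountered is logged and returned.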
Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
vector<shared_ptr<DeltaStore> >* stores,
DeltaType type) {
FsManager* fs = rowset_metadata_->fs_manager();
for (const BlockId& block_id : blocks) {
gscoped_ptr<ReadableBlock> block;
Status s = fs->OpenBlock(block_id, &block);
if (!s.ok()) {
LOG(ERROR) << "Failed to open " << DeltaType_Name(type)
<< " delta file " << block_id.ToString() << ": "
<< s.ToString();
return s;
}
shared_ptr<DeltaFileReader> dfr;
s = DeltaFileReader::OpenNoInit(std::move(block), block_id, &dfr, type);
if (!s.ok()) {
LOG(ERROR) << "Failed to open " << DeltaType_Name(type)
<< " delta file reader " << block_id.ToString() << ": "
<< s.ToString();
return s;
}
VLOG(1) << "Successfully opened " << DeltaType_Name(type)
<< " delta file " << block_id.ToString();
stores->push_back(dfr);
}
return Status::OK();
}

// Open any previously flushed DeltaFiles in this rowset.
Status DeltaTracker::Open() {
CHECK(redo_delta_stores_.empty()) << "should call before opening any readers";
CHECK(undo_delta_stores_.empty()) << "should call before opening any readers";
CHECK(!open_);
RETURN_NOT_OK(OpenDeltaReaders(rowset_metadata_->redo_delta_blocks(),
&redo_delta_stores_,
REDO));
RETURN_NOT_OK(OpenDeltaReaders(rowset_metadata_->undo_delta_blocks(),
&undo_delta_stores_,
UNDO));
// The ID of the first DeltaMemStore is the max ID of the current ones + 1.
dms_.reset(new DeltaMemStore(rowset_metadata_->last_durable_redo_dms_id() + 1,
rowset_metadata_->id(),
log_anchor_registry_,
parent_tracker_));
open_ = true;
return Status::OK();
}
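
// Builds a merged iterator over the REDO delta stores at positions
// [start_idx, end_idx] (inclusive), recording each input store and its
// block ID in 'target_stores' and 'target_blocks'. As the "Unlocked"
// suffix suggests, the caller is responsible for synchronization; in this
// file it is invoked while holding compact_flush_lock_.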
Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(size_t start_idx, size_t end_idx,
const Schema* projection,
vector<shared_ptr<DeltaStore> > *target_stores,
vector<BlockId> *target_blocks,
std::unique_ptr<DeltaIterator> *out) {
CHECK(open_);
CHECK_LE(start_idx, end_idx);
CHECK_LT(end_idx, redo_delta_stores_.size());
vector<shared_ptr<DeltaStore> > inputs;
for (size_t idx = start_idx; idx <= end_idx; ++idx) {
shared_ptr<DeltaStore> &delta_store = redo_delta_stores_[idx];
// In DEBUG mode, the following asserts that the object is of the right type
// (using RTTI)
ignore_result(down_cast<DeltaFileReader*>(delta_store.get()));
shared_ptr<DeltaFileReader> dfr = std::static_pointer_cast<DeltaFileReader>(delta_store);
LOG(INFO) << "Preparing to minor compact delta file: " << dfr->ToString();
inputs.push_back(delta_store);
target_stores->push_back(delta_store);
target_blocks->push_back(dfr->block_id());
}
RETURN_NOT_OK(DeltaIteratorMerger::Create(
inputs, projection,
MvccSnapshot::CreateSnapshotIncludingAllTransactions(), out));
return Status::OK();
}
namespace {
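// Returns a comma-separated string of the ToString() representations of the
// given delta stores, for use in log and error messages.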
string JoinDeltaStoreStrings(const SharedDeltaStoreVector& stores) {
vector<string> strings;
for (const shared_ptr<DeltaStore>& store : stores) {
strings.push_back(store->ToString());
}
return ::JoinStrings(strings, ",");
}
} // anonymous namespace
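
// Atomically replaces the delta stores listed in 'to_remove' with readers
// opened over 'new_delta_blocks'. The new readers are opened before taking
// component_lock_, so the lock is held only for the pointer swap.
// 'to_remove' must be a contiguous, in-order run of the current store list;
// if it is empty, the new stores are inserted at the front of the list.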
Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& to_remove,
const vector<BlockId>& new_delta_blocks,
DeltaType type) {
SharedDeltaStoreVector new_stores;
RETURN_NOT_OK_PREPEND(OpenDeltaReaders(new_delta_blocks, &new_stores, type),
"Unable to open delta blocks");
std::lock_guard<rw_spinlock> lock(component_lock_);
SharedDeltaStoreVector* stores_to_update =
type == REDO ? &redo_delta_stores_ : &undo_delta_stores_;
SharedDeltaStoreVector::iterator start_it;
// TODO: this is hacky; we do this because UNDOs don't currently get replaced and we need to
// front-load them. When we start GCing UNDO files (KUDU-236) we'll need to be able to atomically
// replace them too, and in their right order.
if (!to_remove.empty()) {
start_it =
std::find(stores_to_update->begin(), stores_to_update->end(), to_remove[0]);
auto end_it = start_it;
for (const shared_ptr<DeltaStore>& ds : to_remove) {
if (end_it == stores_to_update->end() || *end_it != ds) {
return Status::InvalidArgument(
strings::Substitute("Cannot find deltastore sequence <$0> in <$1>",
JoinDeltaStoreStrings(to_remove),
JoinDeltaStoreStrings(*stores_to_update)));
}
++end_it;
}
// Remove the old stores
stores_to_update->erase(start_it, end_it);
} else {
start_it = stores_to_update->begin();
}
// Insert the new stores
stores_to_update->insert(start_it, new_stores.begin(), new_stores.end());
VLOG(1) << "New " << DeltaType_Name(type) << " stores: "
<< JoinDeltaStoreStrings(*stores_to_update);
return Status::OK();
}
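
// Compacts all flushed REDO delta stores into a single one (see
// CompactStores()).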
Status DeltaTracker::Compact() {
return CompactStores(0, -1);
}
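
// Performs a minor delta compaction of the REDO delta stores at positions
// [start_idx, end_idx], merging them into a single new delta block and
// atomically swapping it into place. An end_idx of -1 means "through the
// last store". No-op when there are fewer than two REDO delta stores.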
Status DeltaTracker::CompactStores(int start_idx, int end_idx) {
// Prevent concurrent compactions or a compaction concurrent with a flush
//
// TODO(perf): this could be more fine grained
std::lock_guard<Mutex> l(compact_flush_lock_);
// At the time of writing, minor delta compaction only compacts REDO delta
// files, so we need at least 2 REDO delta stores to proceed.
if (CountRedoDeltaStores() <= 1) {
return Status::OK();
}
if (end_idx == -1) {
end_idx = redo_delta_stores_.size() - 1;
}
CHECK_LE(start_idx, end_idx);
CHECK_LT(end_idx, redo_delta_stores_.size());
CHECK(open_);
// Open a writer for the new destination delta block
FsManager* fs = rowset_metadata_->fs_manager();
gscoped_ptr<WritableBlock> block;
RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(&block),
"Could not allocate delta block");
BlockId new_block_id(block->id());
// Merge and compact the stores, writing the output to the new delta block.
vector<shared_ptr<DeltaStore> > compacted_stores;
vector<BlockId> compacted_blocks;
RETURN_NOT_OK(DoCompactStores(start_idx, end_idx, std::move(block),
&compacted_stores, &compacted_blocks));
// Update redo_delta_stores_, removing the compacted delta files and inserting the new one.
RETURN_NOT_OK(AtomicUpdateStores(compacted_stores, { new_block_id }, REDO));
LOG(INFO) << "Opened delta block for read: " << new_block_id.ToString();
// Update the metadata accordingly
RowSetMetadataUpdate update;
update.ReplaceRedoDeltaBlocks(compacted_blocks, { new_block_id });
// TODO: need to have some error handling here -- if we somehow can't persist the
// metadata, do we end up losing data on recovery?
CHECK_OK(rowset_metadata_->CommitUpdate(update));
Status s = rowset_metadata_->Flush();
if (!s.ok()) {
// TODO: again need to figure out some way of making this safe. Should we be
// writing the metadata _ahead_ of the actual store swap? Probably.
LOG(FATAL) << "Unable to commit delta data block metadata for "
<< new_block_id.ToString() << ": " << s.ToString();
return s;
}
return Status::OK();
}
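
// Does the heavy lifting of CompactStores(): builds a merged iterator over
// the selected REDO delta stores and writes the merged deltas into 'block'
// via a DeltaFileWriter, recording the inputs in 'compacted_stores' and
// 'compacted_blocks'.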
Status DeltaTracker::DoCompactStores(size_t start_idx, size_t end_idx,
gscoped_ptr<WritableBlock> block,
vector<shared_ptr<DeltaStore> > *compacted_stores,
vector<BlockId> *compacted_blocks) {
unique_ptr<DeltaIterator> inputs_merge;
// Currently, DeltaFile iterators ignore the passed-in projection in
// FilterColumnIdsAndCollectDeltas(). So, we just pass an empty schema here.
// If this changes in the future, we'll have to pass in the current tablet
// schema here.
Schema empty_schema;
RETURN_NOT_OK(MakeDeltaIteratorMergerUnlocked(start_idx, end_idx, &empty_schema, compacted_stores,
compacted_blocks, &inputs_merge));
LOG(INFO) << "Compacting " << (end_idx - start_idx + 1) << " delta files.";
DeltaFileWriter dfw(std::move(block));
RETURN_NOT_OK(dfw.Start());
RETURN_NOT_OK(WriteDeltaIteratorToFile<REDO>(inputs_merge.get(),
ITERATE_OVER_ALL_ROWS,
&dfw));
RETURN_NOT_OK(dfw.Finish());
LOG(INFO) << "Succesfully compacted the specified delta files.";
return Status::OK();
}
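
// Copies the UNDO and/or REDO stores (per 'which') into 'deltas' under
// component_lock_, with the active DeltaMemStore last.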
void DeltaTracker::CollectStores(vector<shared_ptr<DeltaStore>>* deltas,
WhichStores which) const {
std::lock_guard<rw_spinlock> lock(component_lock_);
if (which != REDOS_ONLY) {
deltas->assign(undo_delta_stores_.begin(), undo_delta_stores_.end());
}
if (which != UNDOS_ONLY) {
deltas->insert(deltas->end(), redo_delta_stores_.begin(), redo_delta_stores_.end());
deltas->push_back(dms_);
}
}
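
// Creates a merged delta iterator over the stores selected by 'which', as
// of the given MVCC snapshot.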
Status DeltaTracker::NewDeltaIterator(const Schema* schema,
const MvccSnapshot& snap,
WhichStores which,
unique_ptr<DeltaIterator>* out) const {
std::vector<shared_ptr<DeltaStore> > stores;
CollectStores(&stores, which);
return DeltaIteratorMerger::Create(stores, schema, snap, out);
}
Status DeltaTracker::NewDeltaFileIterator(
const Schema* schema,
const MvccSnapshot& snap,
DeltaType type,
vector<shared_ptr<DeltaStore> >* included_stores,
unique_ptr<DeltaIterator>* out) const {
{
std::lock_guard<rw_spinlock> lock(component_lock_);
// TODO(perf): is this copy really needed? See
// DeltaIteratorMerger::Create().
if (type == UNDO) {
*included_stores = undo_delta_stores_;
} else if (type == REDO) {
*included_stores = redo_delta_stores_;
} else {
LOG(FATAL) << "Unknown delta type: " << type;
}
}
// Verify that we're only merging files and not DeltaMemStores.
// TODO: we need to ensure this can't fail -- we have to somehow coordinate
// minor delta compaction against delta flush. Add a test case here to
// trigger this condition.
for (const shared_ptr<DeltaStore>& store : *included_stores) {
ignore_result(down_cast<DeltaFileReader*>(store.get()));
}
return DeltaIteratorMerger::Create(*included_stores, schema, snap, out);
}
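
// Wraps the given base-data iterator in a DeltaApplier that applies this
// tracker's deltas to the base data as it is scanned.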
Status DeltaTracker::WrapIterator(const shared_ptr<CFileSet::Iterator> &base,
const MvccSnapshot &mvcc_snap,
gscoped_ptr<ColumnwiseIterator>* out) const {
unique_ptr<DeltaIterator> iter;
RETURN_NOT_OK(NewDeltaIterator(&base->schema(), mvcc_snap, &iter));
out->reset(new DeltaApplier(base, std::move(iter)));
return Status::OK();
}
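
// Applies a single row update to the active DeltaMemStore, recording the
// destination store (rowset ID and DMS ID) in 'result'.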
Status DeltaTracker::Update(Timestamp timestamp,
rowid_t row_idx,
const RowChangeList &update,
const consensus::OpId& op_id,
OperationResultPB* result) {
// TODO: can probably lock this more fine-grained.
shared_lock<rw_spinlock> lock(component_lock_);
DCHECK_LT(row_idx, num_rows_);
Status s = dms_->Update(timestamp, row_idx, update, op_id);
if (s.ok()) {
dms_empty_.Store(false);
MemStoreTargetPB* target = result->add_mutated_stores();
target->set_rs_id(rowset_metadata_->id());
target->set_dms_id(dms_->id());
}
return s;
}
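
// Checks whether 'row_idx' has been deleted, consulting the DeltaMemStore
// first and then each flushed REDO store from newest to oldest, stopping at
// the first deletion found.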
Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, bool *deleted,
ProbeStats* stats) const {
shared_lock<rw_spinlock> lock(component_lock_);
DCHECK_LT(row_idx, num_rows_);
*deleted = false;
// Check if the row has a deletion in DeltaMemStore.
RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, deleted));
if (*deleted) {
return Status::OK();
}
// Then check backwards through the list of trackers.
for (auto ds = redo_delta_stores_.crbegin(); ds != redo_delta_stores_.crend(); ds++) {
stats->deltas_consulted++;
RETURN_NOT_OK((*ds)->CheckRowDeleted(row_idx, deleted));
if (*deleted) {
return Status::OK();
}
}
return Status::OK();
}
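
// Flushes the contents of 'dms' to a new delta block, reopens that block
// for reading into 'dfr', and commits the new block to the rowset metadata,
// flushing the metadata itself only when flush_type is FLUSH_METADATA.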
Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
shared_ptr<DeltaFileReader>* dfr,
MetadataFlushType flush_type) {
// Open file for write.
FsManager* fs = rowset_metadata_->fs_manager();
gscoped_ptr<WritableBlock> writable_block;
RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(&writable_block),
"Unable to allocate new delta data writable_block");
BlockId block_id(writable_block->id());
DeltaFileWriter dfw(std::move(writable_block));
RETURN_NOT_OK_PREPEND(dfw.Start(),
Substitute("Unable to start writing to delta block $0",
block_id.ToString()));
gscoped_ptr<DeltaStats> stats;
RETURN_NOT_OK(dms->FlushToFile(&dfw, &stats));
RETURN_NOT_OK(dfw.Finish());
LOG(INFO) << "Flushed delta block: " << block_id.ToString()
<< " ts range: [" << stats->min_timestamp() << ", " << stats->max_timestamp() << "]";
// Now re-open for read
gscoped_ptr<ReadableBlock> readable_block;
RETURN_NOT_OK(fs->OpenBlock(block_id, &readable_block));
RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(readable_block), block_id, dfr, REDO));
LOG(INFO) << "Reopened delta block for read: " << block_id.ToString();
RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), block_id));
if (flush_type == FLUSH_METADATA) {
RETURN_NOT_OK_PREPEND(rowset_metadata_->Flush(),
Substitute("Unable to commit Delta block metadata for: $0",
block_id.ToString()));
}
return Status::OK();
}
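
// Swaps out the active DeltaMemStore for a fresh, empty one and flushes the
// old one to disk, replacing it in redo_delta_stores_ with the resulting
// DeltaFileReader. Serialized against compactions by compact_flush_lock_.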
Status DeltaTracker::Flush(MetadataFlushType flush_type) {
std::lock_guard<Mutex> l(compact_flush_lock_);
// First, swap out the old DeltaMemStore for a new one, and add the old one
// to the list of delta stores to be reflected in reads.
shared_ptr<DeltaMemStore> old_dms;
size_t count;
{
// Lock the component_lock_ in exclusive mode.
// This shuts out any concurrent readers or writers.
std::lock_guard<rw_spinlock> lock(component_lock_);
count = dms_->Count();
// Swap out the old DeltaMemStore for a new, empty one.
old_dms = dms_;
dms_.reset(new DeltaMemStore(old_dms->id() + 1, rowset_metadata_->id(),
log_anchor_registry_, parent_tracker_));
dms_empty_.Store(true);
if (count == 0) {
// No need to flush if there are no deltas; the new, empty DeltaMemStore
// is already in place.
return Status::OK();
}
redo_delta_stores_.push_back(old_dms);
}
LOG(INFO) << "Flushing " << count << " deltas from DMS " << old_dms->id() << "...";
// Now, actually flush the contents of the old DMS.
// TODO: need another lock to prevent concurrent flushers
// at some point.
shared_ptr<DeltaFileReader> dfr;
Status s = FlushDMS(old_dms.get(), &dfr, flush_type);
CHECK(s.ok())
<< "Failed to flush DMS: " << s.ToString()
<< "\nTODO: need to figure out what to do with error handling "
<< "if this fails -- we end up with a DeltaMemStore permanently "
<< "in the store list. For now, abort.";
// Now, re-take the lock and swap in the DeltaFileReader in place of the
// DeltaMemStore.
{
std::lock_guard<rw_spinlock> lock(component_lock_);
size_t idx = redo_delta_stores_.size() - 1;
CHECK_EQ(redo_delta_stores_[idx], old_dms)
<< "Another thread modified the delta store list during flush";
redo_delta_stores_[idx] = dfr;
}
return Status::OK();
// TODO: wherever we write stuff, we should write to a tmp path
// and rename to final path!
}
size_t DeltaTracker::DeltaMemStoreSize() const {
shared_lock<rw_spinlock> lock(component_lock_);
return dms_->memory_footprint();
}
int64_t DeltaTracker::MinUnflushedLogIndex() const {
shared_lock<rw_spinlock> lock(component_lock_);
return dms_->MinLogIndex();
}
size_t DeltaTracker::CountRedoDeltaStores() const {
shared_lock<rw_spinlock> lock(component_lock_);
return redo_delta_stores_.size();
}
uint64_t DeltaTracker::EstimateOnDiskSize() const {
shared_lock<rw_spinlock> lock(component_lock_);
uint64_t size = 0;
for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
size += ds->EstimateSize();
}
return size;
}
void DeltaTracker::GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids) const {
shared_lock<rw_spinlock> lock(component_lock_);
set<ColumnId> column_ids_with_updates;
for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
// We won't force open files just to read their stats.
if (!ds->Initted()) {
continue;
}
ds->delta_stats().AddColumnIdsWithUpdates(&column_ids_with_updates);
}
col_ids->assign(column_ids_with_updates.begin(), column_ids_with_updates.end());
}
} // namespace tablet
} // namespace kudu