[compaction] style guide-related updates
I was debugging one compaction issue and found that compaction-related
code in compaction-test.cc isn't robust enough. This patch just updates
the code in compaction.{h,cc} and compaction-test.cc to conform with the
project's style guidelines. A follow-up patch is to make the code
in compaction-test.cc more robust.
There are no functional changes in this patch.
Change-Id: I4ef1981dad33a6bc2d0911630fd73c95b91e45fe
Reviewed-on: http://gerrit.cloudera.org:8080/18974
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <chinazhangyifan@163.com>
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 654b95c..e33f841 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -27,6 +27,7 @@
#include <random>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_set>
#include <vector>
@@ -61,7 +62,6 @@
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
-#include "kudu/tablet/rowset_metadata.h"
#include "kudu/tablet/tablet-harness.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
@@ -92,23 +92,24 @@
DECLARE_string(block_manager);
+using kudu::consensus::OpId;
+using kudu::log::LogAnchorRegistry;
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
+using strings::Substitute;
namespace kudu {
namespace tablet {
-using consensus::OpId;
-using log::LogAnchorRegistry;
-using strings::Substitute;
+class RowSetMetadata;
-static const char *kRowKeyFormat = "hello %08d";
-static const size_t kLargeRollThreshold = 1024 * 1024 * 1024; // 1GB
-static const size_t kSmallRollThreshold = 1024; // 1KB
+constexpr const char* const kRowKeyFormat = "hello %08d";
+constexpr const size_t kLargeRollThreshold = 1024 * 1024 * 1024; // 1GB
+constexpr const size_t kSmallRollThreshold = 1024; // 1KB
class TestCompaction : public KuduRowSetTest {
public:
@@ -131,7 +132,7 @@
// Insert n_rows rows of data.
// Each row is the tuple: (string key=hello <n*10 + delta>, val=<n>)
- void InsertRows(MemRowSet *mrs, int n_rows, int delta) {
+ void InsertRows(MemRowSet* mrs, int n_rows, int delta) {
for (int32_t i = 0; i < n_rows; i++) {
InsertRow(mrs, i * 10 + delta, i);
}
@@ -140,7 +141,7 @@
// Inserts a row.
// The 'nullable_val' column is set to either NULL (when val is odd)
// or 'val' (when val is even).
- void InsertRow(MemRowSet *mrs, int row_key, int32_t val) {
+ void InsertRow(MemRowSet* mrs, int row_key, int32_t val) {
ScopedOp op(&mvcc_, clock_.Now());
op.StartApplying();
InsertRowInOp(mrs, op, row_key, val);
@@ -179,7 +180,7 @@
}
}
- void InsertRowInOp(MemRowSet *mrs,
+ void InsertRowInOp(MemRowSet* mrs,
const ScopedOp& op,
int row_key,
int32_t val) {
@@ -204,21 +205,21 @@
// If 'val' is even, 'nullable_val' is set to NULL. Otherwise, set to 'val'.
// Note that this is the opposite of InsertRow() above, so that the updates
// flop NULL to non-NULL and vice versa.
- void UpdateRows(RowSet *rowset, int n_rows, int delta, int32_t new_val) {
+ void UpdateRows(RowSet* rowset, int n_rows, int delta, int32_t new_val) {
for (uint32_t i = 0; i < n_rows; i++) {
SCOPED_TRACE(i);
UpdateRow(rowset, i * 10 + delta, new_val);
}
}
- void UpdateRow(RowSet *rowset, int row_key, int32_t new_val) {
+ void UpdateRow(RowSet* rowset, int row_key, int32_t new_val) {
ScopedOp op(&mvcc_, clock_.Now());
op.StartApplying();
UpdateRowInOp(rowset, op, row_key, new_val);
op.FinishApplying();
}
- void UpdateRowInOp(RowSet *rowset,
+ void UpdateRowInOp(RowSet* rowset,
const ScopedOp& op,
int row_key,
int32_t new_val) {
@@ -274,7 +275,7 @@
op.FinishApplying();
}
- void DeleteRowInOp(RowSet *rowset, const ScopedOp& op, int row_key) {
+ void DeleteRowInOp(RowSet* rowset, const ScopedOp& op, int row_key) {
char keybuf[256];
faststring update_buf;
snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, row_key);
@@ -300,7 +301,7 @@
// Iterate over the given compaction input, stringifying and dumping each
// yielded row to *out
- void IterateInput(CompactionInput *input, vector<string> *out) {
+ void IterateInput(CompactionInput* input, vector<string>* out) {
ASSERT_OK(DebugDumpCompactionInput(input, out));
}
@@ -308,8 +309,8 @@
// If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends
// them to the vector.
void DoFlushAndReopen(
- CompactionInput *input, const Schema& projection, const MvccSnapshot &snap,
- int64_t roll_threshold, vector<shared_ptr<DiskRowSet> >* result_rowsets) {
+ CompactionInput* input, const Schema& projection, const MvccSnapshot& snap,
+ int64_t roll_threshold, vector<shared_ptr<DiskRowSet>>* result_rowsets) {
// Flush with a large roll threshold so we only write a single file.
// This simplifies the test so we always need to reopen only a single rowset.
RollingDiskRowSetWriter rsw(tablet()->metadata(), projection,
@@ -321,14 +322,14 @@
input, snap, HistoryGcOpts::Disabled(), &rsw));
ASSERT_OK(rsw.Finish());
- vector<shared_ptr<RowSetMetadata> > metas;
+ vector<shared_ptr<RowSetMetadata>> metas;
rsw.GetWrittenRowSetMetadata(&metas);
- for (const shared_ptr<RowSetMetadata>& meta : metas) {
+ for (const auto& meta : metas) {
ASSERT_TRUE(meta->HasBloomDataBlockForTests());
}
if (result_rowsets) {
// Re-open the outputs
- for (const shared_ptr<RowSetMetadata>& meta : metas) {
+ for (const auto& meta : metas) {
shared_ptr<DiskRowSet> rs;
ASSERT_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
mem_trackers_, nullptr, &rs));
@@ -338,11 +339,11 @@
}
static Status BuildCompactionInput(const MvccSnapshot& merge_snap,
- const vector<shared_ptr<DiskRowSet> >& rowsets,
+ const vector<shared_ptr<DiskRowSet>>& rowsets,
const Schema& projection,
unique_ptr<CompactionInput>* out) {
- vector<shared_ptr<CompactionInput> > merge_inputs;
- for (const shared_ptr<DiskRowSet> &rs : rowsets) {
+ vector<shared_ptr<CompactionInput>> merge_inputs;
+ for (const auto& rs : rowsets) {
unique_ptr<CompactionInput> input;
RETURN_NOT_OK(CompactionInput::Create(*rs, &projection, merge_snap, nullptr, &input));
merge_inputs.push_back(shared_ptr<CompactionInput>(input.release()));
@@ -354,9 +355,9 @@
// Compacts a set of DRSs.
// If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends
// them to the vector.
- Status CompactAndReopen(const vector<shared_ptr<DiskRowSet> >& rowsets,
+ Status CompactAndReopen(const vector<shared_ptr<DiskRowSet>>& rowsets,
const Schema& projection, int64_t roll_threshold,
- vector<shared_ptr<DiskRowSet> >* result_rowsets) {
+ vector<shared_ptr<DiskRowSet>>* result_rowsets) {
MvccSnapshot merge_snap(mvcc_);
unique_ptr<CompactionInput> compact_input;
RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection, &compact_input));
@@ -366,10 +367,10 @@
}
// Same as above, but sets a high roll threshold so it only produces a single output.
- void CompactAndReopenNoRoll(const vector<shared_ptr<DiskRowSet> >& input_rowsets,
+ void CompactAndReopenNoRoll(const vector<shared_ptr<DiskRowSet>>& input_rowsets,
const Schema& projection,
shared_ptr<DiskRowSet>* result_rs) {
- vector<shared_ptr<DiskRowSet> > result_rowsets;
+ vector<shared_ptr<DiskRowSet>> result_rowsets;
CompactAndReopen(input_rowsets, projection, kLargeRollThreshold, &result_rowsets);
ASSERT_EQ(1, result_rowsets.size());
*result_rs = result_rowsets[0];
@@ -380,9 +381,9 @@
// them to the vector.
void FlushMRSAndReopen(const MemRowSet& mrs, const Schema& projection,
int64_t roll_threshold,
- vector<shared_ptr<DiskRowSet> >* result_rowsets) {
+ vector<shared_ptr<DiskRowSet>>* result_rowsets) {
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<RowSetMetadata> > rowset_metas;
+ vector<shared_ptr<RowSetMetadata>> rowset_metas;
unique_ptr<CompactionInput> input(CompactionInput::Create(mrs, &projection, snap));
DoFlushAndReopen(input.get(), projection, snap, roll_threshold, result_rowsets);
}
@@ -390,7 +391,7 @@
// Same as above, but sets a high roll threshold so it only produces a single output.
void FlushMRSAndReopenNoRoll(const MemRowSet& mrs, const Schema& projection,
shared_ptr<DiskRowSet>* result_rs) {
- vector<shared_ptr<DiskRowSet> > rowsets;
+ vector<shared_ptr<DiskRowSet>> rowsets;
FlushMRSAndReopen(mrs, projection, kLargeRollThreshold, &rowsets);
ASSERT_EQ(1, rowsets.size());
*result_rs = rowsets[0];
@@ -421,11 +422,11 @@
// each of the input schemas. The output rowset will
// have the 'projection' schema.
void DoMerge(const Schema& projection, const vector<Schema>& schemas) {
- vector<shared_ptr<DiskRowSet> > rowsets;
+ vector<shared_ptr<DiskRowSet>> rowsets;
// Create one input rowset for each of the input schemas
int delta = 0;
- for (const Schema& schema : schemas) {
+ for (const auto& schema : schemas) {
// Create a memrowset with a bunch of rows and updates.
shared_ptr<MemRowSet> mrs;
CHECK_OK(MemRowSet::Create(delta, schema, log_anchor_registry_.get(),
@@ -457,7 +458,7 @@
template<bool OVERLAP_INPUTS>
void DoBenchmark() {
- vector<shared_ptr<DiskRowSet> > rowsets;
+ vector<shared_ptr<DiskRowSet>> rowsets;
if (FLAGS_merge_benchmark_input_dir.empty()) {
// Create inputs.
@@ -494,7 +495,7 @@
scoped_refptr<TabletMetadata> input_meta;
ASSERT_OK(TabletMetadata::Load(&fs_manager, tablet_id, &input_meta));
- for (const shared_ptr<RowSetMetadata>& meta : input_meta->rowsets()) {
+ for (const auto& meta : input_meta->rowsets()) {
shared_ptr<DiskRowSet> rs;
CHECK_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
mem_trackers_, nullptr, &rs));
@@ -576,7 +577,7 @@
mem_trackers_.tablet_tracker, &mrs));
InsertRows(mrs.get(), 30000, 0);
- vector<shared_ptr<DiskRowSet> > rowsets;
+ vector<shared_ptr<DiskRowSet>> rowsets;
FlushMRSAndReopen(*mrs, schema_, kSmallRollThreshold, &rowsets);
ASSERT_GT(rowsets.size(), 1);
@@ -676,7 +677,7 @@
}
shared_ptr<DiskRowSet> result;
- vector<shared_ptr<DiskRowSet> > all_rss;
+ vector<shared_ptr<DiskRowSet>> all_rss;
all_rss.push_back(rs3);
all_rss.push_back(rs1);
all_rss.push_back(rs2);
@@ -857,7 +858,7 @@
}
vector<shared_ptr<CompactionInput>> inputs;
- for (auto& row_set : row_sets) {
+ for (const auto& row_set : row_sets) {
unique_ptr<CompactionInput> ci;
CHECK_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
inputs.push_back(shared_ptr<CompactionInput>(ci.release()));
@@ -1030,7 +1031,7 @@
// Catch the updates that came in after the snapshot flush was made.
// Note that we are merging two MRS, it's a hack
MvccSnapshot snap2(mvcc_);
- vector<shared_ptr<CompactionInput> > merge_inputs;
+ vector<shared_ptr<CompactionInput>> merge_inputs;
merge_inputs.push_back(
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs, &schema_, snap2)));
merge_inputs.push_back(
@@ -1093,7 +1094,7 @@
InsertRows(mrs_a.get(), 5, 1);
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput> > merge_inputs {
+ vector<shared_ptr<CompactionInput>> merge_inputs {
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
};
@@ -1113,7 +1114,7 @@
mem_trackers_.tablet_tracker, &mrs_b));
InsertRows(mrs_b.get(), 10, 0);
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput> > merge_inputs {
+ vector<shared_ptr<CompactionInput>> merge_inputs {
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
};
@@ -1238,7 +1239,7 @@
// Despite the overlapping time ranges across these inputs, the compaction
// should go off without a hitch.
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput> > merge_inputs {
+ vector<shared_ptr<CompactionInput>> merge_inputs {
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap)),
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_c, &schema_, snap)),
@@ -1255,7 +1256,7 @@
shared_ptr<MemRowSet> mrs_a = CreateInvisibleMRS();
shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS();
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput> > merge_inputs {
+ vector<shared_ptr<CompactionInput>> merge_inputs {
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
};
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 83a08f0..762c92b 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -23,6 +23,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <type_traits>
#include <unordered_set>
#include <vector>
@@ -47,7 +48,6 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/cfile_set.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/delta_tracker.h"
#include "kudu/tablet/diskrowset.h"
@@ -138,7 +138,7 @@
return has_more_blocks_;
}
- Status PrepareBlock(vector<CompactionInputRow> *block) override {
+ Status PrepareBlock(vector<CompactionInputRow>* block) override {
int num_in_block = iter_->remaining_in_leaf();
block->resize(num_in_block);
@@ -203,7 +203,7 @@
return Status::OK();
}
- const Schema &schema() const override {
+ const Schema& schema() const override {
return iter_->schema();
}
@@ -234,8 +234,8 @@
undo_delta_iter_(std::move(undo_delta_iter)),
mem_(32 * 1024),
block_(&base_iter_->schema(), kRowsPerBlock, &mem_),
- redo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)),
- undo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)) {}
+ redo_mutation_block_(kRowsPerBlock, static_cast<Mutation*>(nullptr)),
+ undo_mutation_block_(kRowsPerBlock, static_cast<Mutation*>(nullptr)) {}
Status Init() override {
ScanSpec spec;
@@ -252,12 +252,12 @@
return base_iter_->HasNext();
}
- Status PrepareBlock(vector<CompactionInputRow> *block) override {
+ Status PrepareBlock(vector<CompactionInputRow>* block) override {
RETURN_NOT_OK(base_iter_->NextBlock(&block_));
std::fill(redo_mutation_block_.begin(), redo_mutation_block_.end(),
- static_cast<Mutation *>(nullptr));
+ static_cast<Mutation*>(nullptr));
std::fill(undo_mutation_block_.begin(), undo_mutation_block_.end(),
- static_cast<Mutation *>(nullptr));
+ static_cast<Mutation*>(nullptr));
RETURN_NOT_OK(redo_delta_iter_->PrepareBatch(
block_.nrows(), DeltaIterator::PREPARE_FOR_COLLECT));
RETURN_NOT_OK(redo_delta_iter_->CollectMutations(&redo_mutation_block_, block_.arena()));
@@ -267,7 +267,7 @@
block->resize(block_.nrows());
for (int i = 0; i < block_.nrows(); i++) {
- CompactionInputRow &input_row = block->at(i);
+ CompactionInputRow& input_row = block->at(i);
input_row.row.Reset(&block_, i);
input_row.redo_head = redo_mutation_block_[i];
Mutation::ReverseMutationList(&input_row.redo_head);
@@ -284,7 +284,7 @@
return Status::OK();
}
- const Schema &schema() const override {
+ const Schema& schema() const override {
return base_iter_->schema();
}
@@ -298,8 +298,8 @@
// The current block of data which has come from the input iterator
RowBlock block_;
- vector<Mutation *> redo_mutation_block_;
- vector<Mutation *> undo_mutation_block_;
+ vector<Mutation*> redo_mutation_block_;
+ vector<Mutation*> undo_mutation_block_;
enum {
kRowsPerBlock = 100
@@ -382,7 +382,9 @@
CHECK(right_last->changelist().is_delete());
}
#endif
- if (redos_overlap) *redos_overlap = false;
+ if (redos_overlap) {
+ *redos_overlap = false;
+ }
return 1;
}
if (right.redo_head == nullptr) {
@@ -397,7 +399,9 @@
CHECK(left_last->changelist().is_delete());
}
#endif
- if (redos_overlap) *redos_overlap = false;
+ if (redos_overlap) {
+ *redos_overlap = false;
+ }
return -1;
}
AdvanceToLastInList(&right_last);
@@ -477,7 +481,7 @@
void CopyMutations(Mutation* from, Mutation** to, Arena* arena) {
Mutation* previous = nullptr;
- for (const Mutation* cur = from; cur != nullptr; cur = cur->acquire_next()) {
+ for (const auto* cur = from; cur != nullptr; cur = cur->acquire_next()) {
Mutation* copy = Mutation::CreateInArena(arena,
cur->timestamp(),
cur->changelist());
@@ -558,7 +562,7 @@
// row of this block is less than the first row of the other block.
// In this case, we can remove the other input from the merge until
// this input's current block has been exhausted.
- bool Dominates(const MergeState &other, const Schema &schema) const {
+ bool Dominates(const MergeState& other, const Schema& schema) const {
DCHECK(!empty());
DCHECK(!other.empty());
@@ -569,15 +573,15 @@
vector<CompactionInputRow> pending;
int pending_idx;
- vector<MergeState *> dominated;
+ vector<MergeState*> dominated;
};
public:
- MergeCompactionInput(const vector<shared_ptr<CompactionInput> > &inputs,
+ MergeCompactionInput(const vector<shared_ptr<CompactionInput>>& inputs,
const Schema* schema)
: schema_(schema),
num_dup_rows_(0) {
- for (const shared_ptr<CompactionInput>& input : inputs) {
+ for (const auto& input : inputs) {
unique_ptr<MergeState> state(new MergeState);
state->input = input;
states_.push_back(state.release());
@@ -589,7 +593,7 @@
}
Status Init() override {
- for (MergeState *state : states_) {
+ for (auto* state : states_) {
RETURN_NOT_OK(state->input->Init());
}
@@ -601,7 +605,7 @@
bool HasMoreBlocks() override {
// Return true if any of the input blocks has more rows pending
// or more blocks which have yet to be pulled.
- for (MergeState *state : states_) {
+ for (auto* state : states_) {
if (!state->empty() ||
state->input->HasMoreBlocks()) {
return true;
@@ -611,7 +615,7 @@
return false;
}
- Status PrepareBlock(vector<CompactionInputRow> *block) override {
+ Status PrepareBlock(vector<CompactionInputRow>* block) override {
CHECK(!states_.empty());
block->clear();
@@ -627,7 +631,7 @@
// but some benchmarks indicated that the simpler code path of the O(n k) merge
// actually ends up a bit faster.
for (int i = 0; i < states_.size(); i++) {
- MergeState *state = states_[i];
+ MergeState* state = states_[i];
if (state->empty()) {
prepared_block_arena_ = state->input->PreparedBlockArena();
@@ -710,7 +714,7 @@
return ProcessEmptyInputs();
}
- const Schema &schema() const override {
+ const Schema& schema() const override {
return *schema_;
}
@@ -725,7 +729,7 @@
Status ProcessEmptyInputs() {
int j = 0;
for (int i = 0; i < states_.size(); i++) {
- MergeState *state = states_[i];
+ MergeState* state = states_[i];
states_[j++] = state;
if (!state->empty()) {
@@ -757,7 +761,7 @@
// all of those dominance relations and remove any that are no longer
// valid.
for (auto it = state->dominated.begin(); it != state->dominated.end(); ++it) {
- MergeState *dominated = *it;
+ auto* dominated = *it;
if (!state->Dominates(*dominated, *schema_)) {
states_.push_back(dominated);
it = state->dominated.erase(it);
@@ -793,7 +797,7 @@
return Status::OK();
}
- bool TryInsertIntoDominanceList(MergeState *dominator, MergeState *candidate) {
+ bool TryInsertIntoDominanceList(MergeState* dominator, MergeState* candidate) {
if (dominator->Dominates(*candidate, *schema_)) {
dominator->dominated.push_back(candidate);
return true;
@@ -883,7 +887,7 @@
}
const Schema* schema_;
- vector<MergeState *> states_;
+ vector<MergeState*> states_;
Arena* prepared_block_arena_;
// Vector to keep blocks that store duplicated row data.
@@ -1048,9 +1052,9 @@
////////////////////////////////////////////////////////////
-Status CompactionInput::Create(const DiskRowSet &rowset,
+Status CompactionInput::Create(const DiskRowSet& rowset,
const Schema* projection,
- const MvccSnapshot &snap,
+ const MvccSnapshot& snap,
const IOContext* io_context,
unique_ptr<CompactionInput>* out) {
CHECK(projection->has_column_ids());
@@ -1082,28 +1086,28 @@
return Status::OK();
}
-CompactionInput *CompactionInput::Create(const MemRowSet &memrowset,
+CompactionInput* CompactionInput::Create(const MemRowSet& memrowset,
const Schema* projection,
- const MvccSnapshot &snap) {
+ const MvccSnapshot& snap) {
CHECK(projection->has_column_ids());
return new MemRowSetCompactionInput(memrowset, snap, projection);
}
-CompactionInput *CompactionInput::Merge(const vector<shared_ptr<CompactionInput> > &inputs,
+CompactionInput* CompactionInput::Merge(const vector<shared_ptr<CompactionInput>>& inputs,
const Schema* schema) {
CHECK(schema->has_column_ids());
return new MergeCompactionInput(inputs, schema);
}
-Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot &snap,
+Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot& snap,
const Schema* schema,
const IOContext* io_context,
- shared_ptr<CompactionInput> *out) const {
+ shared_ptr<CompactionInput>* out) const {
CHECK(schema->has_column_ids());
- vector<shared_ptr<CompactionInput> > inputs;
- for (const shared_ptr<RowSet> &rs : rowsets_) {
+ vector<shared_ptr<CompactionInput>> inputs;
+ for (const auto& rs : rowsets_) {
unique_ptr<CompactionInput> input;
RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context, &input),
Substitute("Could not create compaction input for rowset $0",
@@ -1123,7 +1127,7 @@
void RowSetsInCompaction::DumpToLog() const {
VLOG(1) << "Selected " << rowsets_.size() << " rowsets to compact:";
// Dump the selected rowsets to the log, and collect corresponding iterators.
- for (const shared_ptr<RowSet> &rs : rowsets_) {
+ for (const auto& rs : rowsets_) {
VLOG(1) << rs->ToString() << "(current size on disk: ~"
<< rs->OnDiskSize() << " bytes)";
}
@@ -1153,8 +1157,8 @@
DVLOG(5) << "Ancient history mark: " << history_gc_opts.ancient_history_mark().ToString()
<< ": " << HybridClock::StringifyTimestamp(history_gc_opts.ancient_history_mark());
- Mutation *prev_undo = nullptr;
- Mutation *undo_mut = *undo_head;
+ Mutation* prev_undo = nullptr;
+ Mutation* undo_mut = *undo_head;
while (undo_mut != nullptr) {
if (history_gc_opts.IsAncientHistory(undo_mut->timestamp())) {
// Drop all undos following this one in the list; Their timestamps will be lower.
@@ -1199,7 +1203,7 @@
Mutation* redo_delete = nullptr;
// Convert the redos into undos.
- for (const Mutation *redo_mut = src_row.redo_head;
+ for (const auto* redo_mut = src_row.redo_head;
redo_mut != nullptr;
redo_mut = redo_mut->acquire_next()) {
@@ -1426,11 +1430,11 @@
}
Status ReupdateMissedDeltas(const IOContext* io_context,
- CompactionInput *input,
+ CompactionInput* input,
const HistoryGcOpts& history_gc_opts,
- const MvccSnapshot &snap_to_exclude,
- const MvccSnapshot &snap_to_include,
- const RowSetVector &output_rowsets) {
+ const MvccSnapshot& snap_to_exclude,
+ const MvccSnapshot& snap_to_include,
+ const RowSetVector& output_rowsets) {
TRACE_EVENT0("tablet", "ReupdateMissedDeltas");
RETURN_NOT_OK(input->Init());
@@ -1438,9 +1442,9 @@
snap_to_exclude.ToString() << " and " << snap_to_include.ToString();
// Collect the disk rowsets that we'll push the updates into.
- deque<DiskRowSet *> diskrowsets;
- for (const shared_ptr<RowSet> &rs : output_rowsets) {
- diskrowsets.push_back(down_cast<DiskRowSet *>(rs.get()));
+ deque<DiskRowSet*> diskrowsets;
+ for (const auto& rs : output_rowsets) {
+ diskrowsets.push_back(down_cast<DiskRowSet*>(rs.get()));
}
// The set of updated delta trackers.
@@ -1462,11 +1466,11 @@
while (input->HasMoreBlocks()) {
RETURN_NOT_OK(input->PrepareBlock(&rows));
- for (const CompactionInputRow &row : rows) {
+ for (const auto& row : rows) {
DVLOG(4) << "Revisiting row: " << CompactionInputRowToString(row);
bool is_garbage_collected = false;
- for (const Mutation *mut = row.redo_head;
+ for (const auto* mut = row.redo_head;
mut != nullptr;
mut = mut->acquire_next()) {
is_garbage_collected = false;
@@ -1588,7 +1592,7 @@
{
TRACE_EVENT0("tablet", "Flushing missed deltas");
- for (DeltaTracker* tracker : updated_trackers) {
+ for (auto* tracker : updated_trackers) {
VLOG(1) << "Flushing DeltaTracker updated with missed deltas...";
RETURN_NOT_OK_PREPEND(tracker->Flush(io_context, DeltaTracker::NO_FLUSH_METADATA),
"Could not flush delta tracker after missed delta update");
@@ -1599,14 +1603,14 @@
}
-Status DebugDumpCompactionInput(CompactionInput *input, vector<string> *lines) {
+Status DebugDumpCompactionInput(CompactionInput* input, vector<string>* lines) {
RETURN_NOT_OK(input->Init());
vector<CompactionInputRow> rows;
while (input->HasMoreBlocks()) {
RETURN_NOT_OK(input->PrepareBlock(&rows));
- for (const CompactionInputRow &input_row : rows) {
+ for (const auto& input_row : rows) {
LOG_STRING(INFO, lines) << CompactionInputRowToString(input_row);
}
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 7ffd7e3..f6e7dae 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -104,25 +104,25 @@
// need to call snap.IsApplied() on each mutation.
//
// TODO: can we make the above less messy?
- static Status Create(const DiskRowSet &rowset,
+ static Status Create(const DiskRowSet& rowset,
const Schema* projection,
- const MvccSnapshot &snap,
+ const MvccSnapshot& snap,
const fs::IOContext* io_context,
std::unique_ptr<CompactionInput>* out);
// Create an input which reads from the given memrowset, yielding base rows and updates
// prior to the given snapshot.
- static CompactionInput *Create(const MemRowSet &memrowset,
+ static CompactionInput* Create(const MemRowSet& memrowset,
const Schema* projection,
- const MvccSnapshot &snap);
+ const MvccSnapshot& snap);
// Create an input which merges several other compaction inputs. The inputs are merged
// in key-order according to the given schema. All inputs must have matching schemas.
- static CompactionInput *Merge(const std::vector<std::shared_ptr<CompactionInput> > &inputs,
- const Schema *schema);
+ static CompactionInput* Merge(const std::vector<std::shared_ptr<CompactionInput>>& inputs,
+ const Schema* schema);
virtual Status Init() = 0;
- virtual Status PrepareBlock(std::vector<CompactionInputRow> *block) = 0;
+ virtual Status PrepareBlock(std::vector<CompactionInputRow>* block) = 0;
// Returns the arena for this compaction input corresponding to the last
// prepared block. This must be called *after* PrepareBlock() as if this
@@ -132,7 +132,7 @@
virtual Status FinishBlock() = 0;
virtual bool HasMoreBlocks() = 0;
- virtual const Schema &schema() const = 0;
+ virtual const Schema& schema() const = 0;
virtual ~CompactionInput() {}
};
@@ -140,7 +140,7 @@
// The set of rowsets which are taking part in a given compaction.
class RowSetsInCompaction {
public:
- void AddRowSet(const std::shared_ptr<RowSet> &rowset,
+ void AddRowSet(const std::shared_ptr<RowSet>& rowset,
std::unique_lock<std::mutex> lock) {
CHECK(lock.owns_lock());
@@ -153,15 +153,15 @@
//
// 'schema' is the schema for the output of the compaction, and must remain valid
// for the lifetime of the returned CompactionInput.
- Status CreateCompactionInput(const MvccSnapshot &snap,
+ Status CreateCompactionInput(const MvccSnapshot& snap,
const Schema* schema,
const fs::IOContext* io_context,
- std::shared_ptr<CompactionInput> *out) const;
+ std::shared_ptr<CompactionInput>* out) const;
// Dump a log message indicating the chosen rowsets.
void DumpToLog() const;
- const RowSetVector &rowsets() const { return rowsets_; }
+ const RowSetVector& rowsets() const { return rowsets_; }
size_t num_rowsets() const {
return rowsets_.size();
@@ -230,9 +230,9 @@
Status FlushCompactionInput(const std::string& tablet_id,
const fs::FsErrorManager* error_manager,
CompactionInput* input,
- const MvccSnapshot &snap,
+ const MvccSnapshot& snap,
const HistoryGcOpts& history_gc_opts,
- RollingDiskRowSetWriter *out);
+ RollingDiskRowSetWriter* out);
// Iterate through this compaction input, finding any mutations which came
// between snap_to_exclude and snap_to_include (ie those ops that were not yet
@@ -245,15 +245,15 @@
// After return of this function, this CompactionInput object is "used up" and will
// yield no further rows.
Status ReupdateMissedDeltas(const fs::IOContext* io_context,
- CompactionInput *input,
+ CompactionInput* input,
const HistoryGcOpts& history_gc_opts,
- const MvccSnapshot &snap_to_exclude,
- const MvccSnapshot &snap_to_include,
- const RowSetVector &output_rowsets);
+ const MvccSnapshot& snap_to_exclude,
+ const MvccSnapshot& snap_to_include,
+ const RowSetVector& output_rowsets);
// Dump the given compaction input to 'lines' or LOG(INFO) if it is NULL.
// This consumes all of the input in the compaction input.
-Status DebugDumpCompactionInput(CompactionInput *input, std::vector<std::string> *lines);
+Status DebugDumpCompactionInput(CompactionInput* input, std::vector<std::string>* lines);
// Helper methods to print a row with full history.
std::string RowToString(const RowBlockRow& row,