[compaction] style guide-related updates

I was debugging one compaction issue and found that compaction-related
code in compaction-test.cc isn't robust enough.  This patch just updates
the code in compaction.{h,cc} and compaction-test.cc to conform with the
project's style guidelines.  A follow-up patch is to make the code
in compaction-test.cc more robust.

There are no functional changes in this patch.

Change-Id: I4ef1981dad33a6bc2d0911630fd73c95b91e45fe
Reviewed-on: http://gerrit.cloudera.org:8080/18974
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <chinazhangyifan@163.com>
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 654b95c..e33f841 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -27,6 +27,7 @@
 #include <random>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <unordered_set>
 #include <vector>
 
@@ -61,7 +62,6 @@
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset.h"
-#include "kudu/tablet/rowset_metadata.h"
 #include "kudu/tablet/tablet-harness.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
@@ -92,23 +92,24 @@
 
 DECLARE_string(block_manager);
 
+using kudu::consensus::OpId;
+using kudu::log::LogAnchorRegistry;
 using std::shared_ptr;
 using std::string;
 using std::thread;
 using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace tablet {
 
-using consensus::OpId;
-using log::LogAnchorRegistry;
-using strings::Substitute;
+class RowSetMetadata;
 
-static const char *kRowKeyFormat = "hello %08d";
-static const size_t kLargeRollThreshold = 1024 * 1024 * 1024; // 1GB
-static const size_t kSmallRollThreshold = 1024; // 1KB
+constexpr const char* const kRowKeyFormat = "hello %08d";
+constexpr const size_t kLargeRollThreshold = 1024 * 1024 * 1024; // 1GB
+constexpr const size_t kSmallRollThreshold = 1024; // 1KB
 
 class TestCompaction : public KuduRowSetTest {
  public:
@@ -131,7 +132,7 @@
 
   // Insert n_rows rows of data.
   // Each row is the tuple: (string key=hello <n*10 + delta>, val=<n>)
-  void InsertRows(MemRowSet *mrs, int n_rows, int delta) {
+  void InsertRows(MemRowSet* mrs, int n_rows, int delta) {
     for (int32_t i = 0; i < n_rows; i++) {
       InsertRow(mrs, i * 10 + delta, i);
     }
@@ -140,7 +141,7 @@
   // Inserts a row.
   // The 'nullable_val' column is set to either NULL (when val is odd)
   // or 'val' (when val is even).
-  void InsertRow(MemRowSet *mrs, int row_key, int32_t val) {
+  void InsertRow(MemRowSet* mrs, int row_key, int32_t val) {
     ScopedOp op(&mvcc_, clock_.Now());
     op.StartApplying();
     InsertRowInOp(mrs, op, row_key, val);
@@ -179,7 +180,7 @@
     }
   }
 
-  void InsertRowInOp(MemRowSet *mrs,
+  void InsertRowInOp(MemRowSet* mrs,
                      const ScopedOp& op,
                      int row_key,
                      int32_t val) {
@@ -204,21 +205,21 @@
   // If 'val' is even, 'nullable_val' is set to NULL. Otherwise, set to 'val'.
   // Note that this is the opposite of InsertRow() above, so that the updates
   // flop NULL to non-NULL and vice versa.
-  void UpdateRows(RowSet *rowset, int n_rows, int delta, int32_t new_val) {
+  void UpdateRows(RowSet* rowset, int n_rows, int delta, int32_t new_val) {
     for (uint32_t i = 0; i < n_rows; i++) {
       SCOPED_TRACE(i);
       UpdateRow(rowset, i * 10 + delta, new_val);
     }
   }
 
-  void UpdateRow(RowSet *rowset, int row_key, int32_t new_val) {
+  void UpdateRow(RowSet* rowset, int row_key, int32_t new_val) {
     ScopedOp op(&mvcc_, clock_.Now());
     op.StartApplying();
     UpdateRowInOp(rowset, op, row_key, new_val);
     op.FinishApplying();
   }
 
-  void UpdateRowInOp(RowSet *rowset,
+  void UpdateRowInOp(RowSet* rowset,
                      const ScopedOp& op,
                      int row_key,
                      int32_t new_val) {
@@ -274,7 +275,7 @@
     op.FinishApplying();
   }
 
-  void DeleteRowInOp(RowSet *rowset, const ScopedOp& op, int row_key) {
+  void DeleteRowInOp(RowSet* rowset, const ScopedOp& op, int row_key) {
     char keybuf[256];
     faststring update_buf;
     snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, row_key);
@@ -300,7 +301,7 @@
 
   // Iterate over the given compaction input, stringifying and dumping each
   // yielded row to *out
-  void IterateInput(CompactionInput *input, vector<string> *out) {
+  void IterateInput(CompactionInput* input, vector<string>* out) {
     ASSERT_OK(DebugDumpCompactionInput(input, out));
   }
 
@@ -308,8 +309,8 @@
   // If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends
   // them to the vector.
   void DoFlushAndReopen(
-      CompactionInput *input, const Schema& projection, const MvccSnapshot &snap,
-      int64_t roll_threshold, vector<shared_ptr<DiskRowSet> >* result_rowsets) {
+      CompactionInput* input, const Schema& projection, const MvccSnapshot& snap,
+      int64_t roll_threshold, vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     // Flush with a large roll threshold so we only write a single file.
     // This simplifies the test so we always need to reopen only a single rowset.
     RollingDiskRowSetWriter rsw(tablet()->metadata(), projection,
@@ -321,14 +322,14 @@
                                    input, snap, HistoryGcOpts::Disabled(), &rsw));
     ASSERT_OK(rsw.Finish());
 
-    vector<shared_ptr<RowSetMetadata> > metas;
+    vector<shared_ptr<RowSetMetadata>> metas;
     rsw.GetWrittenRowSetMetadata(&metas);
-    for (const shared_ptr<RowSetMetadata>& meta : metas) {
+    for (const auto& meta : metas) {
       ASSERT_TRUE(meta->HasBloomDataBlockForTests());
     }
     if (result_rowsets) {
       // Re-open the outputs
-      for (const shared_ptr<RowSetMetadata>& meta : metas) {
+      for (const auto& meta : metas) {
         shared_ptr<DiskRowSet> rs;
         ASSERT_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
                                    mem_trackers_, nullptr, &rs));
@@ -338,11 +339,11 @@
   }
 
   static Status BuildCompactionInput(const MvccSnapshot& merge_snap,
-                                     const vector<shared_ptr<DiskRowSet> >& rowsets,
+                                     const vector<shared_ptr<DiskRowSet>>& rowsets,
                                      const Schema& projection,
                                      unique_ptr<CompactionInput>* out) {
-    vector<shared_ptr<CompactionInput> > merge_inputs;
-    for (const shared_ptr<DiskRowSet> &rs : rowsets) {
+    vector<shared_ptr<CompactionInput>> merge_inputs;
+    for (const auto& rs : rowsets) {
       unique_ptr<CompactionInput> input;
       RETURN_NOT_OK(CompactionInput::Create(*rs, &projection, merge_snap, nullptr, &input));
       merge_inputs.push_back(shared_ptr<CompactionInput>(input.release()));
@@ -354,9 +355,9 @@
   // Compacts a set of DRSs.
   // If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends
   // them to the vector.
-  Status CompactAndReopen(const vector<shared_ptr<DiskRowSet> >& rowsets,
+  Status CompactAndReopen(const vector<shared_ptr<DiskRowSet>>& rowsets,
                           const Schema& projection, int64_t roll_threshold,
-                          vector<shared_ptr<DiskRowSet> >* result_rowsets) {
+                          vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     MvccSnapshot merge_snap(mvcc_);
     unique_ptr<CompactionInput> compact_input;
     RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection, &compact_input));
@@ -366,10 +367,10 @@
   }
 
   // Same as above, but sets a high roll threshold so it only produces a single output.
-  void CompactAndReopenNoRoll(const vector<shared_ptr<DiskRowSet> >& input_rowsets,
+  void CompactAndReopenNoRoll(const vector<shared_ptr<DiskRowSet>>& input_rowsets,
                               const Schema& projection,
                               shared_ptr<DiskRowSet>* result_rs) {
-    vector<shared_ptr<DiskRowSet> > result_rowsets;
+    vector<shared_ptr<DiskRowSet>> result_rowsets;
     CompactAndReopen(input_rowsets, projection, kLargeRollThreshold, &result_rowsets);
     ASSERT_EQ(1, result_rowsets.size());
     *result_rs = result_rowsets[0];
@@ -380,9 +381,9 @@
   // them to the vector.
   void FlushMRSAndReopen(const MemRowSet& mrs, const Schema& projection,
                          int64_t roll_threshold,
-                         vector<shared_ptr<DiskRowSet> >* result_rowsets) {
+                         vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     MvccSnapshot snap(mvcc_);
-    vector<shared_ptr<RowSetMetadata> > rowset_metas;
+    vector<shared_ptr<RowSetMetadata>> rowset_metas;
     unique_ptr<CompactionInput> input(CompactionInput::Create(mrs, &projection, snap));
     DoFlushAndReopen(input.get(), projection, snap, roll_threshold, result_rowsets);
   }
@@ -390,7 +391,7 @@
   // Same as above, but sets a high roll threshold so it only produces a single output.
   void FlushMRSAndReopenNoRoll(const MemRowSet& mrs, const Schema& projection,
                             shared_ptr<DiskRowSet>* result_rs) {
-    vector<shared_ptr<DiskRowSet> > rowsets;
+    vector<shared_ptr<DiskRowSet>> rowsets;
     FlushMRSAndReopen(mrs, projection, kLargeRollThreshold, &rowsets);
     ASSERT_EQ(1, rowsets.size());
     *result_rs = rowsets[0];
@@ -421,11 +422,11 @@
   // each of the input schemas. The output rowset will
   // have the 'projection' schema.
   void DoMerge(const Schema& projection, const vector<Schema>& schemas) {
-    vector<shared_ptr<DiskRowSet> > rowsets;
+    vector<shared_ptr<DiskRowSet>> rowsets;
 
     // Create one input rowset for each of the input schemas
     int delta = 0;
-    for (const Schema& schema : schemas) {
+    for (const auto& schema : schemas) {
       // Create a memrowset with a bunch of rows and updates.
       shared_ptr<MemRowSet> mrs;
       CHECK_OK(MemRowSet::Create(delta, schema, log_anchor_registry_.get(),
@@ -457,7 +458,7 @@
 
   template<bool OVERLAP_INPUTS>
   void DoBenchmark() {
-    vector<shared_ptr<DiskRowSet> > rowsets;
+    vector<shared_ptr<DiskRowSet>> rowsets;
 
     if (FLAGS_merge_benchmark_input_dir.empty()) {
       // Create inputs.
@@ -494,7 +495,7 @@
       scoped_refptr<TabletMetadata> input_meta;
       ASSERT_OK(TabletMetadata::Load(&fs_manager, tablet_id, &input_meta));
 
-      for (const shared_ptr<RowSetMetadata>& meta : input_meta->rowsets()) {
+      for (const auto& meta : input_meta->rowsets()) {
         shared_ptr<DiskRowSet> rs;
         CHECK_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
                                   mem_trackers_, nullptr, &rs));
@@ -576,7 +577,7 @@
                               mem_trackers_.tablet_tracker, &mrs));
   InsertRows(mrs.get(), 30000, 0);
 
-  vector<shared_ptr<DiskRowSet> > rowsets;
+  vector<shared_ptr<DiskRowSet>> rowsets;
   FlushMRSAndReopen(*mrs, schema_, kSmallRollThreshold, &rowsets);
   ASSERT_GT(rowsets.size(), 1);
 
@@ -676,7 +677,7 @@
   }
 
   shared_ptr<DiskRowSet> result;
-  vector<shared_ptr<DiskRowSet> > all_rss;
+  vector<shared_ptr<DiskRowSet>> all_rss;
   all_rss.push_back(rs3);
   all_rss.push_back(rs1);
   all_rss.push_back(rs2);
@@ -857,7 +858,7 @@
   }
 
   vector<shared_ptr<CompactionInput>> inputs;
-  for (auto& row_set : row_sets) {
+  for (const auto& row_set : row_sets) {
     unique_ptr<CompactionInput> ci;
     CHECK_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
     inputs.push_back(shared_ptr<CompactionInput>(ci.release()));
@@ -1030,7 +1031,7 @@
   // Catch the updates that came in after the snapshot flush was made.
   // Note that we are merging two MRS, it's a hack
   MvccSnapshot snap2(mvcc_);
-  vector<shared_ptr<CompactionInput> > merge_inputs;
+  vector<shared_ptr<CompactionInput>> merge_inputs;
   merge_inputs.push_back(
         shared_ptr<CompactionInput>(CompactionInput::Create(*mrs, &schema_, snap2)));
   merge_inputs.push_back(
@@ -1093,7 +1094,7 @@
   InsertRows(mrs_a.get(), 5, 1);
 
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput> > merge_inputs {
+  vector<shared_ptr<CompactionInput>> merge_inputs {
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
   };
@@ -1113,7 +1114,7 @@
                               mem_trackers_.tablet_tracker, &mrs_b));
   InsertRows(mrs_b.get(), 10, 0);
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput> > merge_inputs {
+  vector<shared_ptr<CompactionInput>> merge_inputs {
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
   };
@@ -1238,7 +1239,7 @@
   // Despite the overlapping time ranges across these inputs, the compaction
   // should go off without a hitch.
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput> > merge_inputs {
+  vector<shared_ptr<CompactionInput>> merge_inputs {
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap)),
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_c, &schema_, snap)),
@@ -1255,7 +1256,7 @@
   shared_ptr<MemRowSet> mrs_a = CreateInvisibleMRS();
   shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS();
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput> > merge_inputs {
+  vector<shared_ptr<CompactionInput>> merge_inputs {
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
     shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
   };
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 83a08f0..762c92b 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <unordered_set>
 #include <vector>
 
@@ -47,7 +48,6 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/cfile_set.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/tablet/delta_tracker.h"
 #include "kudu/tablet/diskrowset.h"
@@ -138,7 +138,7 @@
     return has_more_blocks_;
   }
 
-  Status PrepareBlock(vector<CompactionInputRow> *block) override {
+  Status PrepareBlock(vector<CompactionInputRow>* block) override {
     int num_in_block = iter_->remaining_in_leaf();
     block->resize(num_in_block);
 
@@ -203,7 +203,7 @@
     return Status::OK();
   }
 
-  const Schema &schema() const override {
+  const Schema& schema() const override {
     return iter_->schema();
   }
 
@@ -234,8 +234,8 @@
         undo_delta_iter_(std::move(undo_delta_iter)),
         mem_(32 * 1024),
         block_(&base_iter_->schema(), kRowsPerBlock, &mem_),
-        redo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)),
-        undo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)) {}
+        redo_mutation_block_(kRowsPerBlock, static_cast<Mutation*>(nullptr)),
+        undo_mutation_block_(kRowsPerBlock, static_cast<Mutation*>(nullptr)) {}
 
   Status Init() override {
     ScanSpec spec;
@@ -252,12 +252,12 @@
     return base_iter_->HasNext();
   }
 
-  Status PrepareBlock(vector<CompactionInputRow> *block) override {
+  Status PrepareBlock(vector<CompactionInputRow>* block) override {
     RETURN_NOT_OK(base_iter_->NextBlock(&block_));
     std::fill(redo_mutation_block_.begin(), redo_mutation_block_.end(),
-              static_cast<Mutation *>(nullptr));
+              static_cast<Mutation*>(nullptr));
     std::fill(undo_mutation_block_.begin(), undo_mutation_block_.end(),
-                  static_cast<Mutation *>(nullptr));
+                  static_cast<Mutation*>(nullptr));
     RETURN_NOT_OK(redo_delta_iter_->PrepareBatch(
                       block_.nrows(), DeltaIterator::PREPARE_FOR_COLLECT));
     RETURN_NOT_OK(redo_delta_iter_->CollectMutations(&redo_mutation_block_, block_.arena()));
@@ -267,7 +267,7 @@
 
     block->resize(block_.nrows());
     for (int i = 0; i < block_.nrows(); i++) {
-      CompactionInputRow &input_row = block->at(i);
+      CompactionInputRow& input_row = block->at(i);
       input_row.row.Reset(&block_, i);
       input_row.redo_head = redo_mutation_block_[i];
       Mutation::ReverseMutationList(&input_row.redo_head);
@@ -284,7 +284,7 @@
     return Status::OK();
   }
 
-  const Schema &schema() const override {
+  const Schema& schema() const override {
     return base_iter_->schema();
   }
 
@@ -298,8 +298,8 @@
 
   // The current block of data which has come from the input iterator
   RowBlock block_;
-  vector<Mutation *> redo_mutation_block_;
-  vector<Mutation *> undo_mutation_block_;
+  vector<Mutation*> redo_mutation_block_;
+  vector<Mutation*> undo_mutation_block_;
 
   enum {
     kRowsPerBlock = 100
@@ -382,7 +382,9 @@
       CHECK(right_last->changelist().is_delete());
     }
 #endif
-    if (redos_overlap) *redos_overlap = false;
+    if (redos_overlap) {
+      *redos_overlap = false;
+    }
     return 1;
   }
   if (right.redo_head == nullptr) {
@@ -397,7 +399,9 @@
       CHECK(left_last->changelist().is_delete());
     }
 #endif
-    if (redos_overlap) *redos_overlap = false;
+    if (redos_overlap) {
+      *redos_overlap = false;
+    }
     return -1;
   }
   AdvanceToLastInList(&right_last);
@@ -477,7 +481,7 @@
 
 void CopyMutations(Mutation* from, Mutation** to, Arena* arena) {
   Mutation* previous = nullptr;
-  for (const Mutation* cur = from; cur != nullptr; cur = cur->acquire_next()) {
+  for (const auto* cur = from; cur != nullptr; cur = cur->acquire_next()) {
     Mutation* copy = Mutation::CreateInArena(arena,
                                              cur->timestamp(),
                                              cur->changelist());
@@ -558,7 +562,7 @@
     // row of this block is less than the first row of the other block.
     // In this case, we can remove the other input from the merge until
     // this input's current block has been exhausted.
-    bool Dominates(const MergeState &other, const Schema &schema) const {
+    bool Dominates(const MergeState& other, const Schema& schema) const {
       DCHECK(!empty());
       DCHECK(!other.empty());
 
@@ -569,15 +573,15 @@
     vector<CompactionInputRow> pending;
     int pending_idx;
 
-    vector<MergeState *> dominated;
+    vector<MergeState*> dominated;
   };
 
  public:
-  MergeCompactionInput(const vector<shared_ptr<CompactionInput> > &inputs,
+  MergeCompactionInput(const vector<shared_ptr<CompactionInput>>& inputs,
                        const Schema* schema)
     : schema_(schema),
       num_dup_rows_(0) {
-    for (const shared_ptr<CompactionInput>& input : inputs) {
+    for (const auto& input : inputs) {
       unique_ptr<MergeState> state(new MergeState);
       state->input = input;
       states_.push_back(state.release());
@@ -589,7 +593,7 @@
   }
 
   Status Init() override {
-    for (MergeState *state : states_) {
+    for (auto* state : states_) {
       RETURN_NOT_OK(state->input->Init());
     }
 
@@ -601,7 +605,7 @@
   bool HasMoreBlocks() override {
     // Return true if any of the input blocks has more rows pending
     // or more blocks which have yet to be pulled.
-    for (MergeState *state : states_) {
+    for (auto* state : states_) {
       if (!state->empty() ||
           state->input->HasMoreBlocks()) {
         return true;
@@ -611,7 +615,7 @@
     return false;
   }
 
-  Status PrepareBlock(vector<CompactionInputRow> *block) override {
+  Status PrepareBlock(vector<CompactionInputRow>* block) override {
     CHECK(!states_.empty());
 
     block->clear();
@@ -627,7 +631,7 @@
       // but some benchmarks indicated that the simpler code path of the O(n k) merge
       // actually ends up a bit faster.
       for (int i = 0; i < states_.size(); i++) {
-        MergeState *state = states_[i];
+        MergeState* state = states_[i];
 
         if (state->empty()) {
           prepared_block_arena_ = state->input->PreparedBlockArena();
@@ -710,7 +714,7 @@
     return ProcessEmptyInputs();
   }
 
-  const Schema &schema() const override {
+  const Schema& schema() const override {
     return *schema_;
   }
 
@@ -725,7 +729,7 @@
   Status ProcessEmptyInputs() {
     int j = 0;
     for (int i = 0; i < states_.size(); i++) {
-      MergeState *state = states_[i];
+      MergeState* state = states_[i];
       states_[j++] = state;
 
       if (!state->empty()) {
@@ -757,7 +761,7 @@
       // all of those dominance relations and remove any that are no longer
       // valid.
       for (auto it = state->dominated.begin(); it != state->dominated.end(); ++it) {
-        MergeState *dominated = *it;
+        auto* dominated = *it;
         if (!state->Dominates(*dominated, *schema_)) {
           states_.push_back(dominated);
           it = state->dominated.erase(it);
@@ -793,7 +797,7 @@
     return Status::OK();
   }
 
-  bool TryInsertIntoDominanceList(MergeState *dominator, MergeState *candidate) {
+  bool TryInsertIntoDominanceList(MergeState* dominator, MergeState* candidate) {
     if (dominator->Dominates(*candidate, *schema_)) {
       dominator->dominated.push_back(candidate);
       return true;
@@ -883,7 +887,7 @@
   }
 
   const Schema* schema_;
-  vector<MergeState *> states_;
+  vector<MergeState*> states_;
   Arena* prepared_block_arena_;
 
   // Vector to keep blocks that store duplicated row data.
@@ -1048,9 +1052,9 @@
 
 ////////////////////////////////////////////////////////////
 
-Status CompactionInput::Create(const DiskRowSet &rowset,
+Status CompactionInput::Create(const DiskRowSet& rowset,
                                const Schema* projection,
-                               const MvccSnapshot &snap,
+                               const MvccSnapshot& snap,
                                const IOContext* io_context,
                                unique_ptr<CompactionInput>* out) {
   CHECK(projection->has_column_ids());
@@ -1082,28 +1086,28 @@
   return Status::OK();
 }
 
-CompactionInput *CompactionInput::Create(const MemRowSet &memrowset,
+CompactionInput* CompactionInput::Create(const MemRowSet& memrowset,
                                          const Schema* projection,
-                                         const MvccSnapshot &snap) {
+                                         const MvccSnapshot& snap) {
   CHECK(projection->has_column_ids());
   return new MemRowSetCompactionInput(memrowset, snap, projection);
 }
 
-CompactionInput *CompactionInput::Merge(const vector<shared_ptr<CompactionInput> > &inputs,
+CompactionInput* CompactionInput::Merge(const vector<shared_ptr<CompactionInput>>& inputs,
                                         const Schema* schema) {
   CHECK(schema->has_column_ids());
   return new MergeCompactionInput(inputs, schema);
 }
 
 
-Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot &snap,
+Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot& snap,
                                                   const Schema* schema,
                                                   const IOContext* io_context,
-                                                  shared_ptr<CompactionInput> *out) const {
+                                                  shared_ptr<CompactionInput>* out) const {
   CHECK(schema->has_column_ids());
 
-  vector<shared_ptr<CompactionInput> > inputs;
-  for (const shared_ptr<RowSet> &rs : rowsets_) {
+  vector<shared_ptr<CompactionInput>> inputs;
+  for (const auto& rs : rowsets_) {
     unique_ptr<CompactionInput> input;
     RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context, &input),
                           Substitute("Could not create compaction input for rowset $0",
@@ -1123,7 +1127,7 @@
 void RowSetsInCompaction::DumpToLog() const {
   VLOG(1) << "Selected " << rowsets_.size() << " rowsets to compact:";
   // Dump the selected rowsets to the log, and collect corresponding iterators.
-  for (const shared_ptr<RowSet> &rs : rowsets_) {
+  for (const auto& rs : rowsets_) {
     VLOG(1) << rs->ToString() << "(current size on disk: ~"
             << rs->OnDiskSize() << " bytes)";
   }
@@ -1153,8 +1157,8 @@
   DVLOG(5) << "Ancient history mark: " << history_gc_opts.ancient_history_mark().ToString()
             << ": " << HybridClock::StringifyTimestamp(history_gc_opts.ancient_history_mark());
 
-  Mutation *prev_undo = nullptr;
-  Mutation *undo_mut = *undo_head;
+  Mutation* prev_undo = nullptr;
+  Mutation* undo_mut = *undo_head;
   while (undo_mut != nullptr) {
     if (history_gc_opts.IsAncientHistory(undo_mut->timestamp())) {
       // Drop all undos following this one in the list; Their timestamps will be lower.
@@ -1199,7 +1203,7 @@
   Mutation* redo_delete = nullptr;
 
   // Convert the redos into undos.
-  for (const Mutation *redo_mut = src_row.redo_head;
+  for (const auto* redo_mut = src_row.redo_head;
        redo_mut != nullptr;
        redo_mut = redo_mut->acquire_next()) {
 
@@ -1426,11 +1430,11 @@
 }
 
 Status ReupdateMissedDeltas(const IOContext* io_context,
-                            CompactionInput *input,
+                            CompactionInput* input,
                             const HistoryGcOpts& history_gc_opts,
-                            const MvccSnapshot &snap_to_exclude,
-                            const MvccSnapshot &snap_to_include,
-                            const RowSetVector &output_rowsets) {
+                            const MvccSnapshot& snap_to_exclude,
+                            const MvccSnapshot& snap_to_include,
+                            const RowSetVector& output_rowsets) {
   TRACE_EVENT0("tablet", "ReupdateMissedDeltas");
   RETURN_NOT_OK(input->Init());
 
@@ -1438,9 +1442,9 @@
     snap_to_exclude.ToString() << " and " << snap_to_include.ToString();
 
   // Collect the disk rowsets that we'll push the updates into.
-  deque<DiskRowSet *> diskrowsets;
-  for (const shared_ptr<RowSet> &rs : output_rowsets) {
-    diskrowsets.push_back(down_cast<DiskRowSet *>(rs.get()));
+  deque<DiskRowSet*> diskrowsets;
+  for (const auto& rs : output_rowsets) {
+    diskrowsets.push_back(down_cast<DiskRowSet*>(rs.get()));
   }
 
   // The set of updated delta trackers.
@@ -1462,11 +1466,11 @@
   while (input->HasMoreBlocks()) {
     RETURN_NOT_OK(input->PrepareBlock(&rows));
 
-    for (const CompactionInputRow &row : rows) {
+    for (const auto& row : rows) {
       DVLOG(4) << "Revisiting row: " << CompactionInputRowToString(row);
 
       bool is_garbage_collected = false;
-      for (const Mutation *mut = row.redo_head;
+      for (const auto* mut = row.redo_head;
            mut != nullptr;
            mut = mut->acquire_next()) {
         is_garbage_collected = false;
@@ -1588,7 +1592,7 @@
 
   {
     TRACE_EVENT0("tablet", "Flushing missed deltas");
-    for (DeltaTracker* tracker : updated_trackers) {
+    for (auto* tracker : updated_trackers) {
       VLOG(1) << "Flushing DeltaTracker updated with missed deltas...";
       RETURN_NOT_OK_PREPEND(tracker->Flush(io_context, DeltaTracker::NO_FLUSH_METADATA),
                             "Could not flush delta tracker after missed delta update");
@@ -1599,14 +1603,14 @@
 }
 
 
-Status DebugDumpCompactionInput(CompactionInput *input, vector<string> *lines) {
+Status DebugDumpCompactionInput(CompactionInput* input, vector<string>* lines) {
   RETURN_NOT_OK(input->Init());
   vector<CompactionInputRow> rows;
 
   while (input->HasMoreBlocks()) {
     RETURN_NOT_OK(input->PrepareBlock(&rows));
 
-    for (const CompactionInputRow &input_row : rows) {
+    for (const auto& input_row : rows) {
       LOG_STRING(INFO, lines) << CompactionInputRowToString(input_row);
     }
 
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 7ffd7e3..f6e7dae 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -104,25 +104,25 @@
   // need to call snap.IsApplied() on each mutation.
   //
   // TODO: can we make the above less messy?
-  static Status Create(const DiskRowSet &rowset,
+  static Status Create(const DiskRowSet& rowset,
                        const Schema* projection,
-                       const MvccSnapshot &snap,
+                       const MvccSnapshot& snap,
                        const fs::IOContext* io_context,
                        std::unique_ptr<CompactionInput>* out);
 
   // Create an input which reads from the given memrowset, yielding base rows and updates
   // prior to the given snapshot.
-  static CompactionInput *Create(const MemRowSet &memrowset,
+  static CompactionInput* Create(const MemRowSet& memrowset,
                                  const Schema* projection,
-                                 const MvccSnapshot &snap);
+                                 const MvccSnapshot& snap);
 
   // Create an input which merges several other compaction inputs. The inputs are merged
   // in key-order according to the given schema. All inputs must have matching schemas.
-  static CompactionInput *Merge(const std::vector<std::shared_ptr<CompactionInput> > &inputs,
-                                const Schema *schema);
+  static CompactionInput* Merge(const std::vector<std::shared_ptr<CompactionInput>>& inputs,
+                                const Schema* schema);
 
   virtual Status Init() = 0;
-  virtual Status PrepareBlock(std::vector<CompactionInputRow> *block) = 0;
+  virtual Status PrepareBlock(std::vector<CompactionInputRow>* block) = 0;
 
   // Returns the arena for this compaction input corresponding to the last
   // prepared block. This must be called *after* PrepareBlock() as if this
@@ -132,7 +132,7 @@
   virtual Status FinishBlock() = 0;
 
   virtual bool HasMoreBlocks() = 0;
-  virtual const Schema &schema() const = 0;
+  virtual const Schema& schema() const = 0;
 
   virtual ~CompactionInput() {}
 };
@@ -140,7 +140,7 @@
 // The set of rowsets which are taking part in a given compaction.
 class RowSetsInCompaction {
  public:
-  void AddRowSet(const std::shared_ptr<RowSet> &rowset,
+  void AddRowSet(const std::shared_ptr<RowSet>& rowset,
                  std::unique_lock<std::mutex> lock) {
     CHECK(lock.owns_lock());
 
@@ -153,15 +153,15 @@
   //
   // 'schema' is the schema for the output of the compaction, and must remain valid
   // for the lifetime of the returned CompactionInput.
-  Status CreateCompactionInput(const MvccSnapshot &snap,
+  Status CreateCompactionInput(const MvccSnapshot& snap,
                                const Schema* schema,
                                const fs::IOContext* io_context,
-                               std::shared_ptr<CompactionInput> *out) const;
+                               std::shared_ptr<CompactionInput>* out) const;
 
   // Dump a log message indicating the chosen rowsets.
   void DumpToLog() const;
 
-  const RowSetVector &rowsets() const { return rowsets_; }
+  const RowSetVector& rowsets() const { return rowsets_; }
 
   size_t num_rowsets() const {
     return rowsets_.size();
@@ -230,9 +230,9 @@
 Status FlushCompactionInput(const std::string& tablet_id,
                             const fs::FsErrorManager* error_manager,
                             CompactionInput* input,
-                            const MvccSnapshot &snap,
+                            const MvccSnapshot& snap,
                             const HistoryGcOpts& history_gc_opts,
-                            RollingDiskRowSetWriter *out);
+                            RollingDiskRowSetWriter* out);
 
 // Iterate through this compaction input, finding any mutations which came
 // between snap_to_exclude and snap_to_include (ie those ops that were not yet
@@ -245,15 +245,15 @@
 // After return of this function, this CompactionInput object is "used up" and will
 // yield no further rows.
 Status ReupdateMissedDeltas(const fs::IOContext* io_context,
-                            CompactionInput *input,
+                            CompactionInput* input,
                             const HistoryGcOpts& history_gc_opts,
-                            const MvccSnapshot &snap_to_exclude,
-                            const MvccSnapshot &snap_to_include,
-                            const RowSetVector &output_rowsets);
+                            const MvccSnapshot& snap_to_exclude,
+                            const MvccSnapshot& snap_to_include,
+                            const RowSetVector& output_rowsets);
 
 // Dump the given compaction input to 'lines' or LOG(INFO) if it is NULL.
 // This consumes all of the input in the compaction input.
-Status DebugDumpCompactionInput(CompactionInput *input, std::vector<std::string> *lines);
+Status DebugDumpCompactionInput(CompactionInput* input, std::vector<std::string>* lines);
 
 // Helper methods to print a row with full history.
 std::string RowToString(const RowBlockRow& row,